Dagster & dbt (Pythonic)

Note: If you are just getting started with the dbt integration, we recommend using the new dbt component.

Dagster orchestrates dbt alongside other technologies, so you can schedule dbt with Spark, Python, and more in a single data pipeline. Dagster's asset-oriented approach allows it to understand dbt at the level of individual dbt models.

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

create-dagster project my-project && cd my-project

Activate the project virtual environment:

source .venv/bin/activate

Then, add the dagster-dbt library to the project, along with the DuckDB adapter for dbt:

uv add dagster-dbt dbt-duckdb
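If you are not managing the project with uv, the equivalent pip command is:

pip install dagster-dbt dbt-duckdb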

2. Set up a dbt project

For this tutorial, we'll use the jaffle shop dbt project as an example. Clone it into your project:

git clone --depth=1 https://github.com/dbt-labs/jaffle_shop.git dbt && rm -rf dbt/.git

Create a profiles.yml file in the dbt directory to configure the project to use DuckDB:

dbt/profiles.yml
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: tutorial.duckdb
      threads: 24
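Before wiring the project into Dagster, you can confirm that dbt parses it with this profile. A quick check, run from the directory into which you cloned jaffle_shop (--project-dir and --profiles-dir point dbt at the cloned directory):

dbt parse --project-dir dbt --profiles-dir dbt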

3. Initialize the dbt assets

First, create a dbt resource that points to the dbt project directory within the Dagster project directory:

my_project/defs/resources.py
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

import dagster as dg

# Path to the dbt project cloned in step 2; adjust if your dbt directory
# lives elsewhere relative to this file
dbt_project_directory = Path(__file__).absolute().parent / "dbt"
dbt_project = DbtProject(project_dir=dbt_project_directory)

# Compile the dbt manifest at code-load time during development, so that
# dbt_project.manifest_path exists when the asset definitions are built
dbt_project.prepare_if_dev()

dbt_resource = DbtCliResource(project_dir=dbt_project)


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "dbt": dbt_resource,
        }
    )
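At this point, you can verify that Dagster loads the project's definitions without errors:

dg check defs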

With the dbt resource defined, you can use the dbt project to generate the dbt assets:

my_project/defs/assets.py
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream structured events back to Dagster as
    # each model, seed, and test finishes
    yield from dbt.cli(["build"], context=context).stream()
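To sanity-check the assets outside the UI, you can also materialize them in-process from a script or test. A minimal sketch, assuming the manifest has been prepared and using a hypothetical test file:

my_project/test_dbt_assets.py
import dagster as dg

from my_project.defs.assets import dbt_models
from my_project.defs.resources import dbt_resource

# Materialize every dbt model in-process against the DuckDB target
result = dg.materialize([dbt_models], resources={"dbt": dbt_resource})
assert result.success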

4. Run your dbt models

To execute your dbt models, you can use the dg launch command to kick off a run through the CLI:

dg launch --assets '*'
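You can also launch a subset of assets. For example, to run only the customers model (assuming the default asset keys, which match the dbt model names; the exact selection syntax can vary across dg versions):

dg launch --assets customers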

5. Handling incremental models

If your dbt project contains incremental models, you can model them as partitioned assets and update the command used to run the dbt models so that it passes --vars derived from the range of partitions being processed.

To enable partitioning for incremental dbt models in Dagster, we create a new @dbt_assets definition. We add a dbt selector (INCREMENTAL_SELECTOR) to identify incremental models and configure them with a daily partition (daily_partition). This allows Dagster to determine which partition is running and to pass the correct time window (min_date, max_date) into dbt through the vars argument. With this setup, dbt builds only the records within the active partition.

Next, we separate our dbt executions: one @dbt_assets function (dbt_analytics) excludes incremental models, while another (incremental_dbt_models) handles only those incremental assets with partitions. Inside incremental_dbt_models, we fetch the partition time window from the context, format it as variables, and pass them into dbt’s CLI command. This ensures dbt materializes only the partition-specific slice of data while keeping dependencies clear and maintainable.

my_project/defs/assets.py
import json

from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project

# dbt selector that matches every incremental model in the project
INCREMENTAL_SELECTOR = "config.materialized:incremental"

# DailyPartitionsDefinition requires a start date; choose one that covers your data
daily_partition = dg.DailyPartitionsDefinition(start_date="2023-01-01")


@dbt_assets(
    manifest=dbt_project.manifest_path,
    exclude=INCREMENTAL_SELECTOR,
)
def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    # Once the run completes, read run_results.json and log the compiled SQL
    run_results_json = dbt_build_invocation.get_artifact("run_results.json")
    for result in run_results_json["results"]:
        context.log.debug(result["compiled_code"])


@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Map the partition's time window onto the vars the dbt models consume
    time_window = context.partition_time_window
    dbt_vars = {
        "min_date": time_window.start.strftime("%Y-%m-%d"),
        "max_date": time_window.end.strftime("%Y-%m-%d"),
    }

    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
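On the dbt side, an incremental model consumes these variables in its incremental filter. Below is a minimal sketch of a hypothetical model (jaffle shop itself ships no incremental models), assuming an order_date column on the raw_orders seed:

models/orders_daily.sql
{{ config(materialized='incremental') }}

select *
from {{ ref('raw_orders') }}

{% if is_incremental() %}
where order_date >= '{{ var("min_date") }}'
  and order_date < '{{ var("max_date") }}'
{% endif %}

To run a single day, you can then launch one partition from the CLI; assuming your dg version supports the --partition flag and using the hypothetical model above:

dg launch --assets orders_daily --partition 2023-01-02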