Dagster & dbt (Pythonic)
If you are just getting started with the dbt integration, we recommend using the new dbt component.
Dagster orchestrates dbt alongside other technologies, so you can schedule dbt with Spark, Python, etc. in a single data pipeline. Dagster's asset-oriented approach allows Dagster to understand dbt at the level of individual dbt models.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
create-dagster project my-project && cd my-project
Activate the project virtual environment:
source .venv/bin/activate
Then, add the dagster-dbt library to the project, along with the DuckDB adapter (dbt-duckdb):
With uv:
uv add dagster-dbt dbt-duckdb
With pip:
pip install dagster-dbt dbt-duckdb
2. Set up a dbt project
For this tutorial, we'll use the jaffle shop dbt project as an example. Clone it into your project:
git clone --depth=1 https://github.com/dbt-labs/jaffle_shop.git dbt && rm -rf dbt/.git
We will create a profiles.yml file in the dbt directory to configure the project to use DuckDB:
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: tutorial.duckdb
      threads: 24
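Optionally, you can check this configuration before involving Dagster by running dbt parse --project-dir dbt --profiles-dir dbt from the project root; both flags are standard dbt CLI options, and a successful parse confirms dbt can load the project and profile.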
3. Initialize the dbt assets
First, create a dbt resource that points to the dbt project directory within the Dagster project directory:
from pathlib import Path
from dagster_dbt import DbtCliResource, DbtProject
import dagster as dg

# Point the resource at the dbt project inside the Dagster project directory
dbt_project_directory = Path(__file__).absolute().parent / "dbt"
dbt_project = DbtProject(project_dir=dbt_project_directory)
dbt_resource = DbtCliResource(project_dir=dbt_project)


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "dbt": dbt_resource,
        }
    )
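The next step reads the dbt manifest, which dbt generates when it compiles the project. If you are developing locally and have not run dbt yet, DbtProject can prepare the manifest for you; a minimal sketch, appended after the resource definitions above:

# During local development, compile the dbt project if needed so that
# dbt_project.manifest_path points at an up-to-date manifest.json.
dbt_project.prepare_if_dev()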
With the dbt resource defined, you can use the dbt project to generate the dbt assets:
from dagster_dbt import DbtCliResource, dbt_assets
import dagster as dg

from .resources import dbt_project, dbt_resource


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Invoke `dbt build` and stream events back to Dagster as asset events
    yield from dbt.cli(["build"], context=context).stream()
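Because each dbt model is an individual asset, downstream Python assets can depend on specific models. Here is a hedged sketch of a hypothetical customer_count asset that depends on the jaffle_shop customers model, assuming the default dbt asset keys and that DuckDB wrote tutorial.duckdb inside the dbt directory:

import duckdb
import dagster as dg


# Hypothetical downstream asset: the dependency on the "customers" asset key
# makes Dagster order this after the dbt customers model materializes.
@dg.asset(deps=[dg.AssetKey("customers")])
def customer_count(context: dg.AssetExecutionContext) -> None:
    conn = duckdb.connect("dbt/tutorial.duckdb")  # assumed file location
    (count,) = conn.execute("select count(*) from customers").fetchone()
    conn.close()
    context.log.info(f"customers row count: {count}")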
4. Run your dbt models
To execute your dbt models, you can use the dg launch command to kick off a run through the CLI:
dg launch --assets '*'
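To materialize a subset instead, you can pass an asset selection; for example, dg launch --assets customers should select just the customers model, assuming the default dbt asset keys.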
5. Handling incremental models
If you have incremental models in your dbt project, you can model these as partitioned assets and update the command used to run the dbt models to pass in --vars based on the range of partitions being processed.
To enable partitioning for incremental dbt models in Dagster, we start by creating a new @dbt_assets definition. First, we add a selector (INCREMENTAL_SELECTOR) to identify incremental models and configure them with a daily partition (daily_partition). This allows Dagster to determine which partition is running and pass the correct time window (min_date, max_date) into dbt using the vars argument. With this setup, dbt builds only the records within the active partition; this assumes your incremental models filter on these variables in their SQL, for example with where order_date >= '{{ var("min_date") }}'.
Next, we separate our dbt executions: one @dbt_assets function (dbt_analytics) excludes incremental models, while another (incremental_dbt_models) handles only the incremental models and applies the partitions. Inside incremental_dbt_models, we fetch the partition time window from the context, format it as variables, and pass them into dbt's CLI command. This ensures dbt materializes only the partition-specific slice of data while keeping dependencies clear and maintainable.
import json

from dagster_dbt import DbtCliResource, dbt_assets
import dagster as dg

from .resources import dbt_project

# dbt selector matching every model materialized as incremental
INCREMENTAL_SELECTOR = "config.materialized:incremental"

# A start date is required; adjust it to match your data
daily_partition = dg.DailyPartitionsDefinition(start_date="2023-01-01")


@dbt_assets(
    manifest=dbt_project.manifest_path,
    exclude=INCREMENTAL_SELECTOR,
)
def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    # After the run completes, log the compiled SQL for each model result
    run_results_json = dbt_build_invocation.get_artifact("run_results.json")
    for result in run_results_json["results"]:
        context.log.debug(result["compiled_code"])
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Convert the partition's time window into dbt vars
    time_window = context.partition_time_window
    dbt_vars = {
        "min_date": time_window.start.strftime("%Y-%m-%d"),
        "max_date": time_window.end.strftime("%Y-%m-%d"),
    }
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
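To exercise a single partition locally, one option is Dagster's materialize helper in a test inside the project package; a minimal sketch, where the .assets import path is hypothetical and the partition key falls within the daily_partition range defined above:

import dagster as dg

from .assets import incremental_dbt_models
from .resources import dbt_resource

# Materialize one day's slice of the incremental models
result = dg.materialize(
    [incremental_dbt_models],
    partition_key="2023-01-01",
    resources={"dbt": dbt_resource},
)
assert result.success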