Dagster & dlt with components

The dagster-dlt library provides a DltLoadCollectionComponent, which represents a collection of dlt sources and pipelines as assets in Dagster.

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

uvx create-dagster project my-project && cd my-project/src

Activate the project virtual environment:

source ../.venv/bin/activate

Finally, add the dagster-dlt library to the project:

uv add dagster-dlt

2. Scaffold a dlt component

Now that you have a Dagster project, you can scaffold a dlt component. You may optionally provide the source and destination types, which will pull in the appropriate dlt source:

dg scaffold defs dagster_dlt.DltLoadCollectionComponent github_snowflake_ingest \
--source github --destination snowflake

The scaffold call generates a basic defs.yaml file and a loads.py file, along with a github package containing the dlt source pulled in by the --source option:

tree my_project/defs
my_project/defs
├── __init__.py
└── github_snowflake_ingest
    ├── defs.yaml
    ├── github
    │   ├── __init__.py
    │   ├── helpers.py
    │   ├── queries.py
    │   ├── README.md
    │   └── settings.py
    └── loads.py

3 directories, 8 files

The loads.py file contains a skeleton dlt source and pipeline which are referenced by Dagster, but can also be run directly using dlt:

my_project/defs/github_snowflake_ingest/loads.py
import dlt


@dlt.source
def my_source():
    @dlt.resource
    def hello_world():
        yield "hello, world!"

    return hello_world


my_load_source = my_source()
my_load_pipeline = dlt.pipeline(destination="snowflake")

The defs.yaml file references each source and pipeline by a fully scoped Python identifier and pairs them into a set of loads:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.my_load_source
      pipeline: .loads.my_load_pipeline
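
To make the identifier format concrete, here is a minimal sketch of how a reference such as .loads.my_load_source could be resolved relative to a package using only the standard library. It is not Dagster's actual resolution code; the function name resolve_reference and the package name demo_defs are hypothetical, and the stand-in module exists only so the sketch runs without a real project on disk.

```python
import importlib
import sys
import types


def resolve_reference(ref: str, package: str):
    """Resolve a reference such as ".loads.my_load_source" relative to `package`.

    The last dotted segment is treated as an attribute name and everything
    before it as a (relative) module path.
    """
    module_path, _, attr_name = ref.rpartition(".")
    module = importlib.import_module(module_path, package=package)
    return getattr(module, attr_name)


# Stand-in for the component package and its loads.py module.
# "demo_defs" is a hypothetical name used only for this sketch.
pkg = types.ModuleType("demo_defs")
pkg.__path__ = []  # marks the module as a package
loads = types.ModuleType("demo_defs.loads")
loads.my_load_source = "source object"
loads.my_load_pipeline = "pipeline object"
sys.modules["demo_defs"] = pkg
sys.modules["demo_defs.loads"] = loads

source = resolve_reference(".loads.my_load_source", package="demo_defs")
pipeline = resolve_reference(".loads.my_load_pipeline", package="demo_defs")
print(source, "|", pipeline)
```

The leading dot plays the same role as in a relative import: the reference is looked up next to the defs.yaml file rather than from the top of the project.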

3. Configure dlt loads

Next, you can fill in the template loads.py file with your own dlt sources and pipelines:

my_project/defs/github_snowflake_ingest/loads.py
import dlt
from .github import github_reactions, github_repo_events, github_stargazers

dlthub_dlt_stargazers_source = github_stargazers("dlt-hub", "dlt")
dlthub_dlt_stargazers_pipeline = dlt.pipeline(
    "github_stargazers", destination="snowflake", dataset_name="dlthub_stargazers"
)

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline

You can use dg list defs to list the assets produced by the load:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                         ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group   ┃ Deps                         ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ default │ github_stargazers_stargazers │ dlt       │             │ │
│         │ │                              │         │                              │ snowflake │             │ │
│         │ ├──────────────────────────────┼─────────┼──────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_stargazers_stargazers │ default │                              │           │             │ │
│         │ └──────────────────────────────┴─────────┴──────────────────────────────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────┘
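
The keys in the listing appear to follow a simple naming pattern: the loaded asset is keyed by the pipeline's dataset_name plus the resource name, and its upstream dependency by the source name plus the resource name. The sketch below reproduces that pattern with plain string formatting. It is inferred from the output above rather than taken from the dagster-dlt source; the function names are hypothetical, and the resource name stargazers and source name github_stargazers are assumptions read off the listing.

```python
def inferred_asset_key(dataset_name: str, resource_name: str) -> str:
    """Key of the loaded table asset, as it appears in the listing."""
    return f"{dataset_name}/{resource_name}"


def inferred_upstream_key(source_name: str, resource_name: str) -> str:
    """Key of the upstream asset that the loaded asset depends on."""
    return f"{source_name}_{resource_name}"


# dataset_name comes from the dlt.pipeline(...) call in loads.py.
# The source and resource names are assumptions inferred from the listing.
asset_key = inferred_asset_key("dlthub_stargazers", "stargazers")
upstream_key = inferred_upstream_key("github_stargazers", "stargazers")

print(asset_key)
print(upstream_key)
```

If a different key layout is needed, the translation settings described in the next step are the place to change it.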

4. Customize Dagster assets

Properties of the assets emitted by each load can be customized in the defs.yaml file using the translation key:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        group_name: github_data
        description: "Loads all users who have starred the dlt-hub/dlt repo"

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group       ┃ Deps                  ┃ Kinds     ┃ Description           ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ github_data │ github_stargazers_st… │ dlt       │ Loads all users who   │ │
│         │ │                              │             │                       │ snowflake │ have starred the      │ │
│         │ │                              │             │                       │           │ dlt-hub/dlt repo      │ │
│         │ ├──────────────────────────────┼─────────────┼───────────────────────┼───────────┼───────────────────────┤ │
│         │ │ github_stargazers_stargazers │ default     │                       │           │                       │ │
│         │ └──────────────────────────────┴─────────────┴───────────────────────┴───────────┴───────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Both the DltResource and Pipeline objects are available in the template scope as resource and pipeline, so translation values can be computed dynamically:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        metadata:
          resource_name: "{{ resource.name }}"
          pipeline_name: "{{ pipeline.pipeline_name }}"
          is_transformer: "{{ resource.is_transformer }}"
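
To illustrate what these templates evaluate to, the following is a minimal sketch of attribute lookup inside {{ ... }} expressions. It is a toy renderer, not the templating engine Dagster uses; the stand-in classes, the render function, and the sample values stargazers and False are all assumptions made for illustration. The toy always produces strings, whereas the real engine may preserve native types.

```python
import re
from dataclasses import dataclass


@dataclass
class StandInResource:
    """Hypothetical stand-in exposing the two DltResource attributes used above."""

    name: str
    is_transformer: bool


@dataclass
class StandInPipeline:
    """Hypothetical stand-in exposing Pipeline.pipeline_name."""

    pipeline_name: str


# Matches "{{ dotted.path }}" and captures the dotted path.
_EXPR = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render(template: str, scope: dict) -> str:
    """Replace each {{ dotted.path }} with the value found by attribute lookup."""

    def lookup(match: re.Match) -> str:
        root, *attrs = match.group(1).split(".")
        value = scope[root]
        for attr in attrs:
            value = getattr(value, attr)
        return str(value)

    return _EXPR.sub(lookup, template)


metadata_templates = {
    "resource_name": "{{ resource.name }}",
    "pipeline_name": "{{ pipeline.pipeline_name }}",
    "is_transformer": "{{ resource.is_transformer }}",
}
scope = {
    # Assumed values; the pipeline name matches the dlt.pipeline(...) call in loads.py.
    "resource": StandInResource(name="stargazers", is_transformer=False),
    "pipeline": StandInPipeline(pipeline_name="github_stargazers"),
}
metadata = {key: render(value, scope) for key, value in metadata_templates.items()}
print(metadata)
```

Because the templates are evaluated once per resource, each asset produced by the load gets metadata derived from its own resource.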