
Extract data

In the first step of our pipeline, we will use software-defined assets to load files into DuckDB, an in-process analytical database. This data will serve as the foundation for the rest of the ETL tutorial.

1. Scaffold an asset

We will start by writing our ingestion assets. Assets are the building blocks of a Dagster project and represent the underlying entities in our pipelines (such as tables, datasets, or machine learning models).
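
Conceptually, an asset is just a Python function decorated with @dg.asset. As a minimal illustration (this snippet and its asset name are hypothetical, not part of the tutorial project), an asset definition can be as small as:

import dagster as dg


@dg.asset
def my_first_asset() -> None:
    # The function body is where the underlying entity (a table, a file,
    # a model, and so on) gets produced or updated.
    ...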

When building assets, the first step is to scaffold the assets file with the dg scaffold command:

dg scaffold defs dagster.asset assets.py

This adds a file named assets.py, which will contain our asset code, to the etl_tutorial module:

src
└── etl_tutorial
    └── defs
        └── assets.py
info

The dg CLI provides a number of commands to help structure and navigate Dagster projects. For more information, see the dg CLI documentation.
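
For example, recent versions of the dg CLI can list the definitions currently discoverable in the project (the exact output depends on your version):

dg list defs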

2. Write DuckDB helper functions

Since we will be working with DuckDB, we will need to add the DuckDB Python library to our Dagster project:

uv pip install duckdb

Next, we need to write a helper function for working with DuckDB. Since we'll be ingesting multiple files, it's important to ensure that each asset can acquire a lock on the DuckDB database file. The following function holds a lock on the file for the duration of the query's execution (it uses the filelock package, so install that as well if it is not already available in your environment):

src/etl_tutorial/defs/assets.py
import duckdb
import filelock


def serialize_duckdb_query(duckdb_path: str, sql: str):
    """Execute SQL statement with file lock to guarantee cross-process concurrency."""
    lock_path = f"{duckdb_path}.lock"
    with filelock.FileLock(lock_path):
        conn = duckdb.connect(duckdb_path)
        try:
            return conn.execute(sql)
        finally:
            conn.close()
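
As a quick sanity check (purely illustrative, not part of the tutorial files), you could call the helper directly with an ad-hoc statement; the .lock file is created alongside the database file, and the connection is opened and closed inside the helper:

# Hypothetical ad-hoc usage: run a single statement through the helper.
serialize_duckdb_query(
    "/tmp/jaffle_platform.duckdb",
    "create or replace table scratch as select 1 as id",
)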

Now we can think about how we want to ingest the data. For this tutorial, we will be working with three files from the jaffle-shop-classic example dataset:

  - raw_customers.csv
  - raw_orders.csv
  - raw_payments.csv

All of these files are located in cloud storage, and we would like to ingest each of them into a separate table in the DuckDB database. Since each file will follow a similar path into the database, we can write a function for this process:

src/etl_tutorial/defs/assets.py
...


def import_url_to_duckdb(url: str, duckdb_path: str, table_name: str):
    create_query = f"""
        create or replace table {table_name} as (
            select * from read_csv_auto('{url}')
        )
    """

    serialize_duckdb_query(duckdb_path, create_query)

This will create a table in DuckDB from a CSV file using the serialize_duckdb_query function we just defined.
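
If you want to confirm that an ingested table looks right, one option (purely illustrative, not part of the tutorial files) is to open a DuckDB connection yourself and count the rows after calling the function:

# Hypothetical verification snippet, assuming raw_customers has already been ingested.
import duckdb

conn = duckdb.connect("/tmp/jaffle_platform.duckdb")
try:
    count = conn.execute(
        "select count(*) from jaffle_platform.main.raw_customers"
    ).fetchone()[0]
    print(f"raw_customers rows: {count}")
finally:
    conn.close()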

3. Define assets

Now that we have written our DuckDB helper functions, we are ready to create our assets. We will define an asset for each file we want to ingest:

src/etl_tutorial/defs/assets.py
...


import dagster as dg


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_customers"])
def raw_customers() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_customers",
    )


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_orders"])
def raw_orders() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_orders",
    )


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_payments"])
def raw_payments() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_payments",
    )

To ensure the Definitions object loads successfully, we can use the dg check defs command:

dg check defs
All components validated successfully.
All definitions loaded successfully.
info

In Dagster, the Definitions object refers to the collection of all Dagster objects in a project. As we continue through this tutorial, we’ll add several more objects to the pipeline.

Most of these objects (such as assets) are loaded automatically via the definitions.py file, which was generated when we first created our Dagster project:

src/etl_tutorial/definitions.py
from pathlib import Path

from dagster import definitions, load_from_defs_folder


@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)

4. Materialize the assets

Now that our assets are configured, they can be viewed in the asset catalog within the Dagster UI. Navigate back to http://127.0.0.1:3000 (or restart dg dev if you have closed it) and reload the definitions:

  1. Navigate to Deployment.

  2. Click Reload definitions.


You should see three assets, one for each of the three raw files (customers, orders, payments) being loaded into DuckDB:

To materialize the assets:

  1. Click Assets, then click "View global asset lineage" to see all of your assets.

  2. Click Materialize all.

  3. Navigate to the Runs tab and select the most recent run. Here you can see the logs from the run.


tip

You can also materialize assets from the command line with dg launch. You will need to pass an asset selection -- in this case, * selects all assets:

dg launch --assets "*"

To launch specific assets, pass an asset selection that selects them:

dg launch --assets target/main/raw_customers,target/main/raw_orders,target/main/raw_payments

Summary

At this point, we have handled the ingestion layer of our ETL pipeline. The etl_tutorial module should look like this:

src
└── etl_tutorial
    ├── __init__.py
    ├── definitions.py
    └── defs
        ├── __init__.py
        └── assets.py

Next steps

In the next step, we will build downstream assets that transform the data we have loaded into DuckDB.