Extract data
In the first step in our pipeline, we will use software-defined assets to load files into DuckDB, an analytical database. This data will serve as the foundation for the rest of the ETL tutorial.
1. Scaffold an asset
We will start by writing our ingestion assets. Assets are the building blocks of our platform in Dagster and represent the underlying entities in our pipelines (such as tables, datasets, or machine learning models).
When building assets, the first step is to scaffold the assets file with the dg scaffold command:

dg scaffold defs dagster.asset assets.py
This adds a file called assets.py that will contain our asset code to the etl_tutorial module:
src
└── etl_tutorial
└── defs
└── assets.py
The dg CLI provides a number of commands to help structure and navigate Dagster projects. For more information, see the dg CLI documentation.
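As a preview, a few dg commands that appear later in this step (all run from the project root):

dg check defs            # validate that definitions load
dg dev                   # start the local Dagster UI
dg launch --assets "*"   # materialize assets from the command line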
2. Write DuckDB helper functions
Since we will be working with DuckDB, we will need to add the DuckDB Python library to our Dagster project, along with filelock (used by the helper function below), in case it is not already installed:

uv pip install duckdb filelock
Next, we need to write a helper function for working with DuckDB. Since we’ll be ingesting multiple files, it’s important to ensure that each asset can acquire a lock on the DuckDB database file. This function ensures that the query holds a lock on the file throughout its execution:
import duckdb
import filelock


def serialize_duckdb_query(duckdb_path: str, sql: str):
    """Execute SQL statement with file lock to guarantee cross-process concurrency."""
    lock_path = f"{duckdb_path}.lock"
    with filelock.FileLock(lock_path):
        conn = duckdb.connect(duckdb_path)
        try:
            return conn.execute(sql)
        finally:
            conn.close()
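For example (a hypothetical smoke test, assuming the same /tmp/jaffle_platform.duckdb path used later in this step), you can run a throwaway statement through the helper:

# Hypothetical smoke test: run a simple DDL statement through the helper.
# Note that the helper closes its connection before returning, so query
# results should be read back with a separate connection.
serialize_duckdb_query(
    "/tmp/jaffle_platform.duckdb",
    "create or replace table ping as (select 1 as ok)",
)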
Now, we can think about how we want to ingest the data. For this tutorial we will be working with three files from the dbt jaffle-shop-classic example data:

- raw_customers.csv
- raw_orders.csv
- raw_payments.csv
All of these files are located in cloud storage, and we would like to ingest each of them into a separate table in the DuckDB database. Since each file will follow a similar path into the database, we can write a function for this process:
...
def import_url_to_duckdb(url: str, duckdb_path: str, table_name: str):
    create_query = f"""
        create or replace table {table_name} as (
            select * from read_csv_auto('{url}')
        )
    """
    serialize_duckdb_query(duckdb_path, create_query)
This will create a table in DuckDB from a CSV file using the serialize_duckdb_query function we just defined.
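As a quick sanity check (hypothetical, not part of the tutorial's files), you could ingest one of the files listed above and read it back with a separate connection, since the helper closes its own connection:

import duckdb

# Hypothetical: ingest one seed file, then count the rows that landed.
import_url_to_duckdb(
    url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv",
    duckdb_path="/tmp/jaffle_platform.duckdb",
    table_name="jaffle_platform.main.raw_customers",
)

conn = duckdb.connect("/tmp/jaffle_platform.duckdb")
print(conn.execute("select count(*) from raw_customers").fetchone())
conn.close()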
3. Define assets
Now that we have written our DuckDB helper functions, we are ready to create our assets. We will define an asset for each file we want to ingest:
...
import dagster as dg


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_customers"])
def raw_customers() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_customers",
    )


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_orders"])
def raw_orders() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_orders",
    )


@dg.asset(kinds={"duckdb"}, key=["target", "main", "raw_payments"])
def raw_payments() -> None:
    import_url_to_duckdb(
        url="https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv",
        duckdb_path="/tmp/jaffle_platform.duckdb",
        table_name="jaffle_platform.main.raw_payments",
    )
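The three assets above differ only in their table name and source file. As an optional refactor (a sketch only; it is not part of the tutorial's canonical code, and the name build_raw_asset is hypothetical), the repetition could be factored into a small asset factory:

import dagster as dg


def build_raw_asset(name: str) -> dg.AssetsDefinition:
    # Generate an ingestion asset for one jaffle-shop seed file.
    @dg.asset(kinds={"duckdb"}, key=["target", "main", name])
    def _asset() -> None:
        import_url_to_duckdb(
            url=f"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/{name}.csv",
            duckdb_path="/tmp/jaffle_platform.duckdb",
            table_name=f"jaffle_platform.main.{name}",
        )

    return _asset


raw_customers = build_raw_asset("raw_customers")
raw_orders = build_raw_asset("raw_orders")
raw_payments = build_raw_asset("raw_payments")

Keeping the assets as three explicit functions, as the tutorial does, makes each one easier to find and customize later; the factory is just one way to cut duplication if the list of files grows.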
To ensure the Definitions object loads successfully, we can use the dg check defs command:
dg check defs
All components validated successfully.
All definitions loaded successfully.
In Dagster, the Definitions object refers to the collection of all Dagster objects in a project. As we continue through this tutorial, we’ll add several more objects to the pipeline.
Most of these objects (such as assets) are loaded automatically via the definitions.py file, which was generated when we first created our Dagster project:
from pathlib import Path

from dagster import definitions, load_from_defs_folder


@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)
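If you want to inspect this object directly (a hypothetical check, assuming the etl_tutorial package is importable in your environment and that the decorated defs function can be called directly), a short script might look like:

# Hypothetical: build the Definitions object and inspect its assets.
from etl_tutorial.definitions import defs

definitions = defs()
print(definitions.assets)  # should include the three raw_* ingestion assets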
4. Materialize the assets
Now that our assets are configured, they can be viewed in the asset catalog within the Dagster UI. Navigate back to http://127.0.0.1:3000 (or restart dg dev if you have closed it) and reload the definitions:
- Navigate to Deployment.
- Click Reload definitions.
You should see three assets, one for each of the three raw files (customers, orders, payments) being loaded into DuckDB.
To materialize the assets:
- Click Assets, then click "View global asset lineage" to see all of your assets.
- Click Materialize all.
- Navigate to the Runs tab and select the most recent run. Here you can see the logs from the run.
You can also materialize assets from the command line with dg launch. You will need to pass an asset selection -- in this case, * selects all assets:
dg launch --assets "*"
To launch specific assets, pass an asset selection that selects them:

dg launch --assets target/main/raw_customers,target/main/raw_orders,target/main/raw_payments
Summary
At this point, we have handled the ingestion layer of our ETL pipeline. The etl_tutorial module should look like this:
src
└── etl_tutorial
├── __init__.py
├── definitions.py
└── defs
├── __init__.py
└── assets.py
Next steps
In the next step, we will build downstream assets that transform the data we have loaded into DuckDB.