Using Dagster and Airflow together

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-airlift library provides an AirflowInstanceComponent which can be used to represent Airflow DAGs in Dagster, allowing easy interoperability between Airflow and Dagster.

Setup and peering

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

uvx -U create-dagster project my-project && cd my-project

Activate the project virtual environment:

source .venv/bin/activate

Finally, add the dagster-airlift library to the project:

uv add 'dagster-airlift[core]'
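
If you want to confirm the library was installed into the project environment, you can inspect it with uv (exact version output will vary):

uv pip show dagster-airlift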

2. Scaffold an AirflowInstanceComponent

note

Currently dagster-airlift only supports basic authentication against an Airflow instance.

To scaffold a new component in your project, use the dg scaffold defs command:

dg scaffold defs dagster_airlift.core.components.AirflowInstanceComponent airflow --name my_airflow --auth-type basic_auth
Creating defs at /.../my-project/src/my_project/defs/airflow.

This will create a component definition file called defs.yaml in your project under the src/my_project/defs/airflow directory.

tree src/my_project/defs
src/my_project/defs
├── __init__.py
└── airflow
    └── defs.yaml

2 directories, 2 files
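
Before moving on, you can check that the scaffolded defs.yaml is valid against the component's schema:

dg check yaml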

3. Update defs.yaml with Airflow configuration

By default, the Airlift component reads its connection details from the environment variables AIRFLOW_WEBSERVER_URL, AIRFLOW_USERNAME, and AIRFLOW_PASSWORD. You should never include your password directly in this file; instead, reference all three values in defs.yaml with the env() template function:

cat src/my_project/defs/airflow/defs.yaml
type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
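
For local development, one convenient option is to put these values in a .env file at the root of your project, which dagster dev loads automatically. A minimal sketch with placeholder values (assuming a local Airflow webserver; never commit real credentials):

AIRFLOW_WEBSERVER_URL=http://localhost:8080
AIRFLOW_USERNAME=admin
AIRFLOW_PASSWORD=admin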

Once you have added these values, the following will happen:

  1. Dagster will create a sensor named my_airflow__airflow_monitoring_job_sensor (the name is derived from the component's name attribute) that is responsible for detecting runs in your Airflow instance and pulling them into Dagster.
  2. Your Airflow DAGs will be represented on the "Jobs" page in Dagster, and any jobs pulled from Airflow will be marked with an Airflow icon.
  3. Airflow datasets will be represented in Dagster as assets.
  4. When an Airflow DAG executes, that run will be represented in Dagster.
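
To see all of this in the UI, you can start a local Dagster instance (assuming the environment variables above are set):

dg dev

Then navigate to http://localhost:3000 in your browser.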

Mapping Dagster assets to Airflow tasks

Once you have represented your Airflow instance in Dagster using the Airflow instance component, you may also want to represent the graph of asset dependencies produced by its DAGs. You can define these mappings in defs.yaml.

DAG-level mapping

You can manually define which assets are produced by a given Airflow DAG by editing mappings in defs.yaml:

type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      assets:
        - spec:
            key: order_data
        - spec:
            key: activity_data
        - spec:
            key: aggregated_user_data
            deps: [order_data, activity_data]

Task-level mapping

If you have a more specific mapping from individual tasks within a DAG to sets of assets, you can set these mappings at the task level instead:

type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      task_mappings:
        - task_id: upload_orders
          assets:
            - spec:
                key: order_data
        - task_id: upload_activity
          assets:
            - spec:
                key: activity_data
        - task_id: aggregate_user_data
          assets:
            - spec:
                key: aggregated_user_data
                deps: [order_data, activity_data]
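
Each spec entry can carry additional Dagster asset metadata as well. Below is a sketch extending the upload_orders mapping above with a description and a group name; this assumes the spec block accepts standard Dagster asset spec fields, and the values shown are illustrative:

mappings:
  - dag_id: upload_source_data
    task_mappings:
      - task_id: upload_orders
        assets:
          - spec:
              key: order_data
              description: Raw order rows uploaded from the source system
              group_name: airflow_source_data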