Build pipelines with AWS EMR Serverless

This article covers how to use Dagster Pipes with AWS EMR Serverless.

The dagster-aws integration library provides the pipes.PipesEMRServerlessClient resource, which can be used to launch AWS EMR Serverless jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

Prerequisites

To run the examples, you'll need to:

  • Create a new Dagster project:
    uvx -U create-dagster project <project-name>
  • Install the necessary Python libraries:
    uv pip install dagster-aws
  • Configure AWS authentication credentials. If you don't have this set up already, refer to the boto3 quickstart. (A quick way to verify your credentials is sketched just after this list.)
  • In AWS, you'll need:
    • An existing AWS account
    • An AWS EMR Serverless job. AWS CloudWatch logging has to be enabled in order to receive logs from the job:
      {
        "monitoringConfiguration": {
          "cloudWatchLoggingConfiguration": { "enabled": true }
        }
      }
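
As a quick sanity check that your credentials resolve, you can ask STS which account they belong to. This snippet is purely illustrative and isn't part of the Dagster setup:

import boto3

# Resolves credentials via the default chain (environment variables,
# ~/.aws/credentials, instance profile, ...) and prints the account ID.
print(boto3.client("sts").get_caller_identity()["Account"])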

Step 1: Install the dagster-pipes module in your EMR Serverless environment

There are a few options for shipping Python packages to a PySpark job. One is to install the dagster-pipes module directly in the image used for your EMR Serverless job, for example with pip in your Dockerfile:

# start from EMR image
FROM public.ecr.aws/emr-serverless/spark/emr-7.2.0:latest

USER root

RUN python -m pip install dagster-pipes

# copy the job script
COPY . .

USER hadoop

Step 2: Add dagster-pipes to the EMR Serverless job script

Call open_dagster_pipes in the EMR Serverless script to create a context that can be used to send messages to Dagster:

from dagster_pipes import open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS EMR Serverless!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()


if __name__ == "__main__":
    main()

tip

The metadata format shown above ({"raw_value": value, "type": type}) is part of Dagster Pipes' special syntax for specifying rich Dagster metadata. For a complete reference of all supported metadata types and their formats, see the Dagster Pipes metadata reference.
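
For example, a single materialization can mix several typed values. Here's a brief, made-up illustration using the pipes context from the script above (the row count and S3 URL are placeholders):

pipes.report_asset_materialization(
    metadata={
        "row_count": {"raw_value": 3, "type": "int"},
        "source_file": {"raw_value": "s3://my-bucket/people.csv", "type": "url"},
        "summary": {"raw_value": "Processed **3** rows", "type": "md"},
    },
)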

Step 3: Create an asset using the PipesEMRServerlessClient to launch the job

tip

You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.

In the Dagster asset/op code, use the PipesEMRServerlessClient resource to launch the job:

src/<project_name>/defs/assets.py
from dagster_aws.pipes import PipesEMRServerlessClient

import dagster as dg


@dg.asset
def emr_serverless_asset(
    context: dg.AssetExecutionContext,
    pipes_emr_serverless_client: PipesEMRServerlessClient,
):
    return pipes_emr_serverless_client.run(
        context=context,
        start_job_run_params={
            "applicationId": "<app-id>",
            "executionRoleArn": "<emr-role>",
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "configurationOverrides": {
                "monitoringConfiguration": {
                    "cloudWatchLoggingConfiguration": {"enabled": True}
                }
            },
        },
    ).get_results()

This will launch the AWS EMR Serverless job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
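
The start_job_run_params dictionary is passed through to boto3's EMRServerless.Client.start_job_run, so it can also carry the job driver that points EMR Serverless at the script from Step 2. A sketch of the run call from the asset above with a job driver added (the S3 entry point is a placeholder for wherever you upload the script):

    return pipes_emr_serverless_client.run(
        context=context,
        start_job_run_params={
            "applicationId": "<app-id>",
            "executionRoleArn": "<emr-role>",
            "clientToken": context.run_id,
            "jobDriver": {
                "sparkSubmit": {
                    # placeholder: the Step 2 script uploaded to S3
                    "entryPoint": "s3://my-bucket/scripts/script.py",
                }
            },
            "configurationOverrides": {
                "monitoringConfiguration": {
                    "cloudWatchLoggingConfiguration": {"enabled": True}
                }
            },
        },
    ).get_results()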

Step 4: Create Dagster definitions

tip

You can scaffold resources from the command line by running dg scaffold defs dagster.resources <path/to/resources_file.py>. For more information, see the dg CLI docs.

Next, add the PipesEMRServerlessClient resource to your project's Definitions object:

src/<project_name>/defs/resources.py
from dagster_aws.pipes import PipesEMRServerlessClient

import dagster as dg


@dg.definitions
def resources():
    return dg.Definitions(
        resources={"pipes_emr_serverless_client": PipesEMRServerlessClient()}
    )

Dagster will now be able to launch the AWS EMR Serverless job from the emr_serverless_asset asset, and receive logs and events from it. If using the default message_reader, PipesCloudWatchMessageReader, driver logs will be forwarded to the Dagster process.
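
If you'd like to configure the message reader explicitly (or swap in a different one), it can be passed when constructing the client. A minimal sketch, assuming the default PipesCloudWatchMessageReader constructor suits your setup:

from dagster_aws.pipes import PipesCloudWatchMessageReader, PipesEMRServerlessClient

# Explicitly sets the CloudWatch-based message reader; this mirrors the default behavior.
pipes_emr_serverless_client = PipesEMRServerlessClient(
    message_reader=PipesCloudWatchMessageReader()
)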