
Define a Dagster asset that invokes a subprocess

note

This is part one of the Using Dagster Pipes tutorial. If you want to modify existing code that Dagster already orchestrates, you can jump to part two, Modify external code.

note

In this part of the tutorial, you'll create a Dagster asset whose execution function opens a Dagster Pipes session and invokes a subprocess that executes some external code.

Step 1: Create a Dagster project

First we will create a new Dagster project:

uvx -U create-dagster project external_pipeline

Step 2: Scaffold and define the asset

Next, we can scaffold the asset file:

dg scaffold defs dagster.assets dagster_code.py

Next, you’ll define the asset. Copy and paste the following into the file src/external_pipeline/defs/dagster_code.py:

src/external_pipeline/defs/dagster_code.py
import shutil

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()

Step 3: Add external code file

Before we run the asset, we will add a standalone Python script named external_code.py to the directory we just scaffolded (src/external_pipeline/defs/). Later, the asset will invoke a subprocess that executes this external code using the pipes_subprocess_client resource. The external code looks like the following:

src/external_pipeline/defs/external_code.py
import pandas as pd

def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    print(f"processing total {total_orders} orders")

if __name__ == "__main__":
    main()

Here’s what we did in the asset code in dagster_code.py:

  • Created an asset named subprocess_asset
  • Provided AssetExecutionContext as the context argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
  • Specified a resource for the asset to use, PipesSubprocessClient. We’ll also come back to this in a little bit.
  • Declared a command list cmd to run the external script. In the list:
    • First, found the path to the Python executable on the system using shutil.which("python").
    • Then, provided the file path to the file that we want to execute. In this case, it’s the external_code.py file that you created earlier.
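The command list can be sketched with the standard library alone. This is a minimal illustration, not the tutorial's code: build_command and the example path are hypothetical names, the fallback to sys.executable is an assumption added here because shutil.which("python") returns None when no python executable is on PATH, and the path join only approximates what dg.file_relative_path does.

```python
import os
import shutil
import sys


def build_command(script_name: str, base_file: str) -> list[str]:
    """Build the argv list for a script that sits next to base_file (hypothetical helper)."""
    # shutil.which returns None when "python" is not on PATH, so fall back to
    # the interpreter running this code (an assumption of this sketch).
    python_path = shutil.which("python") or sys.executable
    # Resolve the script relative to the directory that contains base_file.
    script_dir = os.path.dirname(os.path.abspath(base_file))
    return [python_path, os.path.join(script_dir, script_name)]


# Example path only; inside an asset module you would pass __file__ instead.
cmd = build_command("external_code.py", "/tmp/defs/dagster_code.py")
print(cmd)
```

Passing the command as a list rather than a single string avoids shell quoting issues when a path contains spaces.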

Step 4: Invoke the external code from the asset

The asset invokes a subprocess that executes the external code using the pipes_subprocess_client resource. The code is the same asset definition from Step 2:

src/external_pipeline/defs/dagster_code.py
import shutil

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()

Let’s take a look at what this code does:

  • The PipesSubprocessClient resource used by the asset exposes a run method.
  • When the asset is executed, this method synchronously executes the subprocess in a Pipes session and returns a PipesClientCompletedInvocation object.
  • This object contains a get_materialize_result method, which you can use to access the MaterializeResult event reported by the subprocess. We'll talk about how to report events from the subprocess in the next section.
  • Lastly, the asset returns this result.
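To see what "synchronously execute the subprocess" means in practice, here is a small standard-library sketch. It is not how PipesSubprocessClient is implemented and it opens no Pipes session; it only shows a parent process blocking until a child script exits and then reading what the child printed. The inline SCRIPT is a hypothetical stand-in for external_code.py that avoids the pandas dependency.

```python
import subprocess
import sys

# Hypothetical stand-in for external_code.py, written inline and without pandas.
SCRIPT = 'orders = [1, 2]; print(f"processing total {len(orders)} orders")'

# subprocess.run blocks until the child process has exited.
completed = subprocess.run(
    [sys.executable, "-c", SCRIPT],
    capture_output=True,  # collect stdout and stderr instead of inheriting them
    text=True,            # decode the captured bytes to str
    check=True,           # raise CalledProcessError on a non-zero exit code
)

stdout_text = completed.stdout.strip()
print(completed.returncode, stdout_text)
```

The client's run method additionally passes context to the subprocess and collects the events it reports, which is why the asset calls get_materialize_result on the returned object instead of reading stdout directly.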

Step 5: Define a Definitions object

tip

You can scaffold resources from the command line by running dg scaffold defs dagster.resources <path/to/resources_file.py>. For more information, see the dg CLI docs.

To make the subprocess resource loadable and accessible to Dagster tools such as the CLI, UI, and Dagster+, you’ll create a function decorated with @dg.definitions that returns a Definitions object.

Copy and paste the following to the bottom of src/external_pipeline/defs/resources.py:

src/external_pipeline/defs/resources.py
import dagster as dg


@dg.definitions
def resources():
    return dg.Definitions(
        resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
    )

At this point, dagster_code.py should look like the following:

src/external_pipeline/defs/dagster_code.py
import shutil

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()

Step 6: Run the subprocess from the Dagster UI

In this step, you’ll execute the subprocess asset you created in earlier steps from the Dagster UI.

  1. In a new command line session, run the following to start the UI:

    dg dev

  2. Navigate to http://localhost:3000, where you should see the UI:

    Asset in the UI

  3. Click Materialize located in the top right to run your code:

    Materialize asset

  4. Navigate to the Run details page, where you should see the logs for the run:

    Logs in the run details page

  5. external_code.py contains a print statement that writes to stdout. Dagster displays this output in the UI's raw compute log view. To see it, toggle the log section to stdout:

    Raw compute logs in the run details page

What's next?

At this point, you've created a Dagster asset that invokes an external Python script, launched the code in a subprocess, and viewed the result in the Dagster UI. Next, you'll learn how to modify your external code to work with Dagster Pipes and send information back to Dagster.