Factory pipelines

Dagster encourages software engineering best practices, one of which is keeping code DRY ("don't repeat yourself"). We will apply our factory pipeline to multiple podcasts, creating a distinct asset lineage for each podcast, all within the same Dagster project.

If you look at the rss_pipeline_factory function, it returns a Definitions object containing the four assets, a job for those assets, and a sensor for the specific podcast feed. All of these pipelines use the same ConfigurableResource objects, which we can set at the project level:

src/project_dagster_modal_pipes/defs/resources.py
@dg.definitions
def resources() -> dg.Definitions:
    return dg.Definitions(
        resources={
            "s3": S3Resource(
                endpoint_url=dg.EnvVar("CLOUDFLARE_R2_API"),
                aws_access_key_id=dg.EnvVar("CLOUDFLARE_R2_ACCESS_KEY_ID"),
                aws_secret_access_key=dg.EnvVar("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
                region_name="auto",
            ),
            "modal": ModalClient(
                project_directory=Path(__file__).parent.parent.parent.parent
                / "src"
                / "modal_project"
            ),
            "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        }
    )

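To make the factory pattern itself concrete, here is a minimal, library-free sketch of the same idea: a small definition object, a factory that stamps out uniquely named definitions per feed, and a merge step that combines them. All names here (FeedDef, pipeline_factory, merge) are hypothetical stand-ins for RSSFeedDefinition, rss_pipeline_factory, and dg.Definitions.merge, not the project's actual code.

```python
from dataclasses import dataclass


@dataclass
class FeedDef:
    # Hypothetical stand-in for RSSFeedDefinition: just a name and a URL.
    name: str
    url: str


def pipeline_factory(feed: FeedDef) -> dict:
    # Each call produces a bundle of definitions whose names are prefixed
    # with the feed name, so lineages stay distinct per podcast.
    return {
        "assets": [f"{feed.name}_audio", f"{feed.name}_transcript"],
        "jobs": [f"{feed.name}_job"],
        "sensors": [f"{feed.name}_sensor"],
    }


def merge(*bundles: dict) -> dict:
    # Mimics merging: concatenate each category across all bundles.
    merged: dict = {"assets": [], "jobs": [], "sensors": []}
    for bundle in bundles:
        for key, values in bundle.items():
            merged[key].extend(values)
    return merged
```

Calling the factory once per feed and merging the results is exactly the shape of the feeds.py code below, just without the Dagster machinery.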
With the resources set, the last step is to initialize our three podcasts:

src/project_dagster_modal_pipes/defs/feeds.py
@dg.definitions
def defs() -> dg.Definitions:
    feeds = [
        RSSFeedDefinition(
            name="practical_ai",
            url="https://changelog.com/practicalai/feed",
        ),
        RSSFeedDefinition(
            name="comedy_bang_bang",
            url="https://feeds.simplecast.com/byb4nhvN",
        ),
        RSSFeedDefinition(
            name="talk_tuah",
            url="https://feeds.simplecast.com/lHFdU_33",
        ),
    ]
    return dg.Definitions.merge(*[rss_pipeline_factory(feed) for feed in feeds])
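Note that the feed names above are written in snake_case ("comedy_bang_bang" rather than "Comedy Bang Bang"): Dagster asset and job names are restricted to letters, digits, and underscores, and the feed name is used as a prefix for everything the factory creates. A hypothetical helper (not part of the project) for normalizing a human-readable podcast title might look like:

```python
import re


def to_feed_name(title: str) -> str:
    # Collapse any run of characters outside [a-z0-9] into a single
    # underscore, then trim leading/trailing underscores, so the result
    # is safe to use as a Dagster name prefix.
    return re.sub(r"[^a-z0-9]+", "_", title.lower()).strip("_")
```

For example, to_feed_name("Comedy Bang Bang") yields "comedy_bang_bang", matching the name used in the feed list above.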

We can now see all of the assets in Dagster and know that any new podcast episodes will be ingested automatically going forward.
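Because the resources read their credentials with dg.EnvVar, the values are resolved from the environment at runtime rather than hard-coded. A sample .env file for local development might look like the following (the variable names come from resources.py above; the values are placeholders):

```shell
# Cloudflare R2 credentials consumed by S3Resource (placeholder values)
CLOUDFLARE_R2_API=https://<account-id>.r2.cloudflarestorage.com
CLOUDFLARE_R2_ACCESS_KEY_ID=your-access-key-id
CLOUDFLARE_R2_SECRET_ACCESS_KEY=your-secret-access-key

# OpenAI key consumed by OpenAIResource
OPENAI_API_KEY=sk-...
```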
