Factory pipelines
Dagster encourages software engineering best practices, one of which is keeping code DRY. We will apply our pipeline factory to multiple podcasts, which creates a distinct asset lineage for each podcast, all within the same Dagster project.
If you look at the rss_pipeline_factory function, it returns a Definitions object containing the four assets, a job for those assets, and the sensor for the specific podcast feed. All of those pipelines use the same ConfigurableResource, which we can set at the project level:
src/project_dagster_modal_pipes/defs/resources.py
@dg.definitions
def resources() -> dg.Definitions:
    return dg.Definitions(
        resources={
            "s3": S3Resource(
                endpoint_url=dg.EnvVar("CLOUDFLARE_R2_API"),
                aws_access_key_id=dg.EnvVar("CLOUDFLARE_R2_ACCESS_KEY_ID"),
                aws_secret_access_key=dg.EnvVar("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
                region_name="auto",
            ),
            "modal": ModalClient(
                project_directory=Path(__file__).parent.parent.parent.parent
                / "src"
                / "modal_project"
            ),
            "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        }
    )
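The resources above read four environment variables through dg.EnvVar. Because those values are resolved lazily rather than when the definitions are loaded, a missing variable may only surface once a run starts. As a minimal sketch, a small pre-flight check using only the standard library could look like the following. The helper name missing_env_vars and the constant REQUIRED_ENV_VARS are hypothetical and not part of the project; only the variable names come from the resource definitions.

```python
import os
from collections.abc import Mapping

# Variable names copied from the resource definitions above.
REQUIRED_ENV_VARS = (
    "CLOUDFLARE_R2_API",
    "CLOUDFLARE_R2_ACCESS_KEY_ID",
    "CLOUDFLARE_R2_SECRET_ACCESS_KEY",
    "OPENAI_API_KEY",
)


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or empty in env.

    Hypothetical helper for illustration; it is not part of Dagster.
    """
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Calling missing_env_vars() before starting Dagster and stopping if the returned list is non-empty would be one way to fail early with a readable message.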
With the resources set, the last step will be to initialize our three podcasts:
src/project_dagster_modal_pipes/defs/feeds.py
@dg.definitions
def defs() -> dg.Definitions:
    feeds = [
        RSSFeedDefinition(
            name="practical_ai",
            url="https://changelog.com/practicalai/feed",
        ),
        RSSFeedDefinition(
            name="comedy_bang_bang",
            url="https://feeds.simplecast.com/byb4nhvN",
        ),
        RSSFeedDefinition(
            name="talk_tuah",
            url="https://feeds.simplecast.com/lHFdU_33",
        ),
    ]
    return dg.Definitions.merge(*[rss_pipeline_factory(feed) for feed in feeds])
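Merging the per-feed definitions only works if every factory call contributes uniquely named objects, since Dagster expects asset keys to be unique within a project. The dependency-free sketch below illustrates that idea without using Dagster itself. FeedSpec, STEPS, pipeline_names, and merge_names are hypothetical stand-ins, and the four step names are assumptions; the source only states that the factory produces four assets, a job, and a sensor per feed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeedSpec:
    """Hypothetical stand-in for RSSFeedDefinition: a feed name and its URL."""

    name: str
    url: str


# Assumed step names; the real factory's asset names may differ.
STEPS = ("audio", "transcription", "summary", "email")


def pipeline_names(feed: FeedSpec) -> dict[str, list[str]]:
    """Return the names one factory call would contribute for a feed."""
    return {
        "assets": [f"{feed.name}_{step}" for step in STEPS],
        "jobs": [f"{feed.name}_job"],
        "sensors": [f"{feed.name}_sensor"],
    }


def merge_names(*parts: dict[str, list[str]]) -> dict[str, list[str]]:
    """Combine per-feed names, raising ValueError on any duplicate."""
    merged: dict[str, list[str]] = {"assets": [], "jobs": [], "sensors": []}
    for part in parts:
        for kind, names in part.items():
            for name in names:
                if name in merged[kind]:
                    raise ValueError(f"duplicate {kind[:-1]} name: {name}")
                merged[kind].append(name)
    return merged
```

Prefixing every name with the feed name is what keeps the lineages distinct: two feeds with different names never collide, while registering the same feed twice is rejected.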
We can now see all the assets in Dagster, and the sensor for each feed will pick up any new episodes going forward.