# Snowflake patterns and best practices
This guide covers advanced patterns and best practices for integrating Snowflake with Dagster.
## EL from S3 to Snowflake with TemplatedSqlComponent
You can use TemplatedSqlComponent together with Snowflake's native COPY INTO command to load data from S3 into Snowflake without any additional dependencies like dlt or Sling. A shared SQL template handles the loading logic, and each table gets its own component definition.
### Prerequisites
Before configuring the component, you'll need:
- A Snowflake storage integration connecting Snowflake to your S3 bucket
- An external stage pointing at the S3 path you want to load from
The SQL template below includes the DDL to create both the stage and the target table.
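Once both exist, you can sanity-check them from a Snowflake worksheet before wiring up the component. The names below match the examples later in this guide; substitute your own:

```sql
-- Confirm the storage integration is configured and enabled
DESC STORAGE INTEGRATION s3_integration;

-- Confirm the stage resolves and Snowflake can list the files under it
LIST @s3_leads_stage;
```

If `LIST` returns no rows, check the stage URL and the IAM role's bucket permissions before debugging the component itself.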
### 1. Create the SQL template
Create a shared `ingest.sql` file. It uses template variables for the database, schema, table, and stage. The `COPY INTO` statement at the end is the core logic that runs each time the asset materializes:
```sql
-- 1. Create a storage integration in Snowflake:
-- CREATE STORAGE INTEGRATION s3_integration
--   TYPE = EXTERNAL_STAGE
--   STORAGE_PROVIDER = 'S3'
--   STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<role>'
--   ENABLED = TRUE
--   STORAGE_ALLOWED_LOCATIONS = ('s3://{{ s3_bucket }}/');
--
-- 2. Create an external stage:
-- CREATE STAGE {{ stage_name }}
--   STORAGE_INTEGRATION = s3_integration
--   URL = 's3://{{ s3_bucket }}/{{ s3_prefix }}'
--   FILE_FORMAT = (TYPE = '{{ file_format }}' SKIP_HEADER = 1);

-- Create the target table if it doesn't exist
CREATE TABLE IF NOT EXISTS {{ database }}.{{ schema }}.{{ table_name }} (
    id VARCHAR,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP_NTZ,
    _loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Truncate and reload (full refresh)
TRUNCATE TABLE {{ database }}.{{ schema }}.{{ table_name }};

-- Copy data from the S3 stage into Snowflake.
-- The stage URL already includes the S3 prefix, so it isn't repeated here.
COPY INTO {{ database }}.{{ schema }}.{{ table_name }} (id, name, email, created_at)
FROM @{{ stage_name }}
FILE_FORMAT = (TYPE = '{{ file_format }}' SKIP_HEADER = 1)
PATTERN = '{{ file_pattern }}'
ON_ERROR = 'CONTINUE';
```
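Since the placeholders are plain `{{ var }}` substitutions, you can preview a rendered statement locally before handing the file to the component. A minimal sketch using only the standard library (this toy regex renderer is an assumption for illustration, not how Dagster renders the template):

```python
import re


def render(template: str, variables: dict[str, str]) -> str:
    """Replace {{ name }} placeholders with values from `variables`."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: variables[m.group(1)],
        template,
    )


sql = "COPY INTO {{ database }}.{{ schema }}.{{ table_name }}"
print(render(sql, {"database": "ANALYTICS", "schema": "RAW", "table_name": "leads"}))
# COPY INTO ANALYTICS.RAW.leads
```

A missing variable raises a `KeyError` here, which is a cheap way to catch a typo in a placeholder name before the asset ever runs.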
### 2. Configure the component
Create a `defs.yaml` that wires up the Snowflake connection and supplies values for each template variable. Define one document per table, separated by `---`; only the table name, stage name, and S3 prefix need to change between them:
```yaml
type: dagster.TemplatedSqlComponent

attributes:
  connection: "{{ context.load_component('snowflake_connection') }}"
  sql_template:
    path: ingest.sql
  sql_template_vars:
    database: "{{ env.SNOWFLAKE_DATABASE }}"
    schema: "{{ env.SNOWFLAKE_SCHEMA }}"
    table_name: leads
    stage_name: s3_leads_stage
    s3_bucket: "{{ env('S3_BUCKET', 'your-bucket') }}"
    s3_prefix: leads/
    file_format: CSV
    file_pattern: ".*[.]csv"
  assets:
    - key: raw_data/leads
      group_name: ingestion
      description: >
        Leads data ingested from S3 into Snowflake via COPY INTO.
---
type: dagster.TemplatedSqlComponent

attributes:
  connection: "{{ context.load_component('snowflake_connection') }}"
  sql_template:
    path: ingest.sql
  sql_template_vars:
    database: "{{ env.SNOWFLAKE_DATABASE }}"
    schema: "{{ env.SNOWFLAKE_SCHEMA }}"
    table_name: accounts
    stage_name: s3_accounts_stage
    s3_bucket: "{{ env('S3_BUCKET', 'your-bucket') }}"
    s3_prefix: accounts/
    file_format: CSV
    file_pattern: ".*[.]csv"
  assets:
    - key: raw_data/accounts
      group_name: ingestion
      description: >
        Accounts data ingested from S3 into Snowflake via COPY INTO.
```

Note that `s3_bucket` holds the bare bucket name, not a URL; the SQL template prepends the `s3://` scheme itself.
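Because only the table name, stage name, and S3 prefix vary between documents, the `defs.yaml` contents can also be stamped out by a small script instead of maintained by hand. A hedged sketch using only the standard library (the table list and the abbreviated variable set are assumptions for illustration, not part of the component API):

```python
from string import Template

# Each table gets its own component document; only table_name,
# stage_name, and s3_prefix change between them.
DOC = Template("""\
type: dagster.TemplatedSqlComponent

attributes:
  connection: "{{ context.load_component('snowflake_connection') }}"
  sql_template:
    path: ingest.sql
  sql_template_vars:
    table_name: $table
    stage_name: s3_${table}_stage
    s3_prefix: $table/
    # database, schema, s3_bucket, file_format, and file_pattern are
    # identical across tables and omitted here for brevity
  assets:
    - key: raw_data/$table
      group_name: ingestion
""")

tables = ["leads", "accounts"]
defs_yaml = "---\n".join(DOC.substitute(table=t) for t in tables)
print(defs_yaml)
```

`string.Template` is used here instead of f-strings so the `{{ ... }}` Jinja placeholders pass through untouched while `$table` is substituted.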