
Snowflake patterns and best practices

This guide covers advanced patterns and best practices for integrating Snowflake with Dagster.

EL from S3 to Snowflake with TemplatedSqlComponent

You can use TemplatedSqlComponent together with Snowflake's native COPY INTO command to load data from S3 into Snowflake without any additional dependencies like dlt or Sling. A shared SQL template handles the loading logic, and each table gets its own component definition.

Prerequisites

Before configuring the component, you'll need:

- An S3 bucket containing the source files (CSV files in this example)
- A Snowflake storage integration and external stage with access to that bucket

The SQL template below includes the commented DDL for the storage integration and stage, plus the CREATE TABLE statement for the target table.

1. Create the SQL template

Create a shared ingest.sql file. It uses template variables for the database, schema, table, and stage. The COPY INTO statement at the end is the core logic that runs each time the asset materializes:

my_project/defs/s3_leads/ingest.sql
-- 1. Create a storage integration in Snowflake:
-- CREATE STORAGE INTEGRATION s3_integration
-- TYPE = EXTERNAL_STAGE
-- STORAGE_PROVIDER = 'S3'
-- STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account>:role/<role>'
-- ENABLED = TRUE
-- STORAGE_ALLOWED_LOCATIONS = ('s3://{{ s3_bucket }}/');
--
-- 2. Create an external stage:
-- CREATE STAGE {{ stage_name }}
-- STORAGE_INTEGRATION = s3_integration
-- URL = 's3://{{ s3_bucket }}/{{ s3_prefix }}'
-- FILE_FORMAT = (TYPE = '{{ file_format }}' SKIP_HEADER = 1);

-- Create target table if it doesn't exist
CREATE TABLE IF NOT EXISTS {{ database }}.{{ schema }}.{{ table_name }} (
    id VARCHAR,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP_NTZ,
    _loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Truncate and reload (full refresh)
TRUNCATE TABLE {{ database }}.{{ schema }}.{{ table_name }};

-- Copy data from S3 stage into Snowflake
COPY INTO {{ database }}.{{ schema }}.{{ table_name }} (id, name, email, created_at)
FROM @{{ stage_name }}/{{ s3_prefix }}
FILE_FORMAT = (TYPE = '{{ file_format }}' SKIP_HEADER = 1)
PATTERN = '{{ file_pattern }}'
ON_ERROR = 'CONTINUE';
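Because the template runs COPY INTO with ON_ERROR = 'CONTINUE', rows that fail to parse are skipped rather than failing the load. After a materialization, you can inspect the skipped rows with Snowflake's VALIDATE table function. A sketch for the leads table, where `my_db.my_schema` stands in for your actual database and schema, and JOB_ID => '_last' refers to the most recent COPY INTO executed in the current session:

```sql
-- List rows rejected by the most recent COPY INTO in this session
SELECT *
FROM TABLE(VALIDATE(my_db.my_schema.leads, JOB_ID => '_last'));
```

If this query returns rows, the source files contain records that COPY INTO could not parse with the configured file format.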

2. Configure the component

Create a defs.yaml that wires up the Snowflake connection and supplies values for each template variable. Define one document per table, separated by ---; only the table name, stage name, and S3 prefix need to change between them:

my_project/defs/s3_leads/defs.yaml
type: dagster.TemplatedSqlComponent

attributes:
  connection: "{{ context.load_component('snowflake_connection') }}"
  sql_template:
    path: ingest.sql
  sql_template_vars:
    database: "{{ env.SNOWFLAKE_DATABASE }}"
    schema: "{{ env.SNOWFLAKE_SCHEMA }}"
    table_name: leads
    stage_name: s3_leads_stage
    s3_bucket: "{{ env('S3_BUCKET', 'your-bucket') }}"
    s3_prefix: leads/
    file_format: CSV
    file_pattern: ".*[.]csv"
  assets:
    - key: raw_data/leads
      group_name: ingestion
      description: >
        Leads data ingested from S3 into Snowflake via COPY INTO.
---
type: dagster.TemplatedSqlComponent

attributes:
  connection: "{{ context.load_component('snowflake_connection') }}"
  sql_template:
    path: ingest.sql
  sql_template_vars:
    database: "{{ env.SNOWFLAKE_DATABASE }}"
    schema: "{{ env.SNOWFLAKE_SCHEMA }}"
    table_name: accounts
    stage_name: s3_accounts_stage
    s3_bucket: "{{ env('S3_BUCKET', 'your-bucket') }}"
    s3_prefix: accounts/
    file_format: CSV
    file_pattern: ".*[.]csv"
  assets:
    - key: raw_data/accounts
      group_name: ingestion
      description: >
        Accounts data ingested from S3 into Snowflake via COPY INTO.
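After the assets materialize, you can confirm what each COPY INTO run actually loaded with Snowflake's COPY_HISTORY table function. A verification query you run in Snowflake, not part of the component; the table name and time window below are placeholders to adjust per table:

```sql
-- Files loaded into the leads table over the past 24 hours
SELECT file_name, row_count, status, first_error_message
FROM TABLE(
  INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'LEADS',
    START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
  )
);
```

A status other than LOADED, or a non-null first_error_message, points at files that COPY INTO partially skipped under ON_ERROR = 'CONTINUE'.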