
Snowflake Polars (dagster-snowflake-polars)

This library provides an integration with Snowflake and Polars, allowing you to use Polars DataFrames with Snowflake storage.

I/O Manager

dagster_snowflake_polars.snowflake_polars_io_manager
beta

This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.

An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the snowflake_polars_io_manager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.

Returns: IOManagerDefinition

Examples:

from dagster_snowflake_polars import snowflake_polars_io_manager
from dagster import asset, Definitions
import polars as pl

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_polars_io_manager.configured({
            "database": "my_database",
            "account": {"env": "SNOWFLAKE_ACCOUNT"},
            ...
        })
    }
)
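The {"env": "SNOWFLAKE_ACCOUNT"} value above tells Dagster to read the account from an environment variable instead of hard-coding it. A minimal sketch of that substitution follows; resolve_config and resolve_config_value are hypothetical helpers that illustrate the convention, not Dagster's config system.

```python
import os
from typing import Any, Mapping


def resolve_config_value(value: Any, environ: Mapping[str, str] = os.environ) -> Any:
    """Replace an {"env": NAME} mapping with the value of environment variable NAME."""
    if isinstance(value, dict) and set(value) == {"env"}:
        name = value["env"]
        if name not in environ:
            raise KeyError(f"environment variable {name!r} is not set")
        return environ[name]
    return value  # plain values pass through unchanged


def resolve_config(config: Mapping[str, Any], environ: Mapping[str, str] = os.environ) -> dict:
    """Resolve each top-level value of a (hypothetical) I/O manager config dict."""
    return {key: resolve_config_value(val, environ) for key, val in config.items()}


# Usage with a fake environment instead of the real process environment.
fake_env = {"SNOWFLAKE_ACCOUNT": "my_account_id"}
config = {"database": "my_database", "account": {"env": "SNOWFLAKE_ACCOUNT"}}
print(resolve_config(config, fake_env))  # {'database': 'my_database', 'account': 'my_account_id'}
```

Keeping credentials out of the code this way lets the same definitions target a different account by changing only the environment.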

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": snowflake_polars_io_manager.configured(
        {"database": "my_database", "schema": "my_schema", ...}  # will be used as the schema
    )}
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pl.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.
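The asset rules above can be sketched as a small function that maps an asset key and its metadata to a (schema, table) pair. This is a hypothetical resolve_table helper, not the library's code. It assumes no schema is set in the I/O manager configuration, and it assumes that for a multi-part key prefix the component just before the asset name is used.

```python
from typing import Mapping, Optional, Sequence, Tuple


def resolve_table(
    asset_key_path: Sequence[str],
    metadata: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Return (schema, table) for an asset key, assuming no schema in the I/O manager config."""
    table = asset_key_path[-1]  # the last key component is the table name
    if metadata and metadata.get("schema"):
        return metadata["schema"], table  # explicit "schema" metadata takes precedence
    if len(asset_key_path) > 1:
        return asset_key_path[-2], table  # otherwise use the key_prefix
    return "public", table  # nothing specified: fall back to "public"


print(resolve_table(["my_schema", "my_table"]))  # ('my_schema', 'my_table')
print(resolve_table(["my_other_table"], {"schema": "my_schema"}))  # ('my_schema', 'my_other_table')
print(resolve_table(["my_table"]))  # ('public', 'my_table')
```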

To load only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...
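Conceptually, the “columns” metadata narrows the query used to load the table. The hypothetical build_select helper below sketches that idea; it assumes unquoted identifiers and ignores partitioning, so the exact SQL the I/O manager issues may differ.

```python
from typing import Optional, Sequence


def build_select(
    database: str,
    schema: str,
    table: str,
    columns: Optional[Sequence[str]] = None,
) -> str:
    """Build a SELECT for a table, restricted to `columns` when they are given."""
    col_str = ", ".join(columns) if columns else "*"  # no "columns" metadata: read every column
    return f"SELECT {col_str} FROM {database}.{schema}.{table}"


print(build_select("my_database", "my_schema", "my_table", ["a"]))
# SELECT a FROM my_database.my_schema.my_table
```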
class dagster_snowflake_polars.SnowflakePolarsIOManager
beta

This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.

An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the SnowflakePolarsIOManager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.

Returns: IOManagerDefinition

Examples:

from dagster_snowflake_polars import SnowflakePolarsIOManager
from dagster import asset, Definitions, EnvVar
import polars as pl

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePolarsIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePolarsIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pl.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To load only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

from dagster import AssetIn

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...

Type Handler

class dagster_snowflake_polars.SnowflakePolarsTypeHandler
beta

This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.

Plugin for the Snowflake I/O Manager that can store and load Polars DataFrames as Snowflake tables.

This handler uses Polars’ native write_database method with ADBC (Arrow Database Connectivity) for efficient data transfer without converting to pandas.

Examples:

from typing import Sequence

import polars as pl
from dagster import Definitions, EnvVar, asset
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_polars import SnowflakePolarsTypeHandler

class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # register the Polars handler so pl.DataFrame values can be stored and loaded
        return [SnowflakePolarsTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)
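The type_handlers() override above is how the I/O manager learns which Python types it can store. The toy dispatcher below sketches the general idea of choosing a handler by type annotation, with unannotated values falling back to a default load type (standing in for pl.DataFrame). ToyTypeHandler, pick_handler, and the placeholder frame classes are hypothetical and are not Dagster APIs; the real selection logic may differ.

```python
from typing import Any, Dict, Optional, Sequence


class ToyTypeHandler:
    """Hypothetical stand-in for a type handler: it only records its types."""

    def __init__(self, name: str, supported_types: Sequence[type]) -> None:
        self.name = name
        self.supported_types = tuple(supported_types)


def pick_handler(
    handlers: Sequence[ToyTypeHandler],
    annotation: Any,
    default_load_type: Optional[type],
) -> ToyTypeHandler:
    """Return the handler registered for `annotation`.

    A missing annotation (None or typing.Any) falls back to default_load_type.
    """
    by_type: Dict[Any, ToyTypeHandler] = {}
    for handler in handlers:
        for supported in handler.supported_types:
            by_type[supported] = handler
    target = default_load_type if annotation is None or annotation is Any else annotation
    if target not in by_type:
        raise TypeError(f"no type handler registered for {target!r}")
    return by_type[target]


class PolarsFrame:  # placeholder for pl.DataFrame
    pass


class PandasFrame:  # placeholder for pd.DataFrame
    pass


handlers = [ToyTypeHandler("polars", [PolarsFrame]), ToyTypeHandler("pandas", [PandasFrame])]
print(pick_handler(handlers, None, PolarsFrame).name)  # polars
print(pick_handler(handlers, PandasFrame, PolarsFrame).name)  # pandas
```

In this sketch a missing annotation resolves to the default type, which mirrors the documented behavior that unannotated inputs and outputs are loaded as Polars DataFrames.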