Snowflake Polars (dagster-snowflake-polars)
This library provides an integration with Snowflake and Polars, allowing you to use Polars DataFrames with Snowflake storage.
I/O Manager
dagster_snowflake_polars.snowflake_polars_io_manager (beta)
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the snowflake_polars_io_manager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
Returns: IOManagerDefinition

Examples:
from dagster_snowflake_polars import snowflake_polars_io_manager
from dagster import asset, Definitions
import polars as pl
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_polars_io_manager.configured({
            "database": "my_database",
            "account": {"env": "SNOWFLAKE_ACCOUNT"},
            ...
        })
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": snowflake_polars_io_manager.configured(
        {"database": "my_database", "schema": "my_schema", ...}  # will be used as the schema
    )}
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
key_prefix=["my_schema"] # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:
...
@asset(
metadata={"schema": "my_schema"} # will be used as the schema in snowflake
)
def my_other_table() -> pl.DataFrame:
...For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
...If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...
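The same “columns” metadata can be attached to an op’s In as well. A minimal sketch of that variant (the op name process_table_a is illustrative; in a job it would receive its input from an upstream op):

from dagster import In, op
import polars as pl


@op(ins={"my_table": In(metadata={"columns": ["a"]})})
def process_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # only column "a" of the upstream table is loaded into my_table
    ...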
class dagster_snowflake_polars.SnowflakePolarsIOManager (beta)
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
An I/O manager definition that reads inputs from and writes Polars DataFrames to Snowflake. When using the SnowflakePolarsIOManager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
Returns: IOManagerDefinition

Examples:
from dagster_snowflake_polars import SnowflakePolarsIOManager
from dagster import asset, Definitions, EnvVar
import polars as pl
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePolarsIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePolarsIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
key_prefix=["my_schema"] # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:
...
@asset(
metadata={"schema": "my_schema"} # will be used as the schema in snowflake
)
def my_other_table() -> pl.DataFrame:
...For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
...If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...
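Both I/O manager APIs follow the typing behavior described above: inputs and outputs with no type annotation are treated as Polars DataFrames. A minimal sketch of a downstream asset, assuming the my_schema/my_table asset from the earlier examples (the asset name my_table_summary and the describe() transformation are illustrative):

from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn(key=["my_schema", "my_table"])})
def my_table_summary(my_table):
    # my_table has no type annotation, so the I/O manager loads it as a Polars
    # DataFrame; the un-annotated return value is likewise treated as a Polars
    # DataFrame and written back to Snowflake as the my_table_summary table
    return my_table.describe()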
Type Handler
class dagster_snowflake_polars.SnowflakePolarsTypeHandler (beta)
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Plugin for the Snowflake I/O Manager that can store and load Polars DataFrames as Snowflake tables.
This handler uses Polars’ native write_database method with ADBC (Arrow Database Connectivity) for efficient data transfer without converting to pandas.
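For reference, the underlying Polars calls look roughly like the following when used standalone. This is only an illustration of the write_database/ADBC mechanism, not the handler’s internal code, and the connection URI shown is a placeholder whose exact format is defined by the ADBC Snowflake driver (adbc-driver-snowflake), not by this library:

import polars as pl

# Placeholder Snowflake ADBC connection URI; consult the adbc-driver-snowflake
# documentation for the exact format your account requires
uri = "snowflake://my_user:my_password@my_account/my_database/my_schema"

df = pl.DataFrame({"a": [1, 2, 3]})

# Write the DataFrame to a Snowflake table over ADBC, with no pandas round trip
df.write_database("my_table", uri, engine="adbc", if_table_exists="append")

# Read it back with Polars, also over ADBC
loaded = pl.read_database_uri("SELECT a FROM my_table", uri, engine="adbc")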
Examples:
from collections.abc import Sequence

import polars as pl

from dagster import Definitions, EnvVar, asset
from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_polars import SnowflakePolarsTypeHandler
from dagster._core.storage.db_io_manager import DbTypeHandler  # note: private dagster module path


class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [SnowflakePolarsTypeHandler()]


@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)
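A single I/O manager can also register more than one type handler, so assets that return different DataFrame types can share one resource. A sketch of that pattern, assuming dagster-snowflake-pandas is also installed (the class name MyMixedSnowflakeIOManager is illustrative):

from collections.abc import Sequence

from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_polars import SnowflakePolarsTypeHandler
from dagster._core.storage.db_io_manager import DbTypeHandler  # note: private dagster module path


class MyMixedSnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # the I/O manager picks a handler per input/output based on its type annotation
        return [SnowflakePolarsTypeHandler(), SnowflakePandasTypeHandler()]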