Sometimes you may not want to fetch an entire table as the input to a downstream asset. With the Snowflake I/O manager, you can select specific columns to load by supplying metadata on the downstream asset.
import pandas as pd
from dagster import AssetIn, asset
# this example uses the iris_dataset asset from Step 2 of the Using Dagster with Snowflake tutorial


@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_dataset",
            metadata={"columns": ["Sepal length (cm)", "Sepal width (cm)"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    iris_sepal["Sepal area (cm2)"] = (
        iris_sepal["Sepal length (cm)"] * iris_sepal["Sepal width (cm)"]
    )
    return iris_sepal
In this example, we only use the columns containing sepal data from the IRIS_DATASET table created in Step 2: Create tables in Snowflake of the Using Dagster with Snowflake tutorial. Fetching the entire table would be unnecessarily costly, so we add metadata to the input asset to select only the columns we need. We do this in the metadata parameter of the AssetIn that loads the iris_dataset asset in the ins parameter, supplying the columns key with a list of the column names we want to fetch.
When Dagster materializes sepal_data and loads the iris_dataset asset using the Snowflake I/O manager, it will only fetch the Sepal length (cm) and Sepal width (cm) columns of the FLOWERS.IRIS.IRIS_DATASET table and pass them to sepal_data as a Pandas DataFrame.
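To run this example end to end, both assets need a Snowflake I/O manager bound to the default io_manager resource key, as set up in Step 1 of the tutorial. A minimal sketch of that wiring, assuming the same configuration values used later in this guide:
from dagster import Definitions
from dagster_snowflake_pandas import snowflake_pandas_io_manager

# a minimal sketch of the wiring; the account and credentials are placeholders
defs = Definitions(
    assets=[iris_dataset, sepal_data],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "database": "FLOWERS",
                "schema": "IRIS",
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
            }
        )
    },
)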
You may want to have different assets stored in different Snowflake schemas. The Snowflake I/O manager allows you to specify the schema in several ways.
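One way is to set the schema in the I/O manager configuration, as in Step 1 of the tutorial; another is to encode the schema in the asset key itself, for example with key_prefix. A minimal sketch of the asset-key approach (the daffodil_dataset column names and values are illustrative placeholders):
import pandas as pd

from dagster import asset

# a sketch of the asset-key approach: the key prefix supplies the schema


@asset(key_prefix=["iris"])  # stored as IRIS.IRIS_DATASET
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )


@asset(key_prefix=["daffodil"])  # stored as DAFFODIL.DAFFODIL_DATASET
def daffodil_dataset() -> pd.DataFrame:
    # illustrative placeholder data; any DataFrame-returning asset works the same way
    return pd.DataFrame({"Petal length (cm)": [9.3], "Species": ["daffodil"]})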
In this example, the iris_dataset asset will be stored in the IRIS schema, and the daffodil_dataset asset will be stored in the DAFFODIL schema.
The two options for specifying schema are mutually exclusive. If you provide schema configuration to the I/O manager, you cannot also provide it via the asset key and vice versa. If no schema is provided, either from configuration or asset keys, the default schema PUBLIC will be used.
Using the Snowflake I/O manager with other I/O managers
You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the io_manager_key parameter in the asset decorator:
import pandas as pd
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_snowflake_pandas import snowflake_pandas_io_manager
from dagster import Definitions, asset
@asset(io_manager_key="warehouse_io_manager")
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )


@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset):
    # plot_data is a function we've defined somewhere else
    # that plots the data in a DataFrame
    return plot_data(iris_dataset)
defs = Definitions(
    assets=[iris_dataset, iris_plots],
    resources={
        "warehouse_io_manager": snowflake_pandas_io_manager.configured(
            {
                "database": "FLOWERS",
                "schema": "IRIS",
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
            }
        ),
        "blob_io_manager": s3_pickle_io_manager,
    },
)
In this example, the iris_dataset asset uses the I/O manager bound to the key warehouse_io_manager and iris_plots uses the I/O manager bound to the key blob_io_manager. In the Definitions object, we supply the I/O managers for those keys. When we materialize these assets, iris_dataset will be stored in Snowflake and iris_plots will be saved in S3.
Executing custom SQL commands with the Snowflake resource
In addition to the Snowflake I/O manager, Dagster also provides a Snowflake resource for executing custom SQL queries.
from dagster_snowflake import snowflake_resource
from dagster import Definitions, asset
# this example executes a query against the IRIS_DATASET table created in Step 2 of the
# Using Dagster with Snowflake tutorial


@asset(required_resource_keys={"snowflake"})
def small_petals(context):
    return context.resources.snowflake.execute_query(
        (
            'SELECT * FROM IRIS_DATASET WHERE "Petal length (cm)" < 1 AND "Petal width'
            ' (cm)" < 1'
        ),
        fetch_results=True,
        use_pandas_result=True,
    )
defs = Definitions(
    assets=[small_petals],
    resources={
        "snowflake": snowflake_resource.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "schema": "IRIS",
            }
        )
    },
)
In this example, we attach the Snowflake resource to the small_petals asset. In the body of the asset function, we use the execute_query method of the resource to execute a custom SQL query against the IRIS_DATASET table created in Step 2: Create tables in Snowflake of the Using Dagster with Snowflake tutorial.
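If you want to run this asset from a script or test rather than through a code location, you can also use Dagster's materialize helper. A minimal sketch, reusing the same resource configuration as the Definitions above:
from dagster import materialize

# a minimal sketch for a script or test; the configuration mirrors the Definitions above
result = materialize(
    [small_petals],
    resources={
        "snowflake": snowflake_resource.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "schema": "IRIS",
            }
        )
    },
)
assert result.success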
For more information on the Snowflake resource, including additional configuration settings, see the Snowflake resource API docs.