This guide focuses on storing and loading Pandas DataFrames in BigQuery. Dagster also supports using PySpark DataFrames with BigQuery. The concepts from this guide apply to working with PySpark DataFrames, and you can learn more about setting up and using the BigQuery I/O manager with PySpark DataFrames in the reference guide.
To install the dagster-gcp and dagster-gcp-pandas libraries:
pip install dagster-gcp dagster-gcp-pandas
To gather the following information:
Google Cloud Project (GCP) project name: You can find this by logging into GCP and choosing one of the project names listed in the dropdown in the top left corner.
GCP credentials: You can authenticate with GCP two ways: by following GCP authentication instructions here, or by providing credentials directly to the BigQuery I/O manager.
In this guide, we assume that you have run one of the gcloud auth commands or have set GOOGLE_APPLICATION_CREDENTIALS as specified in the linked instructions. For more information on providing credentials directly to the BigQuery I/O manager, see Providing credentials as configuration in the BigQuery reference guide.
You can also specify a location where data should be stored and processed and dataset that should hold the created tables. You can also set a timeout and number of retries when working with Pandas DataFrames.
from dagster_gcp_pandas import bigquery_pandas_io_manager
from dagster import Definitions
defs = Definitions(
assets=[iris_data],
resources={"io_manager": bigquery_pandas_io_manager.configured({"project":"my-gcp-project",# required"location":"us-east5",# optional, defaults to the default location for the project - see https://cloud.google.com/bigquery/docs/locations for a list of locations"dataset":"IRIS",# optional, defaults to PUBLIC"timeout":15.0,# optional, defaults to None})},)
With this configuration, if you materialized an asset called iris_data, the BigQuery I/O manager would store the data in the IRIS.IRIS_DATA table in the my-gcp-project project. The BigQuery instance would be located in us-east5.
Finally, in the Definitions object, we assign the bigquery_pandas_io_manager to the io_manager key. io_manager is a reserved key to set the default I/O manager for your assets.
For more info about each of the configuration values, refer to the bigquery_pandas_io_manager API documentation.
The BigQuery I/O manager can create and update tables for your Dagster defined assets, but you can also make existing BigQuery tables available to Dagster.
To store data in BigQuery using the BigQuery I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the BigQuery I/O manager, like in Step 1: Configure the BigQuery I/O manager, and Dagster will handle storing and loading your assets in BigQuery.
import pandas as pd
from dagster import asset
@assetdefiris_data()-> pd.DataFrame:return pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
names=["Sepal length (cm)","Sepal width (cm)","Petal length (cm)","Petal width (cm)","Species",],)
In this example, we first define our asset. Here, we are fetching the Iris dataset as a Pandas DataFrame and renaming the columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.
When Dagster materializes the iris_data asset using the configuration from Step 1: Configure the BigQuery I/O manager, the BigQuery I/O manager will create the table IRIS.IRIS_DATA if it does not exist and replace the contents of the table with the value returned from the iris_data asset.
You may already have tables in BigQuery that you want to make available to other Dagster assets. You can create source assets for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets.
from dagster import SourceAsset
iris_harvest_data = SourceAsset(key="iris_harvest_data")
In this example, we create a SourceAsset for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, we need to tell the BigQuery I/O manager how to find the data.
Since we supply the project and dataset in the I/O manager configuration in Step 1: Configure the BigQuery I/O manager, we only need to provide the table name. We do this with the key parameter in SourceAsset. When the I/O manager needs to load the iris_harvest_data in a downstream asset, it will select the data in the IRIS.IRIS_HARVEST_DATA table as a Pandas DataFrame and provide it to the downstream asset.
Step 3: Load BigQuery tables in downstream assets#
Once you have created an asset or source asset that represents a table in BigQuery, you will likely want to create additional assets that work with the data. Dagster and the BigQuery I/O manager allow you to load the data stored in BigQuery tables into downstream assets.
import pandas as pd
from dagster import asset
# this example uses the iris_data asset from Step 2@assetdefiris_cleaned(iris_data: pd.DataFrame):return iris_data.dropna().drop_duplicates()
In iris_cleaned, the iris_data parameter tells Dagster that the value for the iris_data asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.
When materializing these assets, Dagster will use the bigquery_pandas_io_manager to fetch the IRIS.IRIS_DATA as a Pandas DataFrame and pass this DataFrame as the iris_data parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the bigquery_pandas_io_manager to store the DataFrame as the IRIS.IRIS_CLEANED table in BigQuery.