When working with software-defined assets, it's sometimes inconvenient or impossible to map each persisted asset to a unique op or graph. A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once.
Multi-assets may be useful in the following scenarios:
A single call to an API results in multiple tables being updated (e.g. Airbyte, Fivetran, dbt).
The same in-memory object is used to compute multiple assets.
The function responsible for computing the contents of any software-defined asset is an op. Multi-assets are responsible for updating multiple assets, so the underlying op will have multiple outputs -- one for each associated asset.
The easiest way to create a multi-asset is with the @multi_asset decorator. This decorator functions similarly to the @asset decorator, but requires an outs parameter specifying each output asset of the function.
from dagster import AssetOut, multi_asset
@multi_asset(
outs={"my_string_asset": AssetOut(),"my_int_asset": AssetOut(),})defmy_function():return"abc",123
By default, the names of the outputs will be used to form the asset keys of the multi-asset. The decorated function will be used to create the op for these assets and must emit an output for each of them. In this case, we can emit multiple outputs by returning a tuple of values, one for each asset.
In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the is_required parameter along with yield syntax to implement this behavior.
If the is_required parameter is set to False on an output, and your function does not yield an Output object for that output, then no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger.
import random
from dagster import AssetOut, Output, asset, multi_asset
@multi_asset(
outs={"asset1": AssetOut(is_required=False),"asset2": AssetOut(is_required=False)})defassets_1_and_2():if random.randint(1,10)<5:yield Output([1,2,3,4], output_name="asset1")if random.randint(1,10)<5:yield Output([6,7,8,9], output_name="asset2")@assetdefdownstream1(asset1):# will not run when assets_1_and_2 doesn't materialize the asset1return asset1 +[5]@assetdefdownstream2(asset2):# will not run when assets_1_and_2 doesn't materialize the asset2return asset2 +[10]
By default, it is assumed that the computation inside of a multi-asset will always produce the contents all of the associated assets. This means that attempting to execute a set of assets that produces some, but not all, of the assets defined by a given multi-asset will result in an error.
Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the is_required attribute of the outputs to False, and set the can_subset parameter of the decorator to True.
Inside the body of the function, we can use context.selected_output_names or context.selected_asset_keys to find out which computations should be run.
Because our outputs are now optional, we can no longer rely on returning a tuple of values, as we don't know in advance which values will be computed. Instead, we explicitly yield each output that we're expected to create.
When a multi-asset is created, it is assumed that each output asset depends on all of the input assets. This may not always be the case, however.
In these situations, you may optionally provide a mapping from each output asset to the set of AssetKeys that it depends on. This information is used to display lineage information in Dagit and for parsing selections over your asset graph.
from dagster import AssetKey, AssetOut, Output, multi_asset
@multi_asset(
outs={"c": AssetOut(),"d": AssetOut()},
internal_asset_deps={"c":{AssetKey("a")},"d":{AssetKey("b")},},)defmy_complex_assets(a, b):# c only depends on ayield Output(value=a +1, output_name="c")# d only depends on byield Output(value=b +1, output_name="d")