import warnings
from inspect import Parameter
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Mapping,
Optional,
Set,
Tuple,
Union,
cast,
overload,
)
import dagster._check as check
from dagster._builtins import Nothing
from dagster._config import UserConfigSchema
from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import MetadataUserInput
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.backcompat import (
ExperimentalWarning,
deprecation_warning,
experimental_arg_warning,
)
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..assets import AssetsDefinition
from ..decorators.graph_decorator import graph
from ..decorators.op_decorator import _Op
from ..events import AssetKey, CoercibleToAssetKeyPrefix
from ..input import In
from ..output import GraphOut, Out
from ..partition import PartitionsDefinition
from ..policy import RetryPolicy
from ..resource_definition import ResourceDefinition
from ..utils import DEFAULT_IO_MANAGER_KEY, NoValueSentinel
@overload
def asset(
compute_fn: Callable,
) -> AssetsDefinition:
...
@overload
def asset(
*,
name: Optional[str] = ...,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = ...,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
description: Optional[str] = ...,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = ...,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
io_manager_def: Optional[IOManagerDefinition] = ...,
io_manager_key: Optional[str] = ...,
compute_kind: Optional[str] = ...,
dagster_type: Optional[DagsterType] = ...,
partitions_def: Optional[PartitionsDefinition[Any]] = ...,
op_tags: Optional[Mapping[str, Any]] = ...,
group_name: Optional[str] = ...,
output_required: bool = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
retry_policy: Optional[RetryPolicy] = ...,
code_version: Optional[str] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...
[docs]def asset(
compute_fn: Optional[Callable] = None,
*,
name: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_def: Optional[IOManagerDefinition] = None,
io_manager_key: Optional[str] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
partitions_def: Optional[PartitionsDefinition[Any]] = None,
op_tags: Optional[Mapping[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
A software-defined asset is the combination of:
1. An asset key, e.g. the name of a table.
2. A function, which can be run to compute the contents of the asset.
3. A set of upstream assets that are provided as inputs to the function when computing the asset.
Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows
about the upstream assets it depends on. The upstream assets are inferred from the arguments
to the decorated function. The name of the argument designates the name of the upstream asset.
An asset has an op inside it to represent the function that computes it. The name of the op
will be the segments of the asset key, separated by double-underscores.
Args:
name (Optional[str]): The name of the asset. If not provided, defaults to the name of the
decorated function. The asset's name must be a valid name in dagster (ie only contains
letters, numbers, and _) and may not contain python reserved keywords.
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
concatenation of the key_prefix and the asset's name, which defaults to the name of
the decorated function. Each item in key_prefix must be a valid name in dagster (ie only
contains letters, numbers, and _) and may not contain python reserved keywords.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
upstream dependencies, but do not pass an input to the asset.
config_schema (Optional[ConfigSchema): The configuration schema for the asset's underlying
op. If set, Dagster will check that config provided for the op matches this schema and fail
if it does not. If not set, Dagster will accept any config provided for the op.
metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used
for storing the output of the op as an asset, and for loading it in downstream ops
(default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
io_manager_def (Optional[IOManagerDefinition]): (Experimental) The definition of the IOManager used for
storing the output of the op as an asset, and for loading it in
downstream ops. Only one of io_manager_def and io_manager_key can be provided.
compute_kind (Optional[str]): A string to represent the kind of computation that produces
the asset, e.g. "dbt" or "spark". It will be displayed in Dagit as a badge on the asset.
dagster_type (Optional[DagsterType]): Allows specifying type validation functions that
will be executed on the output of the decorated function after it runs.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.
Frameworks may expect and require certain metadata to be attached to a op. Values that
are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided,
the name "default" is used.
resource_defs (Optional[Mapping[str, ResourceDefinition]]):
(Experimental) A mapping of resource keys to resource definitions. These resources
will be initialized during execution, and can be accessed from the
context within the body of the function.
output_required (bool): Whether the decorated function will always materialize an asset.
Defaults to True. If False, the function can return None, which will not be materialized to
storage and will halt execution of downstream assets.
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated
with respect to its root data.
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In
general, versions should be set only for code that deterministically produces the same
output when given the same inputs.
Examples:
.. code-block:: python
@asset
def my_asset(my_upstream_asset: int) -> int:
return my_upstream_asset + 1
"""
def create_asset():
return _Asset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
key_prefix=key_prefix,
ins=ins,
non_argument_deps=_make_asset_keys(non_argument_deps),
metadata=metadata,
description=description,
config_schema=config_schema,
required_resource_keys=required_resource_keys,
resource_defs=resource_defs,
io_manager=io_manager_def or io_manager_key,
compute_kind=check.opt_str_param(compute_kind, "compute_kind"),
dagster_type=dagster_type,
partitions_def=partitions_def,
op_tags=op_tags,
group_name=group_name,
output_required=output_required,
freshness_policy=freshness_policy,
retry_policy=retry_policy,
code_version=code_version,
)
if compute_fn is not None:
return create_asset()(compute_fn)
def inner(fn: Callable[..., Any]) -> AssetsDefinition:
check.invariant(
not (io_manager_key and io_manager_def),
(
"Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please"
" provide one or the other. "
),
)
if resource_defs is not None:
experimental_arg_warning("resource_defs", "asset")
if io_manager_def is not None:
experimental_arg_warning("io_manager_def", "asset")
return create_asset()(fn)
return inner
class _Asset:
def __init__(
self,
name: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Set[AssetKey]] = None,
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager: Optional[Union[str, IOManagerDefinition]] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
partitions_def: Optional[PartitionsDefinition] = None,
op_tags: Optional[Mapping[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
):
self.name = name
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
self.ins = ins or {}
self.non_argument_deps = non_argument_deps
self.metadata = metadata
self.description = description
self.required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys"
)
self.io_manager = io_manager
self.config_schema = config_schema
self.compute_kind = compute_kind
self.dagster_type = dagster_type
self.partitions_def = partitions_def
self.op_tags = op_tags
self.resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
self.group_name = group_name
self.output_required = output_required
self.freshness_policy = freshness_policy
self.retry_policy = retry_policy
self.code_version = code_version
def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
asset_ins = build_asset_ins(fn, self.ins or {}, self.non_argument_deps)
out_asset_key = AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
bare_required_resource_keys = set(self.required_resource_keys)
resource_defs_keys = set(self.resource_defs.keys())
decorator_resource_keys = bare_required_resource_keys | resource_defs_keys
check.param_invariant(
len(bare_required_resource_keys) == 0 or len(arg_resource_keys) == 0,
(
"Cannot specify resource requirements in both @asset decorator and as arguments"
" to the decorated function"
),
)
if isinstance(self.io_manager, str):
io_manager_key = cast(str, self.io_manager)
elif self.io_manager is not None:
io_manager_def = check.inst_param(
self.io_manager, "io_manager", IOManagerDefinition
)
io_manager_key = out_asset_key.to_python_identifier("io_manager")
self.resource_defs[io_manager_key] = cast(ResourceDefinition, io_manager_def)
else:
io_manager_key = DEFAULT_IO_MANAGER_KEY
out = Out(
metadata=self.metadata or {},
io_manager_key=io_manager_key,
dagster_type=self.dagster_type if self.dagster_type else NoValueSentinel,
description=self.description,
is_required=self.output_required,
code_version=self.code_version,
)
op_required_resource_keys = decorator_resource_keys - arg_resource_keys
op = _Op(
name=out_asset_key.to_python_identifier(),
description=self.description,
ins=dict(asset_ins.values()),
out=out,
# Any resource requirements specified as arguments will be identified as
# part of the Op definition instantiation
required_resource_keys=op_required_resource_keys,
tags={
**({"kind": self.compute_kind} if self.compute_kind else {}),
**(self.op_tags or {}),
},
config_schema=self.config_schema,
retry_policy=self.retry_policy,
code_version=self.code_version,
)(fn)
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in self.ins.items()
if asset_in.partition_mapping is not None
}
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
node_def=op,
partitions_def=self.partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
resource_defs=self.resource_defs,
group_names_by_key={out_asset_key: self.group_name} if self.group_name else None,
freshness_policies_by_key={out_asset_key: self.freshness_policy}
if self.freshness_policy
else None,
)
[docs]def multi_asset(
*,
outs: Mapping[str, AssetOut],
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
description: Optional[str] = None,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = None,
compute_kind: Optional[str] = None,
internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None,
partitions_def: Optional[PartitionsDefinition[object]] = None,
op_tags: Optional[Mapping[str, Any]] = None,
can_subset: bool = False,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
group_name: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same op and same
upstream assets.
Each argument to the decorated function references an upstream asset that this asset depends on.
The name of the argument designates the name of the upstream asset.
Args:
name (Optional[str]): The name of the op.
outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are upstream
dependencies, but do not pass an input to the multi_asset.
config_schema (Optional[ConfigSchema): The configuration schema for the asset's underlying
op. If set, Dagster will check that config provided for the op matches this schema and fail
if it does not. If not set, Dagster will accept any config provided for the op.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the underlying op.
compute_kind (Optional[str]): A string to represent the kind of computation that produces
the asset, e.g. "dbt" or "spark". It will be displayed in Dagit as a badge on the asset.
internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]): By default, it is assumed
that all assets produced by a multi_asset depend on all assets that are consumed by that
multi asset. If this default is not correct, you pass in a map of output names to a
corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be either
used as input to the asset or produced within the op.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the assets.
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.
Frameworks may expect and require certain metadata to be attached to a op. Values that
are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`.
can_subset (bool): If this asset's computation can emit a subset of the asset
keys based on the context.selected_assets argument. Defaults to False.
resource_defs (Optional[Mapping[str, ResourceDefinition]]):
(Experimental) A mapping of resource keys to resource definitions. These resources
will be initialized during execution, and can be accessed from the
context within the body of the function.
group_name (Optional[str]): A string name used to organize multiple assets into groups. This
group name will be applied to all assets produced by this multi_asset.
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
code_version (Optional[str]): (Experimental) Version of the code encapsulated by the multi-asset. If set,
this is used as a default code version for all defined assets.
"""
if resource_defs is not None:
experimental_arg_warning("resource_defs", "multi_asset")
asset_deps = check.opt_mapping_param(
internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set
)
required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys", of_type=str
)
resource_defs = check.opt_mapping_param(
resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
)
_config_schema = check.opt_mapping_param(
config_schema,
"config_schema",
additional_message="Only dicts are supported for asset config_schema.",
)
bare_required_resource_keys = set(required_resource_keys)
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys
for out in outs.values():
if isinstance(out, Out) and not isinstance(out, AssetOut):
deprecation_warning(
"Passing Out objects as values for the out argument of @multi_asset",
"1.0.0",
additional_warn_txt="Use AssetOut instead.",
)
def inner(fn: Callable[..., Any]) -> AssetsDefinition:
op_name = name or fn.__name__
asset_ins = build_asset_ins(
fn, ins or {}, non_argument_deps=_make_asset_keys(non_argument_deps)
)
asset_outs = build_asset_outs(outs)
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
(
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function"
),
)
# validate that the asset_deps make sense
valid_asset_deps = set(asset_ins.keys()) | set(asset_outs.keys())
for out_name, asset_keys in asset_deps.items():
check.invariant(
out_name in outs,
(
f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument for"
f" multi-asset {op_name}. Must be one of the outs for this multi-asset"
f" {list(outs.keys())[:20]}."
),
)
invalid_asset_deps = asset_keys.difference(valid_asset_deps)
check.invariant(
not invalid_asset_deps,
(
f"Invalid asset dependencies: {invalid_asset_deps} specified in"
f" `internal_asset_deps` argument for multi-asset '{op_name}' on key"
f" '{out_name}'. Each specified asset key must be associated with an input to"
" the asset or produced by this asset. Valid keys:"
f" {list(valid_asset_deps)[:20]}"
),
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
op_required_resource_keys = required_resource_keys - arg_resource_keys
op = _Op(
name=op_name,
description=description,
ins=dict(asset_ins.values()),
out=dict(asset_outs.values()),
required_resource_keys=op_required_resource_keys,
tags={
**({"kind": compute_kind} if compute_kind else {}),
**(op_tags or {}),
},
config_schema=_config_schema,
retry_policy=retry_policy,
code_version=code_version,
)(fn)
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
keys_by_output_name = {
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
}
# source group names from the AssetOuts (if any)
group_names_by_key = {
keys_by_output_name[output_name]: out.group_name
for output_name, out in outs.items()
if isinstance(out, AssetOut) and out.group_name is not None
}
if group_name:
check.invariant(
not group_names_by_key,
(
"Cannot set group_name parameter on multi_asset if one or more of the AssetOuts"
" supplied to this multi_asset have a group_name defined."
),
)
group_names_by_key = {
asset_key: group_name for asset_key in keys_by_output_name.values()
}
# source freshness policies from the AssetOuts (if any)
freshness_policies_by_key = {
keys_by_output_name[output_name]: out.freshness_policy
for output_name, out in outs.items()
if isinstance(out, AssetOut) and out.freshness_policy is not None
}
partition_mappings = {
keys_by_input_name[input_name]: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping is not None
}
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=op,
asset_deps={keys_by_output_name[name]: asset_deps[name] for name in asset_deps},
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
can_subset=can_subset,
resource_defs=resource_defs,
group_names_by_key=group_names_by_key,
freshness_policies_by_key=freshness_policies_by_key,
)
return inner
def build_asset_ins(
fn: Callable,
asset_ins: Mapping[str, AssetIn],
non_argument_deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
"""Creates a mapping from AssetKey to (name of input, In object)."""
non_argument_deps = check.opt_set_param(non_argument_deps, "non_argument_deps", AssetKey)
params = get_function_params(fn)
is_context_provided = len(params) > 0 and params[0].name in get_valid_name_permutations(
"context"
)
input_params = params[1:] if is_context_provided else params
# Filter config, resource args
resource_arg_names = {arg.name for arg in get_resource_args(fn)}
new_input_args = []
for input_arg in input_params:
if input_arg.name != "config" and input_arg.name not in resource_arg_names:
new_input_args.append(input_arg)
input_params = new_input_args
non_var_input_param_names = [
param.name for param in new_input_args if param.kind == Parameter.POSITIONAL_OR_KEYWORD
]
has_kwargs = any(param.kind == Parameter.VAR_KEYWORD for param in new_input_args)
all_input_names = set(non_var_input_param_names) | asset_ins.keys()
if not has_kwargs:
for in_key, asset_in in asset_ins.items():
if in_key not in non_var_input_param_names and (
not isinstance(asset_in.dagster_type, DagsterType)
or not asset_in.dagster_type.is_nothing
):
raise DagsterInvalidDefinitionError(
f"Key '{in_key}' in provided ins dict does not correspond to any of the names "
"of the arguments to the decorated function"
)
ins_by_asset_key: Dict[AssetKey, Tuple[str, In]] = {}
for input_name in all_input_names:
asset_key = None
if input_name in asset_ins:
asset_key = asset_ins[input_name].key
metadata = asset_ins[input_name].metadata or {}
key_prefix = asset_ins[input_name].key_prefix
input_manager_key = asset_ins[input_name].input_manager_key
dagster_type = asset_ins[input_name].dagster_type
else:
metadata = {}
key_prefix = None
input_manager_key = None
dagster_type = NoValueSentinel
asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name])))
ins_by_asset_key[asset_key] = (
input_name.replace("-", "_"),
In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type),
)
for asset_key in non_argument_deps:
stringified_asset_key = "_".join(asset_key.path).replace("-", "_")
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = (stringified_asset_key, In(cast(type, Nothing)))
return ins_by_asset_key
@overload
def graph_asset(compose_fn: Callable) -> AssetsDefinition:
...
@overload
def graph_asset(
*,
name: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
group_name: Optional[str] = None,
metadata: Optional[MetadataUserInput] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...
def graph_asset(
compose_fn: Optional[Callable] = None,
*,
name: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
group_name: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Creates a software-defined asset that's computed using a graph of ops.
This decorator is meant to decorate a function that composes a set of ops or graphs to define
the dependencies between them.
Args:
name (Optional[str]): The name of the asset. If not provided, defaults to the name of the
decorated function. The asset's name must be a valid name in dagster (ie only contains
letters, numbers, and _) and may not contain python reserved keywords.
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
concatenation of the key_prefix and the asset's name, which defaults to the name of
the decorated function. Each item in key_prefix must be a valid name in dagster (ie only
contains letters, numbers, and _) and may not contain python reserved keywords.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If
not provided, the name "default" is used.
metadata (Optional[MetadataUserInput]): Dictionary of metadata to be associated with
the asset.
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is
intended to be updated with respect to its root data.
Examples:
.. code-block:: python
@op
def fetch_files_from_slack(context) -> pd.DataFrame:
...
@op
def store_files_in_table(files) -> None:
files.to_sql(name="slack_files", con=create_db_connection())
@graph_asset
def slack_files_table():
return store_files(fetch_files_from_slack())
"""
if compose_fn is not None:
return _GraphBackedAsset()(compose_fn)
def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return _GraphBackedAsset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
key_prefix=key_prefix,
ins=ins,
description=description,
partitions_def=partitions_def,
group_name=group_name,
metadata=metadata,
freshness_policy=freshness_policy,
)(fn)
return inner
class _GraphBackedAsset:
def __init__(
self,
name: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
group_name: Optional[str] = None,
metadata: Optional[MetadataUserInput] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
):
self.name = name
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
self.ins = ins or {}
self.description = description
self.partitions_def = partitions_def
self.group_name = group_name
self.metadata = metadata
self.freshness_policy = freshness_policy
def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
asset_ins = build_asset_ins(fn, self.ins or {}, set())
out_asset_key = AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
partition_mappings = {
input_name: asset_in.partition_mapping
for input_name, asset_in in self.ins.items()
if asset_in.partition_mapping
}
op_graph = graph(name=out_asset_key.to_python_identifier(), description=self.description)(
fn
)
return AssetsDefinition.from_graph(
op_graph,
keys_by_input_name=keys_by_input_name,
keys_by_output_name={"result": out_asset_key},
partitions_def=self.partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
group_name=self.group_name,
metadata_by_output_name={"result": self.metadata} if self.metadata else None,
freshness_policies_by_output_name={"result": self.freshness_policy}
if self.freshness_policy
else None,
descriptions_by_output_name={"result": self.description} if self.description else None,
)
def graph_multi_asset(
*,
outs: Mapping[str, AssetOut],
name: Optional[str] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
partitions_def: Optional[PartitionsDefinition[object]] = None,
group_name: Optional[str] = None,
can_subset: bool = False,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same graph of
ops, and the same upstream assets.
Each argument to the decorated function references an upstream asset that this asset depends on.
The name of the argument designates the name of the upstream asset.
Args:
name (Optional[str]): The name of the graph.
outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information
about the input.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the assets.
group_name (Optional[str]): A string name used to organize multiple assets into groups. This
group name will be applied to all assets produced by this multi_asset.
can_subset (bool): Whether this asset's computation can emit a subset of the asset
keys based on the context.selected_assets argument. Defaults to False.
"""
def inner(fn: Callable) -> AssetsDefinition:
partition_mappings = {
input_name: asset_in.partition_mapping
for input_name, asset_in in (ins or {}).items()
if asset_in.partition_mapping
}
asset_ins = build_asset_ins(fn, ins or {}, set())
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
asset_outs = build_asset_outs(outs)
op_graph = graph(
name=name or fn.__name__,
out={out_name: GraphOut() for out_name, _ in asset_outs.values()},
)(fn)
# source metadata from the AssetOuts (if any)
metadata_by_output_name = {
output_name: out.metadata
for output_name, out in outs.items()
if isinstance(out, AssetOut) and out.metadata is not None
}
# source freshness policies from the AssetOuts (if any)
freshness_policies_by_output_name = {
output_name: out.freshness_policy
for output_name, out in outs.items()
if isinstance(out, AssetOut) and out.freshness_policy is not None
}
# source descriptions from the AssetOuts (if any)
descriptions_by_output_name = {
output_name: out.description
for output_name, out in outs.items()
if isinstance(out, AssetOut) and out.description is not None
}
return AssetsDefinition.from_graph(
op_graph,
keys_by_input_name=keys_by_input_name,
keys_by_output_name={
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
},
partitions_def=partitions_def,
partition_mappings=partition_mappings if partition_mappings else None,
group_name=group_name,
can_subset=can_subset,
metadata_by_output_name=metadata_by_output_name,
freshness_policies_by_output_name=freshness_policies_by_output_name,
descriptions_by_output_name=descriptions_by_output_name,
)
return inner
def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]:
"""Creates a mapping from AssetKey to (name of output, Out object)."""
outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {}
for output_name, asset_out in asset_outs.items():
out = asset_out.to_out()
asset_key = asset_out.key or AssetKey(
list(filter(None, [*(asset_out.key_prefix or []), output_name]))
)
outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out)
return outs_by_asset_key
def _make_asset_keys(deps: Optional[Union[Set[AssetKey], Set[str]]]) -> Optional[Set[AssetKey]]:
"""Convert all str items to AssetKey in the set."""
if deps is None:
return deps
deps_asset_keys = {
AssetKey.from_user_string(dep) if isinstance(dep, str) else dep for dep in deps
}
return deps_asset_keys