Source code for dagster._core.definitions.decorators.sensor_decorator

import collections.abc
import inspect
from functools import update_wrapper
from typing import Callable, Optional, Sequence, Set, Union

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_selection import AssetSelection

from ...errors import DagsterInvariantViolationError
from ..asset_sensor_definition import AssetSensorDefinition
from ..events import AssetKey
from ..multi_asset_sensor_definition import (
    AssetMaterializationFunction,
    MultiAssetMaterializationFunction,
    MultiAssetSensorDefinition,
)
from ..sensor_definition import (
    DefaultSensorStatus,
    RawSensorEvaluationFunction,
    RunRequest,
    SensorDefinition,
    SkipReason,
)
from ..target import ExecutableDefinition


[docs]def sensor( job_name: Optional[str] = None, *, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, asset_selection: Optional[AssetSelection] = None, required_resource_keys: Optional[Set[str]] = None, ) -> Callable[[RawSensorEvaluationFunction], SensorDefinition]: """Creates a sensor where the decorated function is used as the sensor's evaluation function. The decorated function may: 1. Return a `RunRequest` object. 2. Return a list of `RunRequest` objects. 3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested. 4. Return nothing (skipping without providing a reason) 5. Yield a `SkipReason` or yield one or more `RunRequest` objects. Takes a :py:class:`~dagster.SensorEvaluationContext`. Args: name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job to be executed when the sensor fires. jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]): (experimental) A list of jobs to be executed when the sensor fires. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API. asset_selection (AssetSelection): (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job. """ check.opt_str_param(name, "name") def inner(fn: RawSensorEvaluationFunction) -> SensorDefinition: check.callable_param(fn, "fn") sensor_def = SensorDefinition( name=name, job_name=job_name, evaluation_fn=fn, minimum_interval_seconds=minimum_interval_seconds, description=description, job=job, jobs=jobs, default_status=default_status, asset_selection=asset_selection, required_resource_keys=required_resource_keys, ) update_wrapper(sensor_def, wrapped=fn) return sensor_def return inner
[docs]def asset_sensor( asset_key: AssetKey, *, job_name: Optional[str] = None, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, ) -> Callable[[AssetMaterializationFunction,], AssetSensorDefinition,]: """Creates an asset sensor where the decorated function is used as the asset sensor's evaluation function. The decorated function may: 1. Return a `RunRequest` object. 2. Return a list of `RunRequest` objects. 3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested. 4. Return nothing (skipping without providing a reason) 5. Yield a `SkipReason` or yield one or more `RunRequest` objects. Takes a :py:class:`~dagster.SensorEvaluationContext` and an EventLogEntry corresponding to an AssetMaterialization event. Args: asset_key (AssetKey): The asset_key this sensor monitors. name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job to be executed when the sensor fires. jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]): (experimental) A list of jobs to be executed when the sensor fires. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API. Example: .. code-block:: python from dagster import AssetKey, EventLogEntry, SensorEvaluationContext, asset_sensor @asset_sensor(asset_key=AssetKey("my_table"), job=my_job) def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): return RunRequest( run_key=context.cursor, run_config={ "ops": { "read_materialization": { "config": { "asset_key": asset_event.dagster_event.asset_key.path, } } } }, ) """ check.opt_str_param(name, "name") def inner(fn: AssetMaterializationFunction) -> AssetSensorDefinition: check.callable_param(fn, "fn") sensor_name = name or fn.__name__ def _wrapped_fn(context, event): result = fn(context, event) if inspect.isgenerator(result) or isinstance(result, list): for item in result: yield item elif isinstance(result, (RunRequest, SkipReason)): yield result elif result is not None: raise DagsterInvariantViolationError( ( "Error in sensor {sensor_name}: Sensor unexpectedly returned output " "{result} of type {type_}. Should only return SkipReason or " "RunRequest objects." ).format(sensor_name=sensor_name, result=result, type_=type(result)) ) return AssetSensorDefinition( name=sensor_name, asset_key=asset_key, job_name=job_name, asset_materialization_fn=_wrapped_fn, minimum_interval_seconds=minimum_interval_seconds, description=description, job=job, jobs=jobs, default_status=default_status, ) return inner
[docs]@experimental def multi_asset_sensor( monitored_assets: Union[Sequence[AssetKey], AssetSelection], *, job_name: Optional[str] = None, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, request_assets: Optional[AssetSelection] = None, ) -> Callable[[MultiAssetMaterializationFunction,], MultiAssetSensorDefinition,]: """Creates an asset sensor that can monitor multiple assets. The decorated function is used as the asset sensor's evaluation function. The decorated function may: 1. Return a `RunRequest` object. 2. Return a list of `RunRequest` objects. 3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested. 4. Return nothing (skipping without providing a reason) 5. Yield a `SkipReason` or yield one or more `RunRequest` objects. Takes a :py:class:`~dagster.MultiAssetSensorEvaluationContext`. Args: monitored_assets (Union[Sequence[AssetKey], AssetSelection]): The assets this sensor monitors. If an AssetSelection object is provided, it will only apply to assets within the Definitions that this sensor is part of. name (Optional[str]): The name of the sensor. Defaults to the name of the decorated function. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job to be executed when the sensor fires. jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]): (experimental) A list of jobs to be executed when the sensor fires. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API. request_assets (Optional[AssetSelection]): (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job. """ check.opt_str_param(name, "name") if not isinstance(monitored_assets, AssetSelection) and not ( isinstance(monitored_assets, collections.abc.Sequence) and all(isinstance(el, AssetKey) for el in monitored_assets) ): check.failed( "The value passed to monitored_assets param must be either an AssetSelection" f" or a Sequence of AssetKeys, but was a {type(monitored_assets)}" ) def inner(fn: MultiAssetMaterializationFunction) -> MultiAssetSensorDefinition: check.callable_param(fn, "fn") sensor_name = name or fn.__name__ sensor_def = MultiAssetSensorDefinition( name=sensor_name, monitored_assets=monitored_assets, job_name=job_name, asset_materialization_fn=fn, minimum_interval_seconds=minimum_interval_seconds, description=description, job=job, jobs=jobs, default_status=default_status, request_assets=request_assets, ) update_wrapper(sensor_def, wrapped=fn) return sensor_def return inner