This guide introduces Dagster resources using the new Pythonic resources API, which makes resources easier to define and use.
Resources are objects that are shared across the implementations of multiple software-defined assets and ops, and that can be plugged in after those assets and ops are defined.
Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.
So, why use resources?
Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production, but avoid using in testing, you can accomplish this by providing different resources in each environment. Check out Separating Business Logic from Environments for more info about this capability.
Surface configuration in the UI - Resources and their configuration are surfaced in the Dagster UI, making it easy to see where your resources are used and how they are configured.
Share configuration across multiple ops or assets - Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually.
Share implementations across multiple ops or assets - When multiple ops access the same external services, resources provide a standard way to structure your code to share the implementations.
Typically, resources are defined by subclassing ConfigurableResource. Attributes on the class define the resource's configuration schema. The configuration system has a few advantages over plain Python parameter passing: configured values are displayed in the Dagster UI and can be set dynamically using environment variables.
Assets and ops declare resource dependencies through parameters of the asset or op function that are type-annotated with the resource class.
To provide resource values to your assets and ops, pass them to the resources argument of your Definitions call. Dagster matches each resource key to the function parameter of the same name and passes the resource to the function at runtime.
Here, we define a subclass of ConfigurableResource representing a connection to an external service. We can configure the resource by constructing it in the Definitions call.
We can define methods on the resource class which depend on config values. These methods can be used by assets and ops.
Resources can be configured using environment variables, which is useful for secrets or other environment-specific configuration. If you're using Dagster Cloud, environment variables can be configured directly in the UI.
To use environment variables, pass an EnvVar when constructing your resource.
In some cases, you may want to specify configuration for a resource at runtime, in the launchpad or in a RunRequest for a schedule or sensor. For example, you may want a sensor-triggered run to specify a different target table in a database resource for each run.
You can use the configure_at_launch() method to defer the construction of a configurable resource until runtime.
In some situations, you may want to define a resource which depends on other resources. This is useful for sharing common configuration. For example, separate resources for a database and for a filestore may both depend on credentials for a particular cloud provider. Defining these credentials as a separate, nested resource allows you to specify configuration in a single place. It also makes it easier to test your resources, since you can mock the nested resource.
In this case, you can list that nested resource as an attribute of your resource class.
If we would instead like the configuration for our credentials to be provided at runtime, we can use the configure_at_launch() method to defer the construction of the CredentialsResource until runtime.
Because credentials requires runtime configuration through the launchpad, it must also be passed to the Definitions object, so that configuration can be provided at runtime. Nested resources only need to be passed to the Definitions object if they require runtime configuration.
Pythonic I/O managers are defined as subclasses of ConfigurableIOManager and, like Pythonic resources, specify any configuration fields as attributes. Each subclass must implement a handle_output and a load_input method, which are called by Dagster at runtime to handle the storing and loading of data.
In codebases that use function-style resources defined with the @resource decorator, you can use the ConfigurableLegacyResourceAdapter class to adapt them to the new Pythonic resource pattern. The adapter subclass should list all config fields as attributes and implement a wrapped_resource property which returns the underlying resource definition to adapt.
```python
from dagster import (
    ConfigurableLegacyResourceAdapter,
    Definitions,
    Resource,
    ResourceDefinition,
    asset,
    resource,
)


# Old code, interface cannot be changed for back-compat purposes
class Writer:
    def __init__(self, prefix: str):
        self._prefix = prefix

    def output(self, text: str) -> None:
        print(self._prefix + text)


@resource(config_schema={"prefix": str})
def writer_resource(context):
    prefix = context.resource_config["prefix"]
    return Writer(prefix)


# New adapter layer
class WriterResource(ConfigurableLegacyResourceAdapter):
    prefix: str

    @property
    def wrapped_resource(self) -> ResourceDefinition:
        return writer_resource


# Writer is a bare Python object, so the parameter is wrapped in Resource[...]
# to tell Dagster it is a resource rather than an upstream asset.
@asset
def my_asset(writer: Resource[Writer]):
    writer.output("hello, world!")


defs = Definitions(
    assets=[my_asset],
    resources={"writer": WriterResource(prefix="greeting: ")},
)
```
As with resources, in codebases that use function-style I/O managers defined with the @io_manager decorator, you can use the ConfigurableLegacyIOManagerAdapter class to adapt them to the new Pythonic I/O manager pattern.
The adapter subclass should list all config fields as attributes and implement a wrapped_io_manager property which returns the underlying I/O manager definition to adapt.
```python
import os

from dagster import (
    ConfigurableLegacyIOManagerAdapter,
    Definitions,
    InputContext,
    IOManager,
    IOManagerDefinition,
    OutputContext,
    io_manager,
)


# Old code, interface cannot be changed for back-compat purposes
class OldFileIOManager(IOManager):
    def __init__(self, base_path: str):
        self.base_path = base_path

    def handle_output(self, context: OutputContext, obj):
        path = os.path.join(self.base_path, context.step_key, context.name)
        # Create the per-step directory so open() does not fail on first write
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as fd:
            fd.write(obj)

    def load_input(self, context: InputContext):
        with open(
            os.path.join(
                self.base_path,
                context.upstream_output.step_key,
                context.upstream_output.name,
            ),
            "r",
        ) as fd:
            return fd.read()


@io_manager(config_schema={"base_path": str})
def old_file_io_manager(context):
    base_path = context.resource_config["base_path"]
    return OldFileIOManager(base_path)


# New adapter layer
class MyIOManager(ConfigurableLegacyIOManagerAdapter):
    base_path: str

    @property
    def wrapped_io_manager(self) -> IOManagerDefinition:
        return old_file_io_manager


defs = Definitions(
    assets=...,
    resources={
        "io_manager": MyIOManager(base_path="/tmp/"),
    },
)
```
In some cases, you may want to use a bare Python object as a resource which is not a subclass of ConfigurableResource.
For example, you may want to directly pass a third-party API client into your assets or ops. This follows a similar pattern to using a ConfigurableResource subclass; however, assets and ops which use these resources must annotate them with Resource.
```python
from dagster import Definitions, Resource, asset

# `GitHub` stands in for a third-party API client class imported from its own library.

# `Resource[GitHub]` is treated exactly like `GitHub` for type checking purposes,
# and the runtime type of the github parameter is `GitHub`. The purpose of the
# `Resource` wrapper is to let Dagster know that `github` is a resource and not an
# upstream asset.


@asset
def public_github_repos(github: Resource[GitHub]):
    return github.organization("dagster-io").repositories()


defs = Definitions(
    assets=[public_github_repos],
    resources={"github": GitHub(...)},
)
```
If you would like a Pythonic resource to depend on a bare Python object, you can use the ResourceDependency annotation to annotate the attribute in question. This indicates to Dagster that the attribute should be treated as a resource dependency.
Resources defined by subclassing ConfigurableResource can take various unresolved inputs, including EnvVars instead of string values or unconfigured resources returned by configure_at_launch() in place of configured resources.
Between construction time and runtime, the ConfigurableResource class resolves all of its attributes to their final values, meaning that when an attribute of a resource is accessed in the body of an asset or op, it will always be a fully resolved value.
In the following example, the FileStoreBucket constructor will accept the still-unconfigured unconfigured_credentials_resource, which will be resolved once the resource is configured at runtime.
```python
from dagster import ConfigurableResource


class CredentialsResource(ConfigurableResource):
    username: str
    password: str


class FileStoreBucket(ConfigurableResource):
    credentials: CredentialsResource
    region: str

    def write(self, data: str):
        # In this context, `self.credentials` is ensured to
        # be a CredentialsResource with valid values for
        # `username` and `password`.
        # `get_filestore_client` is a placeholder for your storage
        # client factory; it is not a Dagster API.
        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)


# unconfigured_credentials_resource is typed as PartialResource[CredentialsResource]
unconfigured_credentials_resource = CredentialsResource.configure_at_launch()

# FileStoreBucket constructor accepts either a CredentialsResource or a
# PartialResource[CredentialsResource] for the `credentials` argument
bucket = FileStoreBucket(
    credentials=unconfigured_credentials_resource,
    region="us-east-1",
)
```