Automating your data pipelines#

In this guide, we’ll walk you through how to automate your data pipelines, that is, run them without manual intervention, and identify the Dagster tools to make that happen.

This guide assumes you have some familiarity with several Dagster concepts, including software-defined assets and jobs.

Dagster offers a few different ways of automating data pipelines, and choosing the right tool depends on your specific needs. In this guide, we'll walk through a few common cases and which Dagster tool to use in each.

Things to think through#

When thinking about automating your data pipelines, it’s helpful to consider how and when your data needs to be refreshed. A few questions to ask yourself:

  • How often does my data need to be refreshed? Does all data need to be refreshed at the same frequency?
  • How is my data split up? Do older records need updating?
  • Am I waiting for some data that needs to trigger downstream updates?
  • Do I want a run to kick off every time a new file is added, or should data be updated in batches?

My data should be updated every morning#

You want some data to be updated every day by a certain time: to refresh a dashboard, send updated data to another system, or make sure your database has the most recent information for the team to work with. This is one of the most traditional cases when building a pipeline. Let's go into how to do this with Dagster!

I want a job to start at a specific time#

Dagster has basic scheduling capabilities, which means that you say how often and when you want a job to run, whether daily, weekly, or hourly, and it will run. Dagster also supports any cron schedule.

from dagster import AssetSelection, define_asset_job, ScheduleDefinition

# job that materializes every asset in the "some_asset_group" group
asset_job = define_asset_job("asset_job", AssetSelection.groups("some_asset_group"))

# run the job every day at 9:00 AM
basic_schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 9 * * *")

You can also use the @schedule decorator if you want to provide custom run config and tags. For example, if you want to pass a parameter to the job at runtime, such as activity="party" on the weekend and activity="grind" on weekdays, you can use the run config to pass the parameter in.

from dagster import RunRequest, ScheduleEvaluationContext, schedule


# configurable_job (defined elsewhere) contains configurable_op, which
# accepts an "activity" config value; the schedule runs every day at 9:00 AM
@schedule(job=configurable_job, cron_schedule="0 9 * * *")
def configurable_job_schedule(context: ScheduleEvaluationContext):
    if context.scheduled_execution_time.weekday() < 5:
        activity_selection = "grind"
    else:
        activity_selection = "party"
    return RunRequest(
        run_config={
            "ops": {"configurable_op": {"config": {"activity": activity_selection}}}
        }
    )

I want my data to be updated at different cadences#

Let’s say the CFO wants a report updated by 9:00 AM so they can drink their coffee and review the latest numbers. The data comes from a number of systems, all of which are part of your DAG.

  • Some of the data, such as last quarter’s earnings report to investors, only needs to be updated every three months.
  • Other data, such as the most recent revenue, needs to be updated every day, since the CFO wants to see how the company did yesterday.

Dagster’s declarative scheduling allows you to set a freshness policy on each asset. Choosing declarative scheduling and freshness policies is not a purely practical decision; it is also a mindset change compared to imperative workflow processes.

For example, with a 30-minute freshness policy, at any given time the asset should incorporate all data that arrived at least 30 minutes earlier.

This approach allows you to think about each asset’s freshness requirements individually. Then, Dagster figures out how to get the report updated by 9:00 AM every day for the CFO.

In the following example, the freshness policy instructs the sales asset to be no more than 60 minutes out of date by 9:00 AM each day. To stay within the freshness policy, the sales asset will prompt the upstream transactions asset to materialize.

We also have an expenses asset, which also depends on the transactions asset. Its freshness policy allows a maximum lag of 120 minutes, or two hours, at any time, since it does not specify a cron schedule. The expenses asset will likewise require the transactions data to be refreshed in order to comply with its freshness policy.

from dagster import (
    AssetSelection,
    Definitions,
    FreshnessPolicy,
    asset,
    build_asset_reconciliation_sensor,
)


# upstream asset containing the raw transaction data
@asset
def transactions():
    pass


@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60, cron_schedule="0 9 * * *")
)
def sales(transactions):
    pass


@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=120))
def expenses(transactions):
    pass


# automatically kicks off materializations to keep every asset within its freshness policy
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

defs = Definitions(assets=[transactions, sales, expenses], sensors=[update_sensor])

My data should be updated every time something happens#

I want my data to be updated when an event happens#

Let’s say you have sales pipeline data that you want updated every time a customer submits an RFP on your website. Dagster offers sensors that allow you to kick off jobs based on some external change, such as a new file arriving in an S3 bucket, another asset being updated, or a system going down.

from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


# checks the bucket for new files and kicks off a run of log_file_job
# (defined elsewhere) for each new key found
@sensor(job=log_file_job)
def my_s3_sensor(context):
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_rfp_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_rfp_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests

I want some assets to materialize every time other assets materialize#

If you want to execute a dbt asset every time you materialize your Airbyte assets, asset sensors can automate that process. Asset sensors also work across code locations, so if you are managing some Airbyte assets in one code location and your dbt assets in another, you can still use asset sensors to trigger jobs.
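
For example, a minimal asset sensor might look like the following sketch. The asset key "airbyte_asset1" and the "dbt_assets" group name are assumptions standing in for your own Airbyte asset and dbt assets.

from dagster import (
    AssetKey,
    AssetSelection,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
    define_asset_job,
)

# hypothetical job that materializes the dbt assets in the "dbt_assets" group
dbt_job = define_asset_job("dbt_job", AssetSelection.groups("dbt_assets"))


# every time the asset with key "airbyte_asset1" is materialized, kick off dbt_job
@asset_sensor(asset_key=AssetKey("airbyte_asset1"), job=dbt_job)
def airbyte_to_dbt_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    yield RunRequest(run_key=context.cursor)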

Dagster provides reconciliation sensors, a type of asset sensor, that automatically materialize assets if they are ‘unreconciled’. One way an asset is considered unreconciled is if any of its upstream dependencies have been materialized more recently than it has.

In the example below, the sensor will materialize the dbt assets whenever the parent Airbyte assets materialize.

from dagster import AssetSelection, Definitions, build_asset_reconciliation_sensor

# airbyte_asset1, airbyte_asset2, dbt_asset1, and dbt_asset2 are defined elsewhere
defs = Definitions(
    assets=[airbyte_asset1, airbyte_asset2, dbt_asset1, dbt_asset2],
    sensors=[
        build_asset_reconciliation_sensor(
            asset_selection=AssetSelection.assets(dbt_asset1, dbt_asset2),
            name="asset_reconciliation_sensor",
        ),
    ],
)

Some of my data should be updated, but not everything#

Let’s say you have a database with all your credit card transactions. When a return occurs, the original purchase record in last week’s data doesn’t change; instead, a new transaction showing the return is added to the current day’s information.

In this case, you want your data pipeline to include all the latest data, but it doesn’t make sense to continuously reprocess data from two years, or even two days, ago when you know it hasn’t changed. What you want is to process yesterday’s data and not waste compute on the rest.

Dagster partitions do just that. A partition is a slice of your data; in this case, each partition represents a day of transactions. You can partition or ‘split’ your data based on whatever makes the most sense, by one or multiple dimensions.

Partitions can be used with both schedules and sensors. You can use schedules to kick off a partitioned job to update slices of data. You can also trigger runs once a specific partition has been updated using asset sensors based on partitioned materializations.
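
As a rough sketch, a daily-partitioned asset and a schedule built from a partitioned job could look like the following; the daily_transactions asset name and the start date are illustrative assumptions.

from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# each partition represents one day of transactions
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=daily_partitions)
def daily_transactions(context):
    # context.partition_key identifies the day to process, e.g. "2023-01-02"
    ...


# job that materializes only the selected day's partition
daily_transactions_job = define_asset_job(
    "daily_transactions_job",
    selection=[daily_transactions],
    partitions_def=daily_partitions,
)

# schedule that kicks off a run for the most recent partition each day
daily_transactions_schedule = build_schedule_from_partitioned_job(daily_transactions_job)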


Dagster automation cheat sheet#

Use this table as a quick reference when building out your data pipelines!

| Name | Good for automating data pipelines that need to be refreshed: |
| --- | --- |
| Basic (cron) scheduling | by kicking off updates at a predictable time interval |
| Sensors | when a specific event occurs |
| Asset sensors | after an asset or set of assets materialize |
| Asset reconciliation sensors | when assets are out of date with other assets |
| Asset reconciliation sensors with freshness policies | when assets are beyond an acceptable lag time |