Using Databricks with Dagster#

Dagster can orchestrate your Databricks jobs and other Databricks API calls, making it easy to chain together multiple Databricks jobs, as well as orchestrate Databricks alongside your other technologies.


Prerequisites#

To get started, you will need to install the dagster and dagster-databricks Python packages:

pip install dagster dagster-databricks

You'll also want to have a Databricks workspace with an existing project that is deployed with a Databricks job. If you don't have one already, you can follow the Databricks quickstart to set one up.

To manage your Databricks job from Dagster, you'll need three values, which can be set as environment variables in Dagster:

  1. A host for connecting with your Databricks workspace, starting with https://, stored in an environment variable DATABRICKS_HOST,
  2. A token corresponding to a personal access token for your Databricks workspace, stored in an environment variable DATABRICKS_TOKEN, and
  3. A DATABRICKS_JOB_ID for the Databricks job you want to run.

You can follow the Databricks API authentication instructions to retrieve these values.


Step 1: Connecting to Databricks#

The first step in using Databricks with Dagster is to tell Dagster how to connect to your Databricks workspace using a Databricks resource. This resource contains information on where your Databricks workspace is located and any credentials sourced from environment variables that are needed to access it. By configuring the resource, you can access the underlying Databricks API client to communicate with your Databricks workspace.

For more information about the Databricks resource, see the API reference.

from dagster_databricks import databricks_client

# Resource pointing Dagster at your Databricks workspace; the host and token are read from environment variables
databricks_client_instance = databricks_client.configured(
    {
        "host": {"env": "DATABRICKS_HOST"},
        "token": {"env": "DATABRICKS_TOKEN"},
    }
)
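
If you're on a newer release of dagster-databricks, the same connection can also be expressed with the Pythonic DatabricksClientResource. The sketch below is an optional alternative, assuming your installed version ships this resource; it reads the same environment variables:

from dagster import EnvVar
from dagster_databricks import DatabricksClientResource

# Equivalent Pythonic resource (assumes a dagster-databricks version that provides it)
databricks_client_instance = DatabricksClientResource(
    host=EnvVar("DATABRICKS_HOST"),
    token=EnvVar("DATABRICKS_TOKEN"),
)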

Step 2: Create an op/asset that connects to Databricks#

In this step, we show several ways to model a Databricks API call as either a Dagster op or as the computation backing a software-defined asset. You can either:

  1. Use the create_databricks_run_now_op factory to create an op that triggers an existing Databricks job, or
  2. Create a software-defined asset whose materialization runs the Databricks computation (see the sketch after the op example below).

Afterwards, we create a Dagster job that either invokes the op or selects the asset in order to run the Databricks API call.

For more information on how to decide whether to use an op or asset, see our guide to understand how they relate.

import os

from dagster import job
from dagster_databricks import create_databricks_run_now_op

# Read the ID of the existing Databricks job from the environment (see Prerequisites)
DATABRICKS_JOB_ID = int(os.environ["DATABRICKS_JOB_ID"])

my_databricks_run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
)

# A Dagster job that invokes the op, with the Databricks resource from Step 1 attached
@job(resource_defs={"databricks": databricks_client_instance})
def my_databricks_job():
    my_databricks_run_now_op()
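
You can also model the Databricks computation as the software-defined asset referenced in Step 3. The sketch below is a minimal outline: my_databricks_table and materialize_databricks_table match the names scheduled later, the asset body is a placeholder for whatever Databricks call produces your table, and it assumes the databricks resource configured in Step 1 is attached:

from dagster import AssetSelection, asset, define_asset_job

@asset(required_resource_keys={"databricks"})
def my_databricks_table(context) -> None:
    # The Databricks client configured in Step 1; use it to trigger and poll
    # whatever Databricks computation produces this table.
    databricks_client = context.resources.databricks
    ...

# An asset job that materializes the asset; this is the job scheduled in Step 3
materialize_databricks_table = define_asset_job(
    "materialize_databricks_table",
    selection=AssetSelection.assets(my_databricks_table),
)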

Step 3: Schedule your Databricks computation#

Now that your Databricks API calls are modeled within Dagster, you can schedule them to run on a regular cadence.

In the example below, we schedule the materialize_databricks_table job, which materializes the my_databricks_table asset, and the my_databricks_job job to run daily.

from dagster import Definitions, ScheduleDefinition

defs = Definitions(
    assets=[my_databricks_table],
    schedules=[
        ScheduleDefinition(
            job=materialize_databricks_table,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=my_databricks_job,
            cron_schedule="@daily",
        ),
    ],
    jobs=[my_databricks_job],
    resources={"databricks": databricks_client_instance},
)

What's next?#

By now, you should have a working Databricks and Dagster integration!

From here, you can: