This library provides a Dagster integration with Airflow.
For more information on getting started, see the Airflow integration guide.
make_dagster_definitions_from_airflow_dags_path
Construct a Dagster Definitions object corresponding to the Airflow DAGs in dag_path.
Create make_dagster_definitions.py:
    from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

    def make_definitions_from_dir():
        return make_dagster_definitions_from_airflow_dags_path(
            '/path/to/dags/',
        )
Use Definitions as usual, for example:

    dagit -f path/to/make_dagster_definitions.py
Parameters:
dag_path (str) – Path to a directory or file that contains Airflow DAGs
include_examples (bool) – True to include Airflow's example DAGs (default: False)
safe_mode (bool) – True to use Airflow's default heuristic to find files that contain DAGs, i.e. files that contain both b'DAG' and b'airflow' (default: True)
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns: Definitions
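Where your Airflow Operators depend on Airflow Connections, they can be seeded through the connections argument. A minimal sketch; the connection id, type, and host below are hypothetical:

    from airflow.models import Connection
    from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

    definitions = make_dagster_definitions_from_airflow_dags_path(
        '/path/to/dags/',
        connections=[
            # Hypothetical connection; replace with whatever your operators expect.
            Connection(conn_id="http_default", conn_type="http", host="https://example.com"),
        ],
    )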
make_dagster_definition_from_airflow_dag_bag
Construct a Dagster Definitions object corresponding to the Airflow DAGs in a DagBag.
Create make_dagster_definition.py:

    from dagster_airflow import make_dagster_definition_from_airflow_dag_bag
    from airflow_home import my_dag_bag

    def make_definition_from_dag_bag():
        return make_dagster_definition_from_airflow_dag_bag(my_dag_bag)

Use Definitions as usual, for example:

    dagit -f path/to/make_dagster_definition.py
Parameters:
dag_bag (DagBag) – Airflow DagBag model
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns: Definitions
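If you don't already have a DagBag module to import, one can be built directly from a DAGs folder; a minimal sketch with a hypothetical path:

    from airflow.models import DagBag

    # Hypothetical path; point this at your Airflow DAGs directory.
    my_dag_bag = DagBag(dag_folder='/path/to/dags/')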
make_schedules_and_jobs_from_airflow_dag_bag
Construct Dagster Schedules and Jobs corresponding to an Airflow DagBag.
Parameters:
dag_bag (DagBag) – Airflow DagBag model
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns:
- List[ScheduleDefinition]: The generated Dagster Schedules
- List[JobDefinition]: The generated Dagster Jobs
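A minimal usage sketch, reusing the my_dag_bag module import from the example above:

    from dagster import Definitions
    from dagster_airflow import make_schedules_and_jobs_from_airflow_dag_bag
    from airflow_home import my_dag_bag

    schedules, jobs = make_schedules_and_jobs_from_airflow_dag_bag(my_dag_bag)

    # Package the generated jobs and schedules however you deploy Dagster code.
    defs = Definitions(jobs=jobs, schedules=schedules)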
make_dagster_job_from_airflow_dag
Construct a Dagster job corresponding to a given Airflow DAG.
Tasks in the resulting job will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment within which your Dagster ops execute.
To set Airflow's execution_date for use with Airflow Operators' execute() methods, either:

1. (Best for ad hoc runs) Execute the job directly. This will set execution_date to the time (in UTC) of the run.

2. Add {'airflow_execution_date': utc_date_string} to the job tags. This will override behavior from (1).

    my_dagster_job = make_dagster_job_from_airflow_dag(
        dag=dag,
        tags={'airflow_execution_date': utc_execution_date_str}
    )
    my_dagster_job.execute_in_process()

3. (Recommended) Add {'airflow_execution_date': utc_date_string} to the run tags, such as in the Dagit UI. This will override behavior from (1) and (2).
We apply normalized_name() to the dag id and task ids when generating job name and op names to ensure that names conform to Dagster’s naming conventions.
Parameters:
dag (DAG) – The Airflow DAG to compile into a Dagster job
tags (Dict[str, Field]) – Job tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify the execution_date used within execution of Airflow Operators.
connections (List[Connection]) – List of Airflow Connections to be created in the ephemeral Airflow DB; if use_ephemeral_airflow_db is False, this will be ignored.
Returns: The generated Dagster job
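A minimal end-to-end sketch with a toy Airflow DAG defined inline; the DAG and task names are hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from dagster_airflow import make_dagster_job_from_airflow_dag

    with DAG(dag_id="my_dag", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
        BashOperator(task_id="say_hello", bash_command="echo hello")

    my_dagster_job = make_dagster_job_from_airflow_dag(dag=dag)

    # Option (1): execution_date defaults to the (UTC) time of the run.
    my_dagster_job.execute_in_process()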
load_assets_from_airflow_dag
[Experimental] Construct Dagster Assets for a given Airflow DAG.
Parameters:
dag (DAG) – The Airflow DAG to compile into Dagster assets
task_ids_by_asset_key (Optional[Mapping[AssetKey, AbstractSet[str]]]) – A mapping from asset keys to task ids. Used to break up the Airflow DAG into multiple software-defined assets (SDAs).
upstream_dependencies_by_asset_key (Optional[Mapping[AssetKey, AbstractSet[AssetKey]]]) – A mapping from upstream asset keys to the assets provided in task_ids_by_asset_key. Used to declare new upstream SDA dependencies.
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns: List[AssetsDefinition]
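A minimal sketch, assuming a DAG my_dag whose tasks load_task and transform_task together materialize a single asset (all names hypothetical):

    from dagster import AssetKey, Definitions
    from dagster_airflow import load_assets_from_airflow_dag
    from airflow_home import my_dag  # hypothetical module exposing the DAG

    assets = load_assets_from_airflow_dag(
        dag=my_dag,
        task_ids_by_asset_key={
            AssetKey("my_table"): {"load_task", "transform_task"},
        },
    )
    defs = Definitions(assets=assets)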
make_ephemeral_airflow_db_resource
Creates a Dagster resource that provides an ephemeral Airflow database.
Parameters:
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns: The ephemeral Airflow DB resource
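A minimal sketch, mirroring the persistent-resource example below and assuming the default arguments are acceptable:

    from dagster_airflow import (
        make_dagster_definitions_from_airflow_dags_path,
        make_ephemeral_airflow_db_resource,
    )

    airflow_db = make_ephemeral_airflow_db_resource()
    definitions = make_dagster_definitions_from_airflow_dags_path(
        '/path/to/dags/',
        resource_defs={"airflow_db": airflow_db},
    )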
make_persistent_airflow_db_resource
Creates a Dagster resource that provides a persistent Airflow database.
Usage:
    from dagster_airflow import (
        make_dagster_definitions_from_airflow_dags_path,
        make_persistent_airflow_db_resource,
    )

    postgres_airflow_db = "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
    airflow_db = make_persistent_airflow_db_resource(uri=postgres_airflow_db)
    definitions = make_dagster_definitions_from_airflow_dags_path(
        '/path/to/dags/',
        resource_defs={"airflow_db": airflow_db}
    )
Parameters:
uri (str) – SQLAlchemy URI of the Airflow DB to be used
connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB
Returns: The persistent Airflow DB resource
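To seed the persistent database with Airflow Connections, pass them alongside the URI; a minimal sketch with a hypothetical connection:

    from airflow.models import Connection
    from dagster_airflow import make_persistent_airflow_db_resource

    airflow_db = make_persistent_airflow_db_resource(
        uri="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow",
        connections=[
            # Hypothetical connection; replace with the ones your operators expect.
            Connection(conn_id="aws_default", conn_type="aws"),
        ],
    )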
DagsterCloudOperator.
Uses the Dagster Cloud GraphQL API to run and monitor Dagster jobs on Dagster Cloud.
Parameters:
repository_name (str) – the name of the repository to use
repostitory_location_name (str) – the name of the repository location to use
job_name (str) – the name of the job to run
run_config (Optional[Dict[str, Any]]) – the run config to use for the job run
dagster_conn_id (Optional[str]) – the id of the Dagster connection, Airflow 2.0+ only
organization_id (Optional[str]) – the id of the Dagster Cloud organization
deployment_name (Optional[str]) – the name of the Dagster Cloud deployment
user_token (Optional[str]) – the Dagster Cloud user token to use
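A minimal sketch of using the operator inside an Airflow DAG; the DAG, task, repository, location, and job names are hypothetical, and the parameter names follow the list above:

    from datetime import datetime

    from airflow import DAG
    from dagster_airflow import DagsterCloudOperator

    with DAG(
        dag_id="trigger_dagster_cloud_job",
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
    ) as dag:
        run_dagster_job = DagsterCloudOperator(
            task_id="run_dagster_job",
            repository_name="my_repository",
            repostitory_location_name="my_location",
            job_name="my_job",
            # Credentials are typically supplied via the Airflow connection named by
            # dagster_conn_id (Airflow 2.0+), or directly via organization_id,
            # deployment_name, and user_token.
        )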
DagsterOperator.
Uses the Dagster GraphQL API to run and monitor Dagster jobs on remote Dagster infrastructure.
Parameters:
repository_name (str) – the name of the repository to use
repostitory_location_name (str) – the name of the repository location to use
job_name (str) – the name of the job to run
run_config (Optional[Dict[str, Any]]) – the run config to use for the job run
dagster_conn_id (Optional[str]) – the id of the Dagster connection, Airflow 2.0+ only
organization_id (Optional[str]) – the id of the Dagster Cloud organization
deployment_name (Optional[str]) – the name of the Dagster Cloud deployment
user_token (Optional[str]) – the Dagster Cloud user token to use
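Usage mirrors DagsterCloudOperator; a minimal sketch with hypothetical names, assuming an Airflow connection that points at your Dagster GraphQL endpoint:

    from datetime import datetime

    from airflow import DAG
    from dagster_airflow import DagsterOperator

    with DAG(
        dag_id="trigger_dagster_job",
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
    ) as dag:
        run_dagster_job = DagsterOperator(
            task_id="run_dagster_job",
            repository_name="my_repository",
            repostitory_location_name="my_location",
            job_name="my_job",
            dagster_conn_id="dagster_default",  # assumed connection id; adjust to your setup
        )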