import os

from dagster_airflow import make_dagster_repo_from_airflow_dags_path

migrated_airflow_repo = make_dagster_repo_from_airflow_dags_path(
    os.path.join(os.getenv("AIRFLOW_HOME"), "dags"),
    "migrated_airflow_repo",
)
You can orchestrate Dagster job runs from Airflow by using the DagsterCloudOperator or DagsterOperator in your existing Airflow DAGs. For example, here's an Airflow DAG:
from datetime import datetime

from airflow import DAG
from dagster_airflow import DagsterCloudOperator

with DAG(
    dag_id="dagster_cloud",
    start_date=datetime(2022, 5, 28),
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    DagsterCloudOperator(
        task_id="new_dagster_assets",
        repository_location_name="example_location",
        repository_name="my_dagster_project",
        job_name="all_assets_job",
    )
In Airflow 2.0+, you can create a Dagster connection type to store configuration related to your Dagster Cloud organization. If you're using Airflow 1.0, you can also pass this configuration directly to the operator.
You can compile Dagster jobs into DAGs that can be understood by Airflow. Each op in the job becomes an Airflow task. For example, here's a Dagster job:
import csv

import requests

from dagster import job, op


@op
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@op
def find_sugariest(context, cereals):
    sorted_by_sugar = sorted(cereals, key=lambda cereal: cereal["sugars"])
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@job
def hello_cereal_job():
    find_sugariest(download_cereals())
To make this job available inside Airflow, you can write an Airflow DAG definition file that invokes make_airflow_dag:
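For instance, a minimal DAG definition file might look like the following sketch. The default_args values here are placeholders, and the module_name assumes the job above lives in the docs_snippets.integrations.airflow.hello_cereal module referenced below:

import datetime

from dagster_airflow import make_airflow_dag

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2022, 5, 28),  # placeholder start date
    "email_on_failure": False,
    "email_on_retry": False,
}

# Compile the Dagster job into an Airflow DAG and its constituent tasks
dag, tasks = make_airflow_dag(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)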
If you run this code interactively, you'll see that dag and tasks are ordinary Airflow objects, just as you'd expect to see when defining an Airflow pipeline manually:
>>> dag
<DAG: hello_cereal_job>
>>> tasks
[<Task(DagsterPythonOperator): hello_cereal>]
Like other Airflow DAG definition files, this file should go inside $AIRFLOW_HOME/dags. The docs_snippets.integrations.airflow.hello_cereal module that's passed as the value for the module_name argument must be importable via sys.path.
After this, the DAG should show up inside Airflow.
The approach above runs each op inside an operator that's similar to the Airflow PythonOperator. If you instead want to containerize your Dagster job and run it using an operator that's similar to the Airflow DockerOperator, you can use make_airflow_dag_containerized.
As in the uncontainerized case, you'll put a new Python file defining your DAG in the directory in which Airflow looks for DAGs:
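Here's a sketch of such a file, assuming the same module and job names as above and a hypothetical image name:

import datetime

from dagster_airflow import make_airflow_dag_containerized

DEFAULT_ARGS = {
    "owner": "airflow",
    "start_date": datetime.datetime(2022, 5, 28),  # placeholder start date
}

# Compile the Dagster job into a DAG whose tasks run inside Docker containers
dag, tasks = make_airflow_dag_containerized(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    image="dagster-airflow-demo-repository",  # hypothetical image name
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)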
The image argument is the name of the Docker image. Running in a containerized context requires a persistent intermediate storage layer available to the Dagster containers, such as a network filesystem, S3, or GCS. You can pass op_kwargs through to the DagsterDockerOperator to use custom TLS settings, the private registry of your choice, etc., just as you would configure the ordinary Airflow DockerOperator.
If you want your containerized job to be available to Airflow operators running on other machines - for example, in environments where Airflow workers are running remotely - you'll need to push your Docker image to a Docker registry so that remote instances of Docker can pull the image by name, or otherwise ensure that the image is available on remote nodes.
Dagster can convert Airflow operators to Dagster ops, as long as the operator requires no instance-wide configuration via airflow.cfg. Using airflow_operator_to_op, you can convert instantiated Airflow operators to be executed within a Dagster op. Optionally, you can pass in a list of Airflow connection objects utilized by the operator:
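Here's a sketch of that conversion; the BashOperator task, the connection, and the job name are illustrative, and the import paths assume Airflow 2:

from airflow.models import Connection
from airflow.operators.bash import BashOperator

from dagster import job
from dagster_airflow import airflow_operator_to_op

# An instantiated Airflow operator...
curl_task = BashOperator(task_id="curl_task", bash_command="curl -Ss https://dagster.io")

# ...converted into a Dagster op. The connections list is optional and only needed
# when the operator reads Airflow connections at runtime.
dagster_op = airflow_operator_to_op(
    curl_task,
    connections=[Connection(conn_id="some_conn", conn_type="http")],  # hypothetical connection
)


@job
def my_converted_job():
    dagster_op()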
This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that works the same way as a Dagster-native job.
There are three jobs in the repo:
airflow_simple_dag demonstrates the use of Airflow templates.
airflow_complex_dag shows the translation of a more complex dependency structure.
airflow_kubernetes_dag shows the translation of a DAG using Kubernetes pod operators.
from dagster import repository
from dagster_airflow import (
    make_dagster_job_from_airflow_dag,
    make_dagster_repo_from_airflow_dags_path,
    make_dagster_repo_from_airflow_example_dags,
)
from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_kubernetes_dag import kubernetes_dag
from with_airflow.airflow_simple_dag import simple_dag

airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
airflow_kubernetes_dag = make_dagster_job_from_airflow_dag(kubernetes_dag, mock_xcom=True)


@repository
def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag, airflow_kubernetes_dag]
Note that the execution_date for the Airflow DAG is specified through the job tags. To specify tags, pass them to make_dagster_job_from_airflow_dag:
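For example, here's a sketch that assumes the airflow_execution_date tag key and an ISO-formatted UTC timestamp value of your choosing:

airflow_simple_dag_with_execution_date = make_dagster_job_from_airflow_dag(
    dag=simple_dag,
    # Assumed tag key; the value should be an ISO-formatted UTC timestamp string
    tags={"airflow_execution_date": "2023-01-01T00:00:00+00:00"},
)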