To get started, you will need to install the dagster and dagster-airflow Python packages:
pip install dagster dagster-airflow
Step 1: Create a new repository using make_dagster_repo_from_airflow_dags_path
The first step in migrating is to define a Dagster repository. To do this, we'll use make_dagster_repo_from_airflow_dags_path and pass it the file path of our Airflow DagBag. Dagster will then load the DagBag and convert all DAGs into Dagster jobs.
import os

from dagster_airflow import make_dagster_repo_from_airflow_dags_path

migrated_airflow_repo = make_dagster_repo_from_airflow_dags_path(
    os.path.join(os.getenv("AIRFLOW_HOME"), "dags"),
    "migrated_airflow_repo",
)
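If you save the snippet above in a file (say, a hypothetical migrated_airflow_repo.py), you can browse the converted jobs by pointing Dagit at that file:
dagit -f migrated_airflow_repo.py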
Under the hood, Dagster runs the exact same operator code that you were running in Airflow. You'll be able to view your normal Airflow stdout/stderr logs as compute logs in Dagit.
By default, each run of a migrated DAG creates an ephemeral Airflow metadata database scoped to that run. This means any Airflow connections that your DAG depends on will need to be created. To do this, you can provide a connections parameter to make_dagster_repo_from_airflow_dags_path:
import os

from airflow.models import Connection
from dagster_airflow import make_dagster_repo_from_airflow_dags_path

migrated_airflow_repo = make_dagster_repo_from_airflow_dags_path(
    os.path.join(os.getenv("AIRFLOW_HOME"), "dags"),
    "migrated_airflow_repo",
    connections=[
        Connection(conn_id="http_default", conn_type="uri", host="https://google.com")
    ],
)
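To make the connection requirement concrete, here is a hypothetical DAG (not from this guide) that depends on the http_default connection. It assumes Airflow 2.x with the apache-airflow-providers-http package installed; without registering the connection as shown above, the migrated job would fail to resolve it at runtime:

from datetime import datetime

from airflow import DAG
# Requires the apache-airflow-providers-http package in Airflow 2.x.
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(
    dag_id="example_http_dag",  # hypothetical DAG id, for illustration only
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    # Looks up the "http_default" connection at runtime, so it must exist in
    # the ephemeral Airflow metadata database that Dagster creates for the run.
    ping = SimpleHttpOperator(
        task_id="ping_google",
        http_conn_id="http_default",
        endpoint="",
        method="GET",
    )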
If you're running on Dagster Cloud and any of your connection fields are sensitive, you can securely pass them in using environment variables.
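For example, a minimal sketch that reads a sensitive field from a hypothetical HTTP_CONN_PASSWORD environment variable (set through your deployment, e.g. Dagster Cloud's environment variable settings) rather than hard-coding it:

import os

from airflow.models import Connection
from dagster_airflow import make_dagster_repo_from_airflow_dags_path

migrated_airflow_repo = make_dagster_repo_from_airflow_dags_path(
    os.path.join(os.getenv("AIRFLOW_HOME"), "dags"),
    "migrated_airflow_repo",
    connections=[
        Connection(
            conn_id="http_default",
            conn_type="uri",
            host="https://google.com",
            # HTTP_CONN_PASSWORD is a hypothetical variable name; set it in your
            # deployment instead of committing the value to source control.
            password=os.getenv("HTTP_CONN_PASSWORD"),
        )
    ],
)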
Your Airflow instance likely had specific IAM or Kubernetes permissions that allowed it to successfully run your Airflow DAGs. In order to run the migrated Dagster jobs, you'll need to duplicate these permissions for Dagster.