Airflow

Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology.

The Metaplane Airflow integration adds visibility into Airflow DAG performance and into warehouse lineage originating from Airflow DAGs. The integration can be set up in just a few steps.

Create Airflow connection in Metaplane

Navigate to the connections page, click on "Airflow", then follow the prompt. Set the "Host URL" to the URL of your Airflow UI so that Metaplane can link to your DAGs. Metaplane will associate all Airflow DAGs and Tasks with this connection.

Once the Airflow connection has been created, take note of the connection ID, which can be copied from the URL. It will look something like: app.metaplane.dev/s/<your connectionId UUID>

DAG and Task runtime monitoring

Metaplane can monitor Airflow DAG and Task runtimes so that users are alerted when DAGs take longer than expected to run. Follow the instructions below to set up Airflow callbacks that send run data to Metaplane when Airflow DAGs and Tasks complete. Once enabled, Metaplane will automatically create monitors to track your DAG runtimes. To avoid noisy alerting, Metaplane will not automatically create monitors for Task runtimes. If you would like Task-level monitoring, you can enable it from the app.

1. Install airflow-metaplane package in Airflow

Add the airflow-metaplane package to your Airflow installation. Please note that installing a new package will require a restart of your Airflow cluster.

2. Create Metaplane connection in Airflow

Once the airflow-metaplane package is installed and Airflow has been restarted, you can define a Metaplane connection either through the Airflow UI or as an environment variable.

Using the Airflow UI

Navigate to the Admin -> Connections page in your Airflow UI.

Add a new connection of type Metaplane.

Connection Id: Set the Connection Id to metaplane_default.

API key: Use an existing API key or follow these instructions to generate a new API key.

Metaplane Airflow connection ID: Paste the connection ID copied from the Airflow connection you created in Metaplane.

Using an environment variable

export AIRFLOW_CONN_METAPLANE_DEFAULT='{
    "conn_type": "metaplane",
    "password": "<api key>",
    "extra": {
        "mp_airflow_connection_id": "<Airflow connection id in Metaplane copied from the first step>"
    }
}'
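Because the value of the environment variable must be valid JSON, it can be safer to generate it than to type it by hand. The following is a minimal sketch, not part of the airflow-metaplane package: the function name metaplane_conn_json and the placeholder values are hypothetical, and only the keys come from the example above.

```python
import json
import shlex


def metaplane_conn_json(api_key: str, mp_airflow_connection_id: str) -> str:
    """Serialize the connection definition shown above as a JSON string."""
    return json.dumps(
        {
            "conn_type": "metaplane",
            "password": api_key,
            "extra": {"mp_airflow_connection_id": mp_airflow_connection_id},
        }
    )


if __name__ == "__main__":
    # Placeholder values for illustration only; substitute your own.
    value = metaplane_conn_json("<api key>", "<connection id from Metaplane>")
    # shlex.quote wraps the JSON so it can be pasted into a shell safely.
    print(f"export AIRFLOW_CONN_METAPLANE_DEFAULT={shlex.quote(value)}")
```

Running the script prints an export line that can be added to the environment of your Airflow scheduler and workers.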

Custom connection name

Both methods above assume that the Metaplane connection in Airflow has the connection id metaplane_default. If you would like to give your Metaplane connection a custom name, pass that name into the DAG params dictionary under the key metaplane_connection_id. See below for an example:

DAG(
  "name",
  ...
  params={"metaplane_connection_id": "<custom connection id>"},
)

3. Add callbacks to DAGs

Metaplane takes advantage of the callback system in Airflow to track DAG and task run information. These callbacks are available in mp_callbacks in the airflow_metaplane module. The relevant import is below:

from airflow_metaplane import mp_callbacks

DAG with no existing callbacks

If your DAG does not have any existing callbacks, adding Metaplane callbacks is as simple as passing through mp_callbacks.dag_callbacks and mp_callbacks.task_callbacks in the DAG definition:

from airflow_metaplane import mp_callbacks

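DAG(
  "name",
  ...
  default_args={
    ...
    # Task-level Metaplane callbacks, applied to every task in the DAG
    **mp_callbacks.task_callbacks,
  },
  # DAG-level Metaplane callbacks
  **mp_callbacks.dag_callbacks,
)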

DAG with existing callbacks

If you are already using callbacks with your DAG, you can add the specific Metaplane callbacks to the list of callbacks:

from airflow_metaplane import mp_callbacks

DAG(
  "name",
  ...
  default_args={
    ...
    "on_execute_callback": [your_task_execute_callback, mp_callbacks.task_execute_callback],
    "on_failure_callback": [your_task_failure_callback, mp_callbacks.task_failure_callback],
    "on_success_callback": [your_task_success_callback, mp_callbacks.task_success_callback],
    "on_retry_callback": [your_task_retry_callback, mp_callbacks.task_retry_callback]
  },
  on_failure_callback=[your_dag_failure_callback, mp_callbacks.dag_failure_callback],
  on_success_callback=[your_dag_success_callback, mp_callbacks.dag_success_callback],
  sla_miss_callback=[your_dag_sla_miss_callback, mp_callbacks.dag_sla_miss_callback]
) 
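If many DAGs already define callbacks, the lists above can be built with a small helper instead of being written out by hand. The sketch below is a hypothetical helper, not part of airflow_metaplane. It assumes that mp_callbacks.task_callbacks and mp_callbacks.dag_callbacks are mappings keyed by callback argument name (which the ** unpacking in the previous section suggests) and that each value is either a single callable or a list of callables.

```python
from typing import Any, Callable, Dict, List, Mapping, Union

Callback = Callable[..., Any]
CallbackSpec = Union[Callback, List[Callback]]


def merge_callbacks(
    existing: Mapping[str, CallbackSpec],
    extra: Mapping[str, CallbackSpec],
) -> Dict[str, List[Callback]]:
    """Combine two callback mappings, keeping the existing callbacks first."""
    merged: Dict[str, List[Callback]] = {}
    for source in (existing, extra):
        for name, spec in source.items():
            # Normalize a single callable into a one-element list.
            callbacks = list(spec) if isinstance(spec, (list, tuple)) else [spec]
            merged.setdefault(name, []).extend(callbacks)
    return merged
```

Under those assumptions, default_args could include **merge_callbacks(your_task_callbacks, mp_callbacks.task_callbacks), where your_task_callbacks is a dictionary such as {"on_failure_callback": your_task_failure_callback}.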

DAG and Task lineage

If you use Airflow to run transformation queries, you can use Metaplane to track lineage from Airflow DAGs and Tasks to warehouse tables and columns. This gives you full visibility into how data moves through your systems. To support Airflow lineage, follow the steps below for your warehouse:

Snowflake

For Snowflake, Metaplane supports Snowflake query tags as an easy way to track which DAG and Task each query originates from. Metaplane expects the query tag to be valid JSON with dag_id and task_id as properties. Many Snowflake operators support passing through query tags as session parameters. Example below:

SnowflakeOperator(
  task_id="<your task id>",
  sql="<your sql>",
  session_parameters={
    "QUERY_TAG": '{"dag_id": "<your dag id>", "task_id": "<your task id>"}'
  }
)
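Since the query tag has to parse as JSON, building it with json.dumps avoids quoting mistakes. This is a minimal sketch; the helper name metaplane_query_tag is hypothetical and not part of any Metaplane or Airflow package.

```python
import json


def metaplane_query_tag(dag_id: str, task_id: str) -> str:
    """Return a JSON query tag with the dag_id and task_id properties."""
    return json.dumps({"dag_id": dag_id, "task_id": task_id})
```

The result can then be passed as session_parameters={"QUERY_TAG": metaplane_query_tag("<your dag id>", "<your task id>")}.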

BigQuery

For BigQuery, Metaplane supports BigQuery labels. Metaplane expects a dag_id and task_id label. Many BigQuery operators support passing through labels. Example below:

BigQueryExecuteQueryOperator(
  task_id="<your task id>",
  sql="<your sql>",
  labels={"dag_id": "<your dag id>", "task_id": "<your task id>"}
)
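BigQuery restricts which characters a label value may contain, so a DAG or Task id with uppercase letters or dots may be rejected as a label. The sketch below builds the two labels and checks them up front. The helper name metaplane_labels is hypothetical, and the pattern is a deliberately conservative ASCII-only reading of BigQuery's label rules (lowercase letters, digits, underscores, and dashes, at most 63 characters); treat it as an assumption and check the BigQuery documentation for the authoritative rules.

```python
import re

# Conservative, ASCII-only assumption about allowed BigQuery label values.
_LABEL_VALUE = re.compile(r"[a-z0-9_-]{0,63}")


def metaplane_labels(dag_id: str, task_id: str) -> dict:
    """Build the dag_id/task_id labels, rejecting values BigQuery may refuse."""
    labels = {"dag_id": dag_id, "task_id": task_id}
    for key, value in labels.items():
        if not _LABEL_VALUE.fullmatch(value):
            raise ValueError(f"{key}={value!r} is not a valid BigQuery label value")
    return labels
```

The returned dictionary can be passed directly as the labels argument of the operator.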

All warehouses

For all warehouses, Metaplane supports SQL comments. SQL comments must be in the following format:

-- mp_airflow {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
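The {{ dag.dag_id }} and {{ task.task_id }} placeholders are Jinja expressions that Airflow renders when the operator's sql argument is a templated field. A minimal sketch of a helper that prepends the comment to a query is shown here. The helper name with_metaplane_comment is hypothetical, and placing the comment on the first line of the query is an assumption, since only the comment format itself is specified above.

```python
MP_AIRFLOW_COMMENT = (
    '-- mp_airflow {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}'
)


def with_metaplane_comment(sql: str) -> str:
    """Prepend the Metaplane lineage comment unless it is already present."""
    if sql.lstrip().startswith("-- mp_airflow"):
        return sql
    return f"{MP_AIRFLOW_COMMENT}\n{sql}"
```

For example, an operator could be given sql=with_metaplane_comment("<your sql>"), leaving Airflow to fill in the DAG and Task ids at render time.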