Airflow
Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology.
The Metaplane Airflow integration adds visibility into Airflow DAG performance and warehouse lineage originating from Airflow DAGs. The Airflow integration can be set up with just a few steps.
Create Airflow connection in Metaplane
Navigate to the Connections page, click "Airflow", and follow the prompts. Metaplane will associate all Airflow DAGs and Tasks with this connection. Set the "Host URL" to the URL of your Airflow UI so that Metaplane can link to your DAGs.
Once the Airflow connection has been created, note its connection ID, which you can copy from the URL. The URL will look something like: app.metaplane.dev/s/<your connectionId UUID>
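If you script the later setup steps, the connection ID is the path segment after `/s/` in that URL. The following is a minimal sketch that assumes the URL has the `app.metaplane.dev/s/<connectionId>` shape shown above (with or without a scheme) and that the ID is a standard UUID. `extract_connection_id` is a hypothetical helper and is not part of any Metaplane package.

```python
import uuid
from urllib.parse import urlparse


def extract_connection_id(url: str) -> str:
    """Return the connection UUID from a Metaplane connection URL (hypothetical helper).

    Assumes the URL path looks like /s/<connectionId>, as in
    app.metaplane.dev/s/<your connectionId UUID>.
    """
    # urlparse only splits out the path reliably when a scheme is present,
    # so add one if the URL was copied without it.
    if "://" not in url:
        url = "https://" + url
    segments = [s for s in urlparse(url).path.split("/") if s]
    if len(segments) < 2 or segments[0] != "s":
        raise ValueError(f"unexpected Metaplane connection URL: {url!r}")
    # uuid.UUID raises ValueError if the segment is not a valid UUID;
    # str() returns it in canonical lowercase form.
    return str(uuid.UUID(segments[1]))
```

For example, `extract_connection_id("app.metaplane.dev/s/123e4567-e89b-12d3-a456-426614174000")` returns `"123e4567-e89b-12d3-a456-426614174000"`. Validating the value as a UUID catches copy-paste mistakes before the ID reaches your Airflow configuration.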
DAG and Task runtime monitoring
Metaplane can monitor Airflow DAG and Task runtimes and alert you when a DAG takes longer than expected to run. Follow the instructions below to set up Airflow callbacks that send run data to Metaplane when Airflow DAGs and Tasks complete. Once callbacks are enabled, Metaplane automatically creates monitors to track your DAG runtimes. To avoid noisy alerting, Metaplane does not automatically create monitors for Task runtimes. If you want Task-level monitoring, you can enable it from the app.
1. Install airflow-metaplane package in Airflow
Add the airflow-metaplane package to your Airflow installation. Please note that installing a new package will require a restart of your Airflow cluster.
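After the restart, you can confirm that the package is visible by checking whether its module can be found from the same Python environment that your Airflow scheduler and workers use. The module is `airflow_metaplane`, which is the name used in the imports later on this page. This is a minimal sketch, and you should run it with that environment's interpreter.

```python
import importlib.util


def is_installed(module_name: str = "airflow_metaplane") -> bool:
    """Return True if module_name can be found in this Python environment."""
    # find_spec returns None when a top-level module cannot be located;
    # it does not import the module itself.
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    status = "installed" if is_installed() else "NOT installed"
    print(f"airflow_metaplane is {status}")
```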
2. Create Metaplane connection in Airflow
Once the airflow-metaplane package is installed and Airflow has been restarted, you can define a Metaplane connection either through the Airflow UI or as an environment variable.
Using the Airflow UI
Navigate to the Admin -> Connections page in your Airflow UI.
Add a new connection of type Metaplane.
Connection Id: Set the Connection Id to metaplane_default.
API key: Use an existing API key or follow these instructions to generate a new API key.
Metaplane Airflow connection ID: Paste the connection ID copied from the Airflow connection you created in Metaplane.
Using an environment variable
export AIRFLOW_CONN_METAPLANE_DEFAULT='{
  "conn_type": "metaplane",
  "password": "<api key>",
  "extra": {
    "mp_airflow_connection_id": "<Airflow connection id in Metaplane copied from the first step>"
  }
}'
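When you write this JSON by hand, it is easy to leave a value unquoted or to break the shell quoting. You can avoid both problems by generating the value instead. The sketch below builds the same structure shown above with `json.dumps` and quotes it for the shell with `shlex.quote`. `metaplane_conn_env_value` is a hypothetical helper, and the arguments in the usage block are placeholders.

```python
import json
import shlex


def metaplane_conn_env_value(api_key: str, mp_airflow_connection_id: str) -> str:
    """Build the JSON value for AIRFLOW_CONN_METAPLANE_DEFAULT.

    Mirrors the structure above: conn_type, password (the API key),
    and extra.mp_airflow_connection_id.
    """
    return json.dumps(
        {
            "conn_type": "metaplane",
            "password": api_key,
            "extra": {"mp_airflow_connection_id": mp_airflow_connection_id},
        }
    )


if __name__ == "__main__":
    # Placeholder values; substitute your own API key and connection ID.
    value = metaplane_conn_env_value("<api key>", "<connection id>")
    # shlex.quote wraps the value in single quotes and escapes any inside it.
    print("export AIRFLOW_CONN_METAPLANE_DEFAULT=" + shlex.quote(value))
```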
Custom connection name
Both methods above assume that the Metaplane connection ID in Airflow is metaplane_default. If you would like to give your Metaplane connection a custom name, you can. In that case, pass the custom name into the DAG params dictionary under the key metaplane_connection_id. For example:
from airflow import DAG

DAG(
    "name",
    # ... other DAG arguments
    params={"metaplane_connection_id": "<custom connection id>"},
)
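If you define the custom-named connection through an environment variable instead of the UI, the variable name follows the connection ID. Airflow reads a connection from `AIRFLOW_CONN_` followed by the uppercased connection ID. The sketch below derives that name. `metaplane_prod` is a hypothetical custom ID, and `conn_env_var_name` is a hypothetical helper.

```python
def conn_env_var_name(conn_id: str) -> str:
    """Return the environment variable Airflow reads for a given connection ID.

    Follows Airflow's AIRFLOW_CONN_{CONN_ID} convention, with the ID uppercased.
    """
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Hypothetical custom name; the same string goes into the DAG params.
custom_conn_id = "metaplane_prod"
dag_params = {"metaplane_connection_id": custom_conn_id}
print(conn_env_var_name(custom_conn_id))  # AIRFLOW_CONN_METAPLANE_PROD
```

The default connection maps the same way. `conn_env_var_name("metaplane_default")` returns `AIRFLOW_CONN_METAPLANE_DEFAULT`, which is the variable used in the environment-variable example above.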
3. Add callbacks to DAGs
Metaplane uses Airflow's callback system to track DAG and task run information. These callbacks are available as mp_callbacks in the airflow_metaplane module. The relevant import is below:
from airflow_metaplane import mp_callbacks
DAG with no existing callbacks
If your DAG does not have any existing callbacks, you only need to pass mp_callbacks.dag_callbacks and mp_callbacks.task_callbacks in the DAG definition:
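from airflow import DAG
from airflow_metaplane import mp_callbacks

DAG(
    "name",
    # ... other DAG arguments
    default_args={
        # ... other default_args
        **mp_callbacks.task_callbacks,  # task-level callbacks, applied to every task
    },
    **mp_callbacks.dag_callbacks,  # DAG-level callbacks
)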
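The `**` syntax unpacks a dictionary's keys into the surrounding dict literal, or into a call as keyword arguments. The plain-Python sketch below shows why this pattern is meant only for DAGs without callbacks. It uses stand-in functions and assumes that `mp_callbacks.task_callbacks` is a dict keyed by Airflow callback argument names, as the existing-callbacks example suggests. The real contents of that dict may differ.

```python
def my_failure_alert(context):
    """Stand-in for a callback you already have."""


def mp_stub(context):
    """Stand-in for a Metaplane callback."""


# Hypothetical stand-in for mp_callbacks.task_callbacks; the real contents may differ.
task_callbacks = {"on_failure_callback": mp_stub, "on_success_callback": mp_stub}

# In a dict literal, a repeated key keeps the last value, so the unpacked
# Metaplane callback silently replaces my_failure_alert.
default_args = {
    "retries": 1,
    "on_failure_callback": my_failure_alert,
    **task_callbacks,
}
print(default_args["on_failure_callback"].__name__)  # mp_stub


def make_dag(**kwargs):
    """Stand-in for the DAG constructor's keyword arguments."""
    return kwargs


# In a call, the same collision raises TypeError instead of silently overriding.
try:
    make_dag(on_failure_callback=my_failure_alert, **{"on_failure_callback": mp_stub})
except TypeError as exc:
    print(exc)
```

In both cases, one of your callbacks would be lost or the DAG would fail to load. For that reason, DAGs that already set callbacks list both callbacks explicitly instead.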
DAG with existing callbacks
If you are already using callbacks with your DAG, add the individual Metaplane callbacks to the list of callbacks:
from airflow import DAG
from airflow_metaplane import mp_callbacks

DAG(
    "name",
    # ... other DAG arguments
    default_args={
        # ... other default_args
        # Task-level callbacks: your own callback first, then Metaplane's.
        "on_execute_callback": [your_task_execute_callback, mp_callbacks.task_execute_callback],
        "on_failure_callback": [your_task_failure_callback, mp_callbacks.task_failure_callback],
        "on_success_callback": [your_task_success_callback, mp_callbacks.task_success_callback],
        "on_retry_callback": [your_task_retry_callback, mp_callbacks.task_retry_callback],
    },
    # DAG-level callbacks.
    on_failure_callback=[your_dag_failure_callback, mp_callbacks.dag_failure_callback],
    on_success_callback=[your_dag_success_callback, mp_callbacks.dag_success_callback],
    sla_miss_callback=[your_dag_sla_miss_callback, mp_callbacks.dag_sla_miss_callback],
)
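If your existing callbacks are set in several places, a small helper can normalize whatever you already pass into a list and append the Metaplane callback. The existing value may be nothing, a single callable, or a list. This is a sketch: `combine_callbacks` is hypothetical, and it assumes your Airflow version accepts lists of callbacks for these arguments, which the example above also relies on.

```python
from typing import Callable, List, Sequence, Union

Callback = Callable[..., None]
CallbackArg = Union[None, Callback, Sequence[Callback]]


def combine_callbacks(existing: CallbackArg, *extra: Callback) -> List[Callback]:
    """Return the existing callback(s) followed by the extra ones, as a list.

    Accepts None, a single callable, or a list/tuple of callables.
    """
    if existing is None:
        base: List[Callback] = []
    elif callable(existing):
        base = [existing]
    else:
        base = list(existing)
    return base + list(extra)
```

For example, `on_failure_callback=combine_callbacks(your_dag_failure_callback, mp_callbacks.dag_failure_callback)` builds the same two-element list as the example above. It also works unchanged if `your_dag_failure_callback` is later replaced by `None` or by a list.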
DAG and Task lineage
If you use Airflow to run transformation queries, Metaplane can track lineage from Airflow DAGs and Tasks to warehouse tables and columns. This shows you how data moves through your systems. To enable Airflow lineage, tag your queries using one of the methods below.
Snowflake
For Snowflake, Metaplane uses Snowflake query tags to track which DAG and Task each query originates from. Metaplane expects the query tag to be valid JSON with dag_id and task_id as JSON properties. Many Snowflake operators support passing query tags as session parameters. For example:
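from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

SnowflakeOperator(
    task_id="<your task id>",
    sql="<your sql>",
    session_parameters={
        # QUERY_TAG must be valid JSON, so the IDs are quoted inside it.
        "QUERY_TAG": '{"dag_id": "<your dag id>", "task_id": "<your task id>"}'
    },
)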
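When you write the tag as a literal string, it is easy to drop the quotes around the IDs, which makes the JSON invalid. The sketch below builds the tag with `json.dumps` instead, so the quoting and escaping are always correct. `snowflake_query_tag` is a hypothetical helper, and `daily_orders` and `load_orders` are hypothetical IDs.

```python
import json


def snowflake_query_tag(dag_id: str, task_id: str) -> str:
    """Return a QUERY_TAG value as valid JSON with dag_id and task_id properties."""
    # json.dumps handles quoting and escaping, so the result is always valid JSON.
    return json.dumps({"dag_id": dag_id, "task_id": task_id})


session_parameters = {"QUERY_TAG": snowflake_query_tag("daily_orders", "load_orders")}
print(session_parameters["QUERY_TAG"])
# {"dag_id": "daily_orders", "task_id": "load_orders"}
```

You can pass the resulting `session_parameters` dict to the operator in place of the hand-written string.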
BigQuery
For BigQuery, Metaplane uses BigQuery labels and expects dag_id and task_id labels. Many BigQuery operators support passing labels. For example:
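from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

BigQueryExecuteQueryOperator(
    task_id="<your task id>",
    sql="<your sql>",
    labels={"dag_id": "<your dag id>", "task_id": "<your task id>"},
)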
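Google Cloud label values are more restricted than Airflow IDs. They allow at most 63 characters, and only lowercase letters, digits, underscores, and dashes. Airflow IDs can break these rules. For example, a task inside a TaskGroup gets a dotted task_id such as `group.task`, and uppercase letters are also rejected. The sketch below flags IDs that would not fit. It checks only the ASCII subset of the rules, because Google Cloud also permits some international characters. `is_valid_label_value` is a hypothetical helper, and `staging.load_orders` is a hypothetical task ID. This page does not say how Metaplane handles IDs that don't fit these rules, so check before relying on labels for such tasks.

```python
import re

# ASCII subset of the Google Cloud label value rules: up to 63 characters,
# lowercase letters, digits, underscores, and dashes (empty is allowed).
_LABEL_VALUE = re.compile(r"[a-z0-9_-]{0,63}")


def is_valid_label_value(value: str) -> bool:
    """Return True if value fits the ASCII subset of BigQuery label value rules."""
    return _LABEL_VALUE.fullmatch(value) is not None


labels = {"dag_id": "daily_orders", "task_id": "staging.load_orders"}
bad = {k: v for k, v in labels.items() if not is_valid_label_value(v)}
print(bad)  # {'task_id': 'staging.load_orders'}
```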
All warehouses
For all warehouses, Metaplane supports SQL comments. SQL comments must be in the following format:
-- mp_airflow {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
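Airflow renders the Jinja placeholders above when the task runs, as long as the SQL is passed through a templated field. The `sql` argument of Airflow's SQL operators is typically templated. If you assemble SQL in Python instead, for example inside a PythonOperator, a helper like the hypothetical `with_mp_comment` below produces the same comment line. It puts the comment on its own line at the top of the query. This page does not say whether the position of the comment matters. The newline after the comment is required, because a `--` comment runs to the end of the line and would otherwise comment out the SQL.

```python
import json


def with_mp_comment(sql: str, dag_id: str, task_id: str) -> str:
    """Prefix sql with an `-- mp_airflow {...}` comment line (hypothetical helper).

    Uses json.dumps so the payload is always valid JSON.
    """
    payload = json.dumps({"dag_id": dag_id, "task_id": task_id})
    # The newline ends the comment so the SQL itself still runs.
    return f"-- mp_airflow {payload}\n{sql}"


print(with_mp_comment("SELECT 1", "daily_orders", "load_orders"))
# -- mp_airflow {"dag_id": "daily_orders", "task_id": "load_orders"}
# SELECT 1
```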
Multiple Airflow connections
If you have multiple Airflow connections in Metaplane and you use the same DAG and task names across connections, you should add an additional connection_id
property to disambiguate which connection this DAG and task belong to. In the above examples, please add an additional connection_id
set to the Airflow connection id you created in Metaplane in the first step above.
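The extra `connection_id` goes alongside `dag_id` and `task_id` in whichever format you use. The sketch below builds the properties once and reuses them for the Snowflake query tag, the BigQuery labels, and the SQL comment. `lineage_tags` is a hypothetical helper, and `daily_orders` and `load_orders` are hypothetical IDs. The Metaplane connection ID is a UUID. Assuming you copy it in lowercase form, it also satisfies BigQuery's label value rules.

```python
import json
from typing import Dict, Optional


def lineage_tags(dag_id: str, task_id: str, connection_id: Optional[str] = None) -> Dict[str, str]:
    """Return the dag_id/task_id properties, plus connection_id when given."""
    tags = {"dag_id": dag_id, "task_id": task_id}
    if connection_id is not None:
        tags["connection_id"] = connection_id
    return tags


# Placeholder for the Airflow connection ID you created in Metaplane.
MP_CONN_ID = "<Airflow connection id in Metaplane>"
tags = lineage_tags("daily_orders", "load_orders", MP_CONN_ID)

snowflake_session_parameters = {"QUERY_TAG": json.dumps(tags)}  # Snowflake query tag
bigquery_labels = dict(tags)  # BigQuery labels
sql_comment = f"-- mp_airflow {json.dumps(tags)}"  # SQL comment for any warehouse
```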