Déclencher des DAG dans d'autres environnements et projets

Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (ancienne 1re génération)

Cette page explique comment implémenter un DAG qui déclenche des DAG dans d'autres environnements et projets Managed Airflow à l'aide d'opérateurs Airflow pour Managed Airflow.

Si vous souhaitez déclencher des DAG dans votre environnement, consultez Planifier et déclencher des DAG.

Configurer les autorisations IAM

Si l'environnement cible se trouve dans un autre projet, le compte de service de votre environnement a besoin de rôles qui permettent d'interagir avec les environnements de ce projet.

Projet Ressource Compte principal Rôle
Projet dans lequel se trouve l'environnement cible Projet Compte de service de l'environnement source Rôle Nœud de calcul Composer (composer.worker)
Projet dans lequel se trouve l'environnement cible Projet Compte de service de l'environnement source Rôle personnalisé doté de l'autorisation composer.environments.executeAirflowCommand

Déclencher un DAG dans un autre environnement

Le DAG d'exemple décrit dans cette section effectue les opérations suivantes :

  1. Déclenchez un DAG dans un autre environnement Managed Airflow.
  2. Vérifie si une exécution DAG est terminée.

Une fois l'exécution du DAG dans un autre environnement terminée, l'exemple de DAG est marqué comme réussi.

Exécuter des commandes de CLI Airflow avec CloudComposerRunAirflowCLICommandOperator

Vous pouvez utiliser l'opérateur CloudComposerRunAirflowCLICommandOperator pour exécuter des commandes Airflow CLI dans un autre environnement Managed Airflow. L'exemple de DAG exécute la commande dags trigger, qui déclenche un DAG.

Cet opérateur peut s'exécuter en mode différable. Vous pouvez l'activer en définissant le paramètre deferrable sur True.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Vérifier si une exécution DAG est terminée

Vous pouvez utiliser le capteur CloudComposerDAGRunSensor pour vérifier si une exécution de DAG est terminée dans un autre environnement Managed Airflow.

Ce capteur peut fonctionner en mode différable. Vous pouvez l'activer en définissant le paramètre deferrable sur True.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Exemple de code complet

Voici l'exemple de code complet d'un DAG qui combine les deux tâches décrites précédemment.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Étapes suivantes