Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (ancienne 1re génération)
Cette page explique comment implémenter un DAG qui déclenche des DAG dans d'autres environnements et projets Managed Airflow à l'aide d'opérateurs Airflow pour Managed Airflow.
Si vous souhaitez déclencher des DAG dans votre environnement, consultez Planifier et déclencher des DAG.
Configurer les autorisations IAM
Si l'environnement cible se trouve dans un autre projet, le compte de service de votre environnement a besoin de rôles qui permettent d'interagir avec les environnements de ce projet.
| Projet | Ressource | Compte principal | Rôle |
|---|---|---|---|
| Projet dans lequel se trouve l'environnement cible | Projet | Compte de service de l'environnement source |
Rôle Nœud de calcul Composer (composer.worker) |
| Projet dans lequel se trouve l'environnement cible | Projet | Compte de service de l'environnement source |
Rôle personnalisé doté de l'autorisation composer.environments.executeAirflowCommand |
Déclencher un DAG dans un autre environnement
Le DAG d'exemple décrit dans cette section effectue les opérations suivantes :
- Déclenchez un DAG dans un autre environnement Managed Airflow.
- Vérifie si une exécution DAG est terminée.
Une fois l'exécution du DAG dans un autre environnement terminée, l'exemple de DAG est marqué comme réussi.
Exécuter des commandes de CLI Airflow avec CloudComposerRunAirflowCLICommandOperator
Vous pouvez utiliser l'opérateur CloudComposerRunAirflowCLICommandOperator pour exécuter des commandes Airflow CLI dans un autre environnement Managed Airflow. L'exemple de DAG exécute la commande dags trigger, qui déclenche un DAG.
Cet opérateur peut s'exécuter en mode différable. Vous pouvez l'activer en définissant le paramètre deferrable sur True.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Vérifier si une exécution DAG est terminée
Vous pouvez utiliser le capteur CloudComposerDAGRunSensor pour vérifier si une exécution de DAG est terminée dans un autre environnement Managed Airflow.
Ce capteur peut fonctionner en mode différable. Vous pouvez l'activer en définissant le paramètre deferrable sur True.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Exemple de code complet
Voici l'exemple de code complet d'un DAG qui combine les deux tâches décrites précédemment.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor