Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Auf dieser Seite wird gezeigt, wie Sie einen DAG implementieren, der DAGs in anderen Managed Airflow-Umgebungen und -Projekten auslöst. Dazu verwenden Sie Airflow-Operatoren für Managed Airflow.
Wenn Sie stattdessen DAGs in Ihrer Umgebung auslösen möchten, lesen Sie DAGs planen und auslösen.
IAM-Berechtigungen konfigurieren
Wenn sich die Zielumgebung in einem anderen Projekt befindet, benötigt das Dienst konto Ihrer Umgebung Rollen, die die Interaktion mit Umgebungen in diesem Projekt ermöglichen.
| Projekt | Ressource | Prinzipal | Rolle |
|---|---|---|---|
| Projekt, in dem sich die Zielumgebung befindet | Projekt | Dienstkonto der Quellumgebung |
Rolle Composer-Worker (composer.worker) |
| Projekt, in dem sich die Zielumgebung befindet | Projekt | Dienstkonto der Quellumgebung |
Eine benutzerdefinierte Rolle mit der
composer.environments.executeAirflowCommand Berechtigung |
DAG in einer anderen Umgebung auslösen
Der in diesem Abschnitt beschriebene Beispiel-DAG führt folgende Aktionen aus:
- Einen DAG in einer anderen Managed Airflow-Umgebung auslösen.
- Prüfen, ob eine DAG-Ausführung abgeschlossen ist.
Nachdem die DAG-Ausführung in einer anderen Umgebung abgeschlossen ist, wird der Beispiel-DAG als erfolgreich markiert.
Befehle der Airflow-Befehlszeile mit CloudComposerRunAirflowCLICommandOperator ausführen
Mit dem
CloudComposerRunAirflowCLICommandOperator
Operator können Sie Befehle der Airflow-Befehlszeile in einer anderen Managed Airflow
Umgebung ausführen. Der Beispiel-DAG führt den Befehl dags trigger aus, der einen DAG auslöst.
Dieser Operator kann im verzögerbaren Modus ausgeführt werden. Sie
können ihn aktivieren, indem Sie den deferrable Parameter auf True setzen.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Prüfen, ob eine DAG-Ausführung abgeschlossen ist
Mit dem Sensor CloudComposerDAGRunSensor können Sie prüfen, ob eine DAG-Ausführung in einer anderen Managed Airflow Umgebung abgeschlossen ist.
Dieser Sensor kann im verzögerbaren Modus ausgeführt werden. Sie können ihn
aktivieren, indem Sie den deferrable Parameter auf True setzen.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Vollständiges Codebeispiel
Im Folgenden finden Sie das vollständige Codebeispiel für einen DAG, der die beiden zuvor beschriebenen Aufgaben kombiniert.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor