DAGs in anderen Umgebungen und Projekten auslösen

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Auf dieser Seite wird gezeigt, wie Sie einen DAG implementieren, der DAGs in anderen Managed Airflow-Umgebungen und -Projekten auslöst. Dazu verwenden Sie Airflow-Operatoren für Managed Airflow.

Wenn Sie stattdessen DAGs in Ihrer Umgebung auslösen möchten, lesen Sie DAGs planen und auslösen.

IAM-Berechtigungen konfigurieren

Wenn sich die Zielumgebung in einem anderen Projekt befindet, benötigt das Dienst konto Ihrer Umgebung Rollen, die die Interaktion mit Umgebungen in diesem Projekt ermöglichen.

Projekt Ressource Prinzipal Rolle
Projekt, in dem sich die Zielumgebung befindet Projekt Dienstkonto der Quellumgebung Rolle Composer-Worker (composer.worker)
Projekt, in dem sich die Zielumgebung befindet Projekt Dienstkonto der Quellumgebung Eine benutzerdefinierte Rolle mit der composer.environments.executeAirflowCommand Berechtigung

DAG in einer anderen Umgebung auslösen

Der in diesem Abschnitt beschriebene Beispiel-DAG führt folgende Aktionen aus:

  1. Einen DAG in einer anderen Managed Airflow-Umgebung auslösen.
  2. Prüfen, ob eine DAG-Ausführung abgeschlossen ist.

Nachdem die DAG-Ausführung in einer anderen Umgebung abgeschlossen ist, wird der Beispiel-DAG als erfolgreich markiert.

Befehle der Airflow-Befehlszeile mit CloudComposerRunAirflowCLICommandOperator ausführen

Mit dem CloudComposerRunAirflowCLICommandOperator Operator können Sie Befehle der Airflow-Befehlszeile in einer anderen Managed Airflow Umgebung ausführen. Der Beispiel-DAG führt den Befehl dags trigger aus, der einen DAG auslöst.

Dieser Operator kann im verzögerbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den deferrable Parameter auf True setzen.

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    command="dags trigger -- target_dag",
    # You can run this operator in the deferrable mode:
    # deferrable=True
)

Prüfen, ob eine DAG-Ausführung abgeschlossen ist

Mit dem Sensor CloudComposerDAGRunSensor können Sie prüfen, ob eine DAG-Ausführung in einer anderen Managed Airflow Umgebung abgeschlossen ist.

Dieser Sensor kann im verzögerbaren Modus ausgeführt werden. Sie können ihn aktivieren, indem Sie den deferrable Parameter auf True setzen.

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id="target-project",
    environment_id="target-composer-environment",
    region="us-central1",
    composer_dag_id="target_dag",
    allowed_states=["success"],
    # You can run this sensor in the deferrable mode:
    # deferrable=True
)

Vollständiges Codebeispiel

Im Folgenden finden Sie das vollständige Codebeispiel für einen DAG, der die beiden zuvor beschriebenen Aufgaben kombiniert.

from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

DAG_ID = "trigger_dag_in_another_composer_environment"

TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"

TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"

with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "composer"],
) as dag:

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        command=COMMAND,
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id=TARGET_PROJECT_ID,
        environment_id=TARGET_ENV_ID,
        region=TARGET_REGION,
        composer_dag_id=TARGET_DAG,
        allowed_states=["success"],
        execution_range=timedelta(minutes=5),
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )

    run_airflow_cli_cmd >> dag_run_sensor

Nächste Schritte