Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁面說明如何實作 DAG,透過 Cloud Composer 適用的 Airflow 運算子,在其他 Cloud Composer 環境和專案中觸發 DAG。
如要在環境中觸發 DAG,請參閱「排定及觸發 DAG」。
設定 IAM 權限
如果目標環境位於其他專案,環境的服務帳戶必須具備可與該專案環境互動的角色。
專案 | 資源 | 主體 | 角色 |
---|---|---|---|
目標環境所在的專案 | 專案 | 來源環境的環境服務帳戶 |
Composer Worker 角色 (composer.worker ) |
目標環境所在的專案 | 專案 | 來源環境的環境服務帳戶 |
具有 composer.environments.executeAirflowCommand 權限的自訂角色 |
在其他環境中觸發 DAG
本節所述的 DAG 範例會執行下列操作:
- 在其他 Cloud Composer 環境中觸發 DAG。
- 檢查 DAG 執行作業是否完成。
另一個環境中的 DAG 執行作業完成後,範例 DAG 會標示為成功。
使用 CloudComposerRunAirflowCLICommandOperator 執行 Airflow CLI 指令
您可以使用 CloudComposerRunAirflowCLICommandOperator 運算子,在另一個 Cloud Composer 環境中執行 Airflow CLI 指令。範例 DAG 會執行 dags trigger
指令,觸發 DAG。
這個運算子可以在可延遲模式中執行,您可以將 deferrable
參數設為 True
來啟用。
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
檢查 DAG 執行作業是否完成
您可以使用 CloudComposerDAGRunSensor 感應器,檢查另一個 Cloud Composer 環境中的 DAG 執行作業是否已完成。
這個感應器可以在可延遲模式下執行,您可以將 deferrable
參數設為 True
來啟用這項功能。
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
完整程式碼範例
以下是 DAG 的完整程式碼範例,其中結合了先前說明的兩項工作。
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor