自動化調度管理管道

本頁面說明如何透過 Managed Service for Apache Airflow 和觸發條件自動調度管理管道。Cloud Data Fusion 建議使用 Managed Airflow 協調管道。如需更簡單的自動化調度管理方式,請使用觸發條件。

Composer

使用 Managed Airflow 自動化調度管理管道

透過 Managed Airflow 在 Cloud Data Fusion 中協調管理管道執行作業,可享有下列優點:

  • 集中管理工作流程:統一管理多個 Cloud Data Fusion 管道的執行作業。
  • 依附元件管理:定義管道之間的依附元件,確保執行順序正確。
  • 監控和快訊:Managed Airflow 提供監控功能,以及失敗快訊。
  • 整合其他服務:透過 Managed Airflow,您可以協調跨 Cloud Data Fusion 和其他Google Cloud 服務的工作流程。

如要使用 Managed Airflow 調度 Cloud Data Fusion 管道,請按照下列步驟操作:

  1. 設定 Managed Airflow 環境。

    • 建立 Managed Airflow 環境。如果沒有,請在 Google Cloud 專案中佈建環境。這個環境是您的自動化調度管理工作區。
    • 授予權限。請確保 Managed Airflow 服務帳戶具備存取 Cloud Data Fusion 的必要權限 (例如啟動、停止及列出管道的權限)。
  2. 定義用於自動化調度管理的有向無環圖 (DAG)。

    • 建立 DAG:在 Managed Airflow 中建立 DAG,定義 Cloud Data Fusion 管道的自動化調度管理工作流程。
    • Cloud Data Fusion 運算子:在 DAG 中使用 Managed Airflow 的 Cloud Data Fusion 運算子。這些運算子可讓您以程式輔助方式與 Cloud Data Fusion 互動。

Cloud Data Fusion 運算子

Cloud Data Fusion 管道自動化調度管理有下列運算子:

CloudDataFusionStartPipelineOperator

依 ID 觸發 Cloud Data Fusion 管道的執行作業。這個函式具有下列參數:

  • 管道 ID
  • 位置 (Google Cloud 區域)
  • 管道命名空間
  • 執行階段引數 (選用)
  • 等待完成 (選用)
  • 逾時 (選用)
CloudDataFusionStopPipelineOperator

可停止執行中的 Cloud Data Fusion 管道。

CloudDataFusionDeletePipelineOperator

刪除 Cloud Data Fusion 管道。

建立 DAG 工作流程

建構 DAG 工作流程時,請考量下列事項:

  • 定義依附元件:使用 DAG 結構定義工作之間的依附元件。舉例來說,您可能有一個工作,會等待某個命名空間中的管道順利完成,再觸發另一個命名空間中的管道。
  • 排程:排定 DAG 按照特定時間間隔執行,例如每天或每小時,或設定為手動觸發。

詳情請參閱「Managed Airflow 總覽」。

觸發條件

透過觸發條件自動調度管理管道

Cloud Data Fusion 觸發條件可讓您在上游管道完成 (成功、失敗或任何指定條件) 後,自動執行下游管道。

觸發條件在下列工作相當實用:

  • 只要清理一次資料,即可供多個下游管道使用。
  • 在管道之間分享資訊,例如執行階段引數和外掛程式設定。這項工作稱為「酬載設定」
  • 使用一組動態管道,根據每小時、每天、每週或每月資料執行,而不是每次執行都必須更新的靜態管道。

舉例來說,您有一個資料集,其中包含貴公司所有出貨資訊。您想根據這項資料回答幾個業務問題。為此,您要建立一個管道來清理原始運送資料,並命名為「Shipments Data Cleaning」。接著,您會建立第二個管道「Delayed Shipments USA」,讀取清理後的資料,找出美國境內延遲超過指定門檻的運送案件。上游的「Shipments Data Cleaning」管道成功完成後,即可觸發「Delayed Shipments USA」管道。

此外,由於下游管道會使用上游管道的輸出內容,因此您必須指定下游管道使用這個觸發程序執行時,也會收到要讀取的輸入目錄 (也就是上游管道產生輸出內容的目錄)。這個程序稱為「傳遞酬載設定」,您可以使用執行階段引數定義。您可以根據每小時、每天、每週或每月資料執行一組動態管道 (而非靜態管道,靜態管道每次執行都必須更新)。

如要透過觸發條件協調管道,請按照下列程序操作:

  1. 建立上游和下游管道。

    • 在 Cloud Data Fusion Studio 中,設計及部署構成協調鏈的管道。
    • 請考慮工作流程中,哪個管道完成後會啟動下一個管道 (下游)。
  2. 選用:傳遞上游管道的執行階段引數。

  3. 在下游管道中建立輸入觸發條件。

    • 在 Cloud Data Fusion Studio 中,前往「List」(清單) 頁面。在「已部署」分頁中,按一下下游管道的名稱。系統會顯示該管道的「Deploy」(部署) 檢視畫面。
    • 按一下頁面左側中間的「輸入觸發條件」。畫面上會顯示可用的管道清單。
    • 按一下上游管道。選取一或多個上游管道完成狀態 (「成功」、「失敗」或「停止」),做為下游管道應執行的條件。
    • 如要讓上游管道與下游管道共用資訊 (稱為「酬載設定」),請按一下「觸發條件設定」,然後按照步驟將酬載設定做為執行階段引數傳遞。否則請按一下「啟用觸發條件」
  4. 測試觸發條件。

    • 啟動上游管道的執行作業。
    • 如果觸發條件設定正確,下游管道會在上游管道完成後,根據您設定的條件自動執行。

將酬載設定做為執行階段引數傳遞

酬載設定可讓您將上游管道的資訊分享至下游管道。例如輸出目錄、資料格式或管道的執行日期。下游管道隨後會使用這項資訊做出決策,例如判斷要從哪個資料集讀取資料。

如要將資訊從上游管道傳遞至下游管道,請使用上游管道中任何外掛程式的執行階段引數或設定值,設定下游管道的執行階段引數。

每當下游管道觸發及執行時,系統會使用觸發下游管道的上游管道特定執行作業的執行階段引數,設定下游管道的有效負載設定。

如要將酬載設定做為執行階段引數傳遞,請按照下列步驟操作:

  1. 接著建立輸入觸發條件,點選「觸發條件設定」後,系統會顯示您先前為上游管道設定的任何執行階段引數。選擇要從上游管道傳遞至下游管道的執行階段引數,以便在觸發條件執行時使用。
  2. 按一下「外掛程式設定」分頁標籤,即可查看上游管道觸發時,會將哪些項目傳遞至下游管道。
  3. 按一下「設定並啟用觸發條件」