Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)
本頁說明如何使用 Cloud Run 函式,根據事件觸發 Managed Service for Apache Airflow DAG。
Apache Airflow 的設計是定期執行 DAG,但您也可以在發生事件時觸發 DAG。其中一種做法是使用 Cloud Run 函式,在發生特定事件時觸發 Managed Airflow DAG。
本指南中的範例程式會在 Cloud Storage bucket 每次有所異動時執行 DAG。bucket 中任何物件的變更都會觸發函式。這個函式會向 Managed Airflow 環境的 Airflow REST API 發出要求。Airflow 會處理這項要求並執行 DAG。DAG 會輸出變更相關資訊。
事前準備
檢查環境的網路設定
這個解決方案不適用於私人 IP 和 VPC Service Controls 設定,因為在這些設定中,無法設定從 Cloud Run 函式到 Airflow 網路伺服器的連線。
在 Managed Airflow (第 2 代) 中,您可以使用其他方法: 使用 Cloud Run 函式和 Pub/Sub 訊息觸發 DAG
為專案啟用 API
控制台
啟用 Managed Airflow 和 Cloud Run functions API。
啟用 API 時所需的角色
如要啟用 API,您需要服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色。
gcloud
啟用 Managed Airflow 和 Cloud Run functions API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色。
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
啟用 Airflow REST API
視 Airflow 版本而定:
- 在 Airflow 2 中,穩定版 REST API 預設為啟用。如果環境停用了穩定版 API,請啟用穩定版 REST API。
- 如果是 Airflow 1,請啟用實驗性 REST API。
使用 Webserver Access Control 允許對 Airflow REST API 進行 API 呼叫
Cloud Run 函式可使用 IPv4 或 IPv6 位址連線至 Airflow REST API。
如果不確定呼叫 IP 範圍,請使用 Webserver Access Control 中的預設設定選項 All IP addresses have access (default),以免不慎封鎖 Cloud Run 函式。
建立 Cloud Storage bucket
這個範例會在 Cloud Storage bucket 發生變更時觸發 DAG。建立新的 bucket,以便用於這個範例。
取得 Airflow 網路伺服器網址
這個範例會向 Airflow 網路伺服器端點發出 REST API 要求。
在 Cloud Functions 程式碼中,您會使用 Airflow 網頁介面網址中 .appspot.com 前的部分。
控制台
前往 Google Cloud 控制台的「Environments」(環境) 頁面。
按一下環境名稱。
在「環境詳細資料」頁面中,前往「環境設定」分頁。
Airflow 網路伺服器的網址會列在「Airflow web UI」(Airflow 網頁版 UI) 項目中。
gcloud
執行下列指令:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
更改項目:
- 將
ENVIRONMENT_NAME替換為環境的名稱。 - 將
LOCATION替換為環境所在的區域。
取得 IAM Proxy 的 client_id
如要向 Airflow REST API 端點提出要求,您必須為函式提供保護 Airflow 網路伺服器的 Identity and Access Management Proxy 用戶端 ID。
Managed Airflow 不會直接提供這項資訊,而會向 Airflow 網路伺服器發出未經授權的要求,並從重新導向網址中擷取用戶端 ID:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
將 AIRFLOW_URL 替換為 Airflow 網頁介面的網址。
在輸出內容中,搜尋 client_id 後方的字串。例如:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
將下列程式碼儲存為 get_client_id.py 檔案。填入 project_id、location 和 composer_environment 的值,然後在 Cloud Shell 或本機環境中執行程式碼。
將 DAG 上傳至環境
將 DAG 上傳至環境。 下列 DAG 範例會輸出收到的 DAG 執行設定。您將透過函式觸發這個 DAG,該函式會在稍後建立。
部署會觸發 DAG 的 Cloud Function
您可以使用 Cloud Run functions 或 Cloud Run 支援的偏好語言部署 Cloud Function。本教學課程將示範以 Python 和 Java 實作的 Cloud 函式。
指定 Cloud 函式設定參數
觸發條件。以這個範例來說,請選取在 bucket 中建立新物件或覆寫現有物件時觸發的觸發條件。
「執行階段、建構作業、連線和安全性設定」專區中的「執行階段服務帳戶」。根據偏好設定,使用下列其中一個選項:
選取「Compute Engine 預設服務帳戶」。根據預設的 IAM 權限,這個帳戶可以執行存取 Managed Airflow 環境的函式。
建立具備 Composer 使用者角色的自訂服務帳戶,並將其指定為此函式的執行階段服務帳戶。這個選項遵循最低權限原則。
執行階段和進入點,位於「程式碼」步驟。為這個範例新增程式碼時,請選取 Python 3.7 以上版本做為執行階段,並指定
trigger_dag做為進入點。
新增要求
在 requirements.txt 檔案中指定依附元件:
將下列程式碼放入 main.py 檔案,並進行下列替換:
將
client_id變數的值替換為先前取得的client_id值。將
webserver_id變數的值替換為您的租戶專案 ID,這是 Airflow 網頁介面網址中.appspot.com前的部分。您先前已取得 Airflow 網頁介面網址。指定您使用的 Airflow REST API 版本:
- 如果您使用穩定版 Airflow REST API,請將
USE_EXPERIMENTAL_API變數設為False。 - 如果您使用實驗性 Airflow REST API,則不必進行任何變更。
USE_EXPERIMENTAL_API變數已設為True。
- 如果您使用穩定版 Airflow REST API,請將
測試函式
如要確認函式和 DAG 是否正常運作,請按照下列步驟操作:
- 等待函式部署完成。
- 將檔案上傳至 Cloud Storage bucket。或者,您也可以在 Google Cloud 控制台中選取函式的「測試函式」動作,手動觸發函式。
- 在 Airflow 網頁介面中查看 DAG 頁面。DAG 應有一個有效或已完成的 DAG 執行作業。
- 在 Airflow UI 中,查看這項執行的工作記錄。您應該會看到
print_gcs_info工作將從函式收到的資料輸出至記錄:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h