在 Managed Airflow (第 2 代) 中執行 Apache Airflow DAG (Google Cloud CLI)
Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)
本快速入門指南說明如何建立 Managed Service for Apache Airflow 環境,並在 Managed Airflow (第 2 代) 中執行 Apache Airflow DAG。
如果您是 Airflow 新手,請參閱 Apache Airflow 說明文件中的 Airflow 概念教學課程,進一步瞭解 Airflow 概念、物件及其用途。
如要改用 Google Cloud 控制台,請參閱「在 Managed Service for Apache Airflow 中執行 Apache Airflow DAG」。
如要使用 Terraform 建立環境,請參閱「建立環境 (Terraform)」。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要「專案建立者」角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要「專案建立者」角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
-
啟用 Managed Airflow API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable composer.googleapis.com
-
如要取得完成本快速入門導覽課程所需的權限,請要求管理員在專案中授予您下列 IAM 角色:
-
如要指派 IAM 角色和權限:
專案 IAM 管理員 (
roles/resourcemanager.projectIamAdmin) -
如要為 Managed Airflow 環境建立服務帳戶:
「建立服務帳戶」 (
roles/iam.serviceAccountCreator) -
如要查看、建立及管理 Managed Airflow 環境:
- 環境與 Storage 物件管理員 (
roles/composer.environmentAndStorageObjectAdmin) - 服務帳戶使用者 (
roles/iam.serviceAccountUser)
- 環境與 Storage 物件管理員 (
-
如要查看記錄,請按以下步驟操作:
「記錄檢視器」 (
roles/logging.viewer)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
-
如要指派 IAM 角色和權限:
專案 IAM 管理員 (
建立環境的服務帳戶
建立環境時,請指定服務帳戶。這個服務帳戶稱為環境的服務帳戶。您的環境會使用這個服務帳戶執行大部分作業。
環境的服務帳戶不是使用者帳戶。服務帳戶是一種特殊帳戶,由應用程式或虛擬機器 (VM) 執行個體使用,而非由人員使用。
如要為環境建立服務帳戶,請按照下列步驟操作:
按照 Identity and Access Management 說明文件中的說明,建立新的服務帳戶。
請按照 Identity and Access Management 說明文件中的說明授予角色。必要角色為 Composer 工作者 (
composer.worker)。
建立環境
如果這是專案中的第一個環境,請在環境的服務帳戶中新增 Managed Airflow 服務代理程式帳戶做為主體,並授予 roles/composer.ServiceAgentV2Ext 角色。
根據預設,環境會使用預設的 Compute Engine 服務帳戶,以下範例說明如何為該帳戶新增這項權限。
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
ENVIRONMENT_SERVICE_ACCOUNT \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
將 ENVIRONMENT_SERVICE_ACCOUNT 替換為您先前建立的環境服務帳戶。
在 us-central1
區域中,使用最新版 Managed Airflow (第 2 代) 版本建立名為 example-environment 的新環境。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.17.2-airflow-2.11.1 \
--service-account ENVIRONMENT_SERVICE_ACCOUNT
將 ENVIRONMENT_SERVICE_ACCOUNT 替換為您先前建立的環境服務帳戶。
建立 DAG 檔案
Airflow DAG 是經過整理的工作集合,可供您排程及執行。DAG 定義於標準 Python 檔案中。
本指南使用 quickstart.py 檔案中定義的範例 Airflow DAG。這個檔案中的 Python 程式碼會執行下列作業:
- 建立 DAG
composer_sample_dag。這個 DAG 每天都會執行。 - 執行一項工作,
print_dag_run_conf。這項工作會使用 Bash 運算子,列印 DAG 執行的設定。
將 quickstart.py 檔案副本儲存到本機電腦:
將 DAG 檔案上傳至環境的值區
每個 Managed Airflow 環境都有相關聯的 Cloud Storage bucket。Managed Airflow 中的 Airflow 只會為這個 bucket 中 /dags 資料夾內的 DAG 排程。
如要排定 DAG 的時間,請從本機電腦將 quickstart.py 上傳至環境的 /dags 資料夾:
如要使用 Google Cloud CLI 上傳 quickstart.py,請在 quickstart.py 檔案所在的資料夾中執行下列指令:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
查看 DAG
上傳 DAG 檔案後,Airflow 會執行下列作業:
- 剖析您上傳的 DAG 檔案。DAG 可能需要幾分鐘才會在 Airflow 中顯示。
- 將 DAG 新增至可用 DAG 清單。
- 根據您在 DAG 檔案中提供的時間表執行 DAG。
在 DAG UI 中查看 DAG,確認 DAG 處理作業未發生錯誤,且 DAG 可在 Airflow 中使用。DAG UI 是 Managed Airflow 介面,可在 Google Cloud 控制台中查看 DAG 資訊。Managed Airflow 也提供 Airflow UI 的存取權,這是原生 Airflow 網頁介面。
請等待約五分鐘,讓 Airflow 處理先前上傳的 DAG 檔案,並完成第一個 DAG 執行作業 (稍後會說明)。
在 Google Cloud CLI 中執行下列指令,這個指令會執行
dags listAirflow CLI 指令,列出環境中的 DAG。gcloud composer environments run example-environment \ --location us-central1 \ dags list確認指令輸出內容中列出
composer_quickstartDAG。輸出內容範例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
查看 DAG 執行作業詳細資料
DAG 的單一執行作業稱為「DAG 執行作業」。由於 DAG 檔案中的開始日期設為昨天,Airflow 會立即執行範例 DAG 的 DAG 執行作業。這樣一來,Airflow 就會趕上指定 DAG 的排程。
範例 DAG 包含一項工作 print_dag_run_conf,會在控制台中執行 echo 指令。這項指令會輸出 DAG 的中繼資訊 (DAG 執行的數值 ID)。
在 Google Cloud CLI 中執行下列指令,這個指令會列出 composer_quickstart DAG 的 DAG 執行作業:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
輸出內容範例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI 不提供查看工作記錄的指令。您可以使用其他方法查看 Airflow 工作記錄:Managed Airflow DAG UI、Airflow UI 或 Cloud Logging。本指南說明如何查詢 Cloud Logging,取得特定 DAG 執行的記錄。
在 Google Cloud CLI 中執行下列指令,這項指令會從 Cloud Logging 讀取 composer_quickstart DAG 特定 DAG 執行作業的記錄。--format 引數會格式化輸出內容,只顯示記錄訊息的文字。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
更改項目:
- 將
RUN_ID替換為先前執行的tasks states-for-dag-run指令輸出內容中的run_id值。例如:2024-02-17T15:38:38.969307+00:00。
輸出內容範例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。
刪除本教學課程中使用的資源:
刪除 Managed Airflow 環境:
前往 Google Cloud 控制台的「Environments」(環境) 頁面。
選取
example-environment,然後按一下「刪除」。等待環境刪除完成。
刪除環境的值區。刪除 Managed Airflow 環境不會一併刪除其值區。
在 Google Cloud 控制台,依序前往「Storage」(儲存空間) >「Browser」(瀏覽器) 頁面。
選取環境的值區,然後按一下「Delete」(刪除)。舉例來說,這個 bucket 可以命名為
us-central1-example-environ-c1616fe8-bucket。
後續步驟