在 Managed Airflow (第 2 代) 中執行 Apache Airflow DAG (Google Cloud CLI)

Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)

本快速入門指南說明如何建立 Managed Service for Apache Airflow 環境,並在 Managed Airflow (第 2 代) 中執行 Apache Airflow DAG。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. 安裝 Google Cloud CLI。

  3. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  4. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  5. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要「專案建立者」角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  6. 確認專案已啟用計費功能 Google Cloud

  7. 安裝 Google Cloud CLI。

  8. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  9. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  10. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要「專案建立者」角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  11. 確認專案已啟用計費功能 Google Cloud

  12. 啟用 Managed Airflow API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable composer.googleapis.com
  13. 如要取得完成本快速入門導覽課程所需的權限,請要求管理員在專案中授予您下列 IAM 角色:

    • 如要指派 IAM 角色和權限: 專案 IAM 管理員 (roles/resourcemanager.projectIamAdmin)
    • 如要為 Managed Airflow 環境建立服務帳戶: 「建立服務帳戶」 (roles/iam.serviceAccountCreator)
    • 如要查看、建立及管理 Managed Airflow 環境:
    • 如要查看記錄,請按以下步驟操作: 「記錄檢視器」 (roles/logging.viewer)

    如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

    您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

建立環境的服務帳戶

建立環境時,請指定服務帳戶。這個服務帳戶稱為環境的服務帳戶。您的環境會使用這個服務帳戶執行大部分作業。

環境的服務帳戶不是使用者帳戶。服務帳戶是一種特殊帳戶,由應用程式或虛擬機器 (VM) 執行個體使用,而非由人員使用。

如要為環境建立服務帳戶,請按照下列步驟操作:

  1. 按照 Identity and Access Management 說明文件中的說明,建立新的服務帳戶

  2. 請按照 Identity and Access Management 說明文件中的說明授予角色。必要角色為 Composer 工作者 (composer.worker)。

建立環境

如果這是專案中的第一個環境,請在環境的服務帳戶中新增 Managed Airflow 服務代理程式帳戶做為主體,並授予 roles/composer.ServiceAgentV2Ext 角色。

根據預設,環境會使用預設的 Compute Engine 服務帳戶,以下範例說明如何為該帳戶新增這項權限。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

ENVIRONMENT_SERVICE_ACCOUNT 替換為您先前建立的環境服務帳戶。

us-central1 區域中,使用最新版 Managed Airflow (第 2 代) 版本建立名為 example-environment 的新環境。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.17.2-airflow-2.11.1 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

ENVIRONMENT_SERVICE_ACCOUNT 替換為您先前建立的環境服務帳戶。

建立 DAG 檔案

Airflow DAG 是經過整理的工作集合,可供您排程及執行。DAG 定義於標準 Python 檔案中。

本指南使用 quickstart.py 檔案中定義的範例 Airflow DAG。這個檔案中的 Python 程式碼會執行下列作業:

  1. 建立 DAG composer_sample_dag。這個 DAG 每天都會執行。
  2. 執行一項工作,print_dag_run_conf。這項工作會使用 Bash 運算子,列印 DAG 執行的設定。

quickstart.py 檔案副本儲存到本機電腦:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

將 DAG 檔案上傳至環境的值區

每個 Managed Airflow 環境都有相關聯的 Cloud Storage bucket。Managed Airflow 中的 Airflow 只會為這個 bucket 中 /dags 資料夾內的 DAG 排程。

如要排定 DAG 的時間,請從本機電腦將 quickstart.py 上傳至環境的 /dags 資料夾:

如要使用 Google Cloud CLI 上傳 quickstart.py,請在 quickstart.py 檔案所在的資料夾中執行下列指令:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

查看 DAG

上傳 DAG 檔案後,Airflow 會執行下列作業:

  1. 剖析您上傳的 DAG 檔案。DAG 可能需要幾分鐘才會在 Airflow 中顯示。
  2. 將 DAG 新增至可用 DAG 清單。
  3. 根據您在 DAG 檔案中提供的時間表執行 DAG。

在 DAG UI 中查看 DAG,確認 DAG 處理作業未發生錯誤,且 DAG 可在 Airflow 中使用。DAG UI 是 Managed Airflow 介面,可在 Google Cloud 控制台中查看 DAG 資訊。Managed Airflow 也提供 Airflow UI 的存取權,這是原生 Airflow 網頁介面。

  1. 請等待約五分鐘,讓 Airflow 處理先前上傳的 DAG 檔案,並完成第一個 DAG 執行作業 (稍後會說明)。

  2. 在 Google Cloud CLI 中執行下列指令,這個指令會執行 dags list Airflow CLI 指令,列出環境中的 DAG。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. 確認指令輸出內容中列出 composer_quickstart DAG。

    輸出內容範例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

查看 DAG 執行作業詳細資料

DAG 的單一執行作業稱為「DAG 執行作業」。由於 DAG 檔案中的開始日期設為昨天,Airflow 會立即執行範例 DAG 的 DAG 執行作業。這樣一來,Airflow 就會趕上指定 DAG 的排程。

範例 DAG 包含一項工作 print_dag_run_conf,會在控制台中執行 echo 指令。這項指令會輸出 DAG 的中繼資訊 (DAG 執行的數值 ID)。

在 Google Cloud CLI 中執行下列指令,這個指令會列出 composer_quickstart DAG 的 DAG 執行作業:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

輸出內容範例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI 不提供查看工作記錄的指令。您可以使用其他方法查看 Airflow 工作記錄:Managed Airflow DAG UI、Airflow UI 或 Cloud Logging。本指南說明如何查詢 Cloud Logging,取得特定 DAG 執行的記錄。

在 Google Cloud CLI 中執行下列指令,這項指令會從 Cloud Logging 讀取 composer_quickstart DAG 特定 DAG 執行作業的記錄。--format 引數會格式化輸出內容,只顯示記錄訊息的文字。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

更改項目:

  • RUN_ID 替換為先前執行的 tasks states-for-dag-run 指令輸出內容中的 run_id 值。例如:2024-02-17T15:38:38.969307+00:00

輸出內容範例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

清除所用資源

為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。

刪除本教學課程中使用的資源

  1. 刪除 Managed Airflow 環境:

    1. 前往 Google Cloud 控制台的「Environments」(環境) 頁面。

      前往「環境」

    2. 選取 example-environment,然後按一下「刪除」

    3. 等待環境刪除完成。

  2. 刪除環境的值區。刪除 Managed Airflow 環境不會一併刪除其值區。

    1. 在 Google Cloud 控制台,依序前往「Storage」(儲存空間) >「Browser」(瀏覽器) 頁面。

      前往「Storage」>「Browser」

    2. 選取環境的值區,然後按一下「Delete」(刪除)。舉例來說,這個 bucket 可以命名為 us-central1-example-environ-c1616fe8-bucket

後續步驟