Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow 具有 REST API 介面,可用來執行各種工作,例如取得 DAG 執行和工作相關資訊、更新 DAG、取得 Airflow 設定、新增及刪除連線,以及列出使用者。
如需搭配使用 Airflow REST API 與 Cloud Run 函式的範例,請參閱「使用 Cloud Run 函式觸發 DAG」。
Airflow REST API 版本
- Airflow 3 使用 Airflow REST API v2。
- Airflow 2 使用 Airflow REST API v1。
設定 Airflow REST API
Airflow 3
Airflow 3 使用 Airflow REST API 第 2 版。
Cloud Composer 使用自己的 API 驗證後端。授權作業的運作方式與 Airflow 3 提供的標準方式相同。新使用者透過 API 授權時,帳戶預設會取得Op角色。
Airflow 3 中的 Airflow REST API 一律會啟用,無法停用。如要變更預設使用者角色,請覆寫下列 Airflow 設定選項:
| 區段 | 鍵 | 值 | 附註 |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
您可以指定任何其他角色。 |
Airflow 2
Airflow 2 預設會啟用 Airflow REST API v1。
Cloud Composer 使用自己的 API 驗證後端。授權方式與 Airflow 提供的標準方式相同。新使用者透過 API 授權時,帳戶預設會取得 Op 角色。
您可以覆寫下列 Airflow 設定選項,啟用或停用 Airflow REST API,或變更預設使用者角色:
| 區段 | 鍵 | 值 | 附註 |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
如要停用 Airflow REST API,請將值變更為 airflow.api.auth.backend.deny_all |
api
|
composer_auth_user_registration_role
|
Op
|
您可以指定任何其他角色。 |
使用網路伺服器存取權控管,允許對 Airflow REST API 進行 API 呼叫
視呼叫 Airflow REST API 的方法而定,呼叫端方法可以使用 IPv4 或 IPv6 位址。請記得使用網路伺服器存取權控管,解除封鎖 Airflow REST API 的 IP 流量。
如果您不確定呼叫 Airflow REST API 時會使用哪些 IP 位址,請使用預設設定選項 All IP addresses have access (default)。
呼叫 Airflow REST API
本節提供 Python 指令碼範例,可用於透過 Airflow REST API 觸發 DAG。
在指令碼中設定下列變數:
dag_id:DAG 的名稱,如 DAG 來源檔案中所定義。dag_config:DAG 執行作業的設定。web_server_url:Airflow 網路伺服器網址。 格式為https://<web-server-id>.composer.googleusercontent.com。(Airflow 3)
logical_date:DAG 執行的邏輯日期。
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
使用服務帳戶存取 Airflow REST API
在 2.3.0 之前的 Airflow 版本中,Airflow 資料庫將電子郵件欄位的長度限制為 64 個字元。服務帳戶的電子郵件地址有時會超過 64 個字元。您無法以一般方式為這類服務帳戶建立 Airflow 使用者。如果這類服務帳戶沒有 Airflow 使用者,存取 Airflow REST API 時會發生 HTTP 錯誤 401 和 403。
解決方法是預先為服務帳戶註冊 Airflow 使用者。如要這麼做,請使用 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱,並使用任何不重複的字串做為電子郵件。
如要取得服務帳戶的
NUMERIC_USER_ID,請執行下列指令:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"取代:
SA_NAME改為服務帳戶名稱。- 將
PROJECT_ID替換為專案 ID。
為服務帳戶建立具有
Op角色的 Airflow 使用者:Airflow UI
依序前往「安全性」>「列出使用者」,然後按一下「新增記錄」。您的 Airflow 使用者必須具備
Admin角色,才能開啟這個頁面。指定
accounts.google.com:NUMERIC_USER_ID做為使用者名稱。將NUMERIC_USER_ID替換為上一個步驟中取得的使用者 ID。將電子郵件指定為專屬 ID。你可以使用任何不重複的字串。
指定使用者的角色。例如
Op。確認已選取「Is Active?」(是否啟用?) 核取方塊。
指定使用者的名字和姓氏。您可以使用任何字串。
按一下 [儲存]。
gcloud
執行下列 Airflow CLI 指令:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-password取代:
- 將
ENVIRONMENT_NAME替換為環境的名稱。 LOCATION改成環境所在的地區。NUMERIC_USER_ID,並換成上一個步驟中取得的使用者 ID。UNIQUE_ID,並提供 Airflow 使用者的 ID。你可以使用任何不重複的字串。
為服務帳戶建立 Airflow 使用者後,經過服務帳戶驗證的呼叫端會被視為預先註冊的使用者,並登入 Airflow。
擴充 Airflow REST API 元件
Airflow REST API 和 Airflow UI 端點會在 Airflow 網路伺服器中執行。如果大量使用 REST API,請根據預期負載,考慮增加 Airflow 網頁伺服器可用的 CPU 和記憶體。