Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。
如需查看将 Airflow REST API 与 Cloud Run 函数搭配使用的示例,请参阅使用 Cloud Run 函数触发 DAG。
Airflow REST API 版本
- Airflow 3 使用 Airflow REST API v2。
- Airflow 2 使用 Airflow REST API v1。
配置 Airflow REST API
Airflow 3
Airflow 3 使用 Airflow REST API v2。
Cloud Composer 使用自己的 API 身份验证后端。授权以 Airflow 3 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。
Airflow 3 中的 Airflow REST API 始终处于启用状态,无法停用。您可以通过替换以下 Airflow 配置选项来更改默认用户角色:
| 部分 | 键 | 值 | 备注 |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
您可以指定任何其他角色。 |
Airflow 2
Airflow 2 中默认已启用 Airflow REST API v1。
Cloud Composer 使用自己的 API 身份验证后端。授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。
您可以通过替换以下 Airflow 配置选项来启用或停用 Airflow REST API 或更改默认用户角色:
| 部分 | 键 | 值 | 备注 |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
如需停用 Airflow REST API,请更改为 airflow.api.auth.backend.deny_all |
api
|
composer_auth_user_registration_role
|
Op
|
您可以指定任何其他角色。 |
允许使用 Web 服务器访问权限控制对 Airflow REST API 进行 API 调用
根据用于调用 Airflow REST API 的方法,调用方方法可以使用 IPv4 或 IPv6 地址。请记得使用Web 服务器访问控制来解除对 Airflow REST API 的 IP 流量的屏蔽。
如果您不确定对 Airflow REST API 的调用将从哪些 IP 地址发送,请使用默认配置选项 All IP addresses have access (default)。
调用 Airflow REST API
本部分提供了一个示例 Python 脚本,该脚本可用于使用 Airflow REST API 来触发 DAG。
在脚本中,设置以下变量:
dag_id:DAG 的名称,如 DAG 源文件中所定义。dag_config:DAG 运行的配置。web_server_url:您的 Airflow Web 服务器网址。 格式为https://<web-server-id>.composer.googleusercontent.com。(Airflow 3)
logical_date:DAG 运行的逻辑日期。
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
使用服务账号访问 Airflow REST API
在 2.3.0 之前的 Airflow 版本中,Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。以常规方式无法为此类服务账号创建 Airflow 用户。如果此类服务账号没有 Airflow 用户,则访问 Airflow REST API 会导致 HTTP 错误 401 和 403。
要解决此问题,您可以为服务账号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。
如需获取服务账号的
NUMERIC_USER_ID,请运行以下命令:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"替换:
- 将
SA_NAME替换为服务账号名称。 - 将
PROJECT_ID替换为项目 ID。
- 将
为该服务账号创建具有
Op角色的 Airflow 用户:Airflow 界面
依次前往安全 > 列出用户,然后点击添加新记录。您的 Airflow 用户必须具有
Admin角色才能打开此页面。指定
accounts.google.com:NUMERIC_USER_ID作为用户名。将NUMERIC_USER_ID替换为上一步中获得的用户 ID。将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。
指定用户的角色。例如
Op。确保选中有效?复选框。
指定用户的名字和姓氏。您可以使用任何字符串。
点击保存。
gcloud
运行以下 Airflow CLI 命令:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-password替换:
ENVIRONMENT_NAME替换为环境的名称。LOCATION替换为环境所在的区域。NUMERIC_USER_ID替换为在上一步中获取的用户 ID。UNIQUE_ID替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
为服务账号创建 Airflow 用户后,以该服务账号身份进行身份验证的调用者会被识别为预注册用户,并登录到 Airflow。
扩缩 Airflow REST API 组件
Airflow REST API 和 Airflow 界面端点在 Airflow 网络服务器中运行。如果您大量使用 REST API,请根据预期负载考虑增加 Airflow Web 服务器可用的 CPU 和内存量。