访问 Airflow REST API

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

Apache Airflow 具有一个 REST API 接口,可用于执行诸如以下任务:获取有关 DAG 运行和任务的信息、更新 DAG、获取 Airflow 配置、添加和删除连接以及列出用户。

如需查看将 Airflow REST API 与 Cloud Run 函数搭配使用的示例,请参阅 使用 Cloud Run 函数触发 DAG

Airflow REST API 版本

配置 Airflow REST API

Airflow 2 中默认已启用 Airflow REST API v1。

Managed Airflow 使用自己的 API 身份验证后端

授权以 Airflow 提供的标准方式工作。当新用户通过 API 授权时,该用户的账号默认获得 Op 角色。

您可以通过替换以下 Airflow 配置 选项来启用或停用 Airflow REST API 或更改默认用户 角色

部分 备注
api (Airflow 2.2.5 及更早版本)auth_backend
(Airflow 2.3.0 及更高版本)auth_backends
airflow.composer.api.backend.composer_auth 如需停用稳定的 REST API,请更改为 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

使用 Web 服务器访问权限控制允许对 Airflow REST API 进行 API 调用

根据用于调用 Airflow REST API 的方法,调用者方法可以使用 IPv4 或 IPv6 地址 。请务必使用 Web 服务器访问权限控制取消屏蔽对 Airflow REST API 的 IP 流量。

如果您不确定对 Airflow REST API 的调用将从哪些 IP 地址发送,请使用默认配置选项 All IP addresses have access (default)

调用 Airflow REST API

本部分提供了一个示例 Python 脚本,该脚本可用于使用 Airflow REST API 来触发 DAG。

在该脚本中,请设置以下变量:

  • dag_id:DAG 的名称,如 DAG 源文件中所定义。
  • dag_config:DAG 运行的配置。
  • web_server_url:您的 Airflow Web 服务器网址。 格式为 https://<web-server-id>.composer.googleusercontent.com

  • (Airflow 3) logical_date:DAG 运行的逻辑日期

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

使用服务帐号访问 Airflow REST API

在 Airflow 2.3.0 之前的 Airflow 版本中,Airflow 数据库将电子邮件字段的长度限制为 64 个字符。服务账号有时包含长度超过 64 个字符的电子邮件地址。以常规方式无法为此类服务账号创建 Airflow 用户。如果此类服务帐号没有 Airflow 用户,则访问 Airflow REST API 会导致 HTTP 错误 401 和 403。

要解决此问题,您可以为服务帐号预先注册一个 Airflow 用户。为此,请使用 accounts.google.com:NUMERIC_USER_ID 作为用户名,并使用任意唯一的字符串作为电子邮件。

  1. 如需获取服务账号的 NUMERIC_USER_ID,请运行以下命令:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    替换:

    • SA_NAME 替换为服务帐号名称。
    • PROJECT_ID 替换为 项目 ID
  2. 为该服务帐号创建具有 Op 角色的 Airflow 用户:

    Airflow 界面

    1. 转到 Airflow 界面

    2. 依次前往安全性 > 列出用户 ,然后点击 添加新记录。您的 Airflow 用户必须具有 Admin 角色才能打开此页面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 作为用户 名。将 NUMERIC_USER_ID 替换为上一步中获得的用户 ID。

    4. 将唯一标识符指定为电子邮件。您可以使用任意的唯一字符串。

    5. 指定用户的角色。例如 Op

    6. 确保选中是否处于活跃状态? 复选框。

    7. 指定用户的名字和姓氏。您可以使用任何字符串。

    8. 点击保存

    gcloud

    运行以下 Airflow CLI 命令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    替换:

    • ENVIRONMENT_NAME 替换为环境的名称。
    • LOCATION 替换为环境所在的区域。
    • NUMERIC_USER_ID 替换为在上一步中获取的用户 ID。
    • UNIQUE_ID 替换为 Airflow 用户的标识符。您可以使用任意的唯一字符串。
  3. 为服务帐号创建 Airflow 用户后,以该服务帐号身份进行身份验证的调用者会被识别为预注册用户,并登录到 Airflow。

扩缩 Airflow REST API 组件

Airflow REST API 和 Airflow 界面端点在 Airflow Web 服务器中运行。如果您大量使用 REST API,请考虑根据预期负载增加可供 Airflow Web 服务器使用的 CPU 和内存量。

后续步骤