访问 Airflow 命令行界面

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。

支持的 Airflow CLI 命令

Airflow 使用 Airflow CLI 语法,该语法在 Airflow 文档中进行了说明。

Cloud Composer 3 支持以下 Airflow CLI 命令:

  • (Airflow 2 和 3)gcloud composer environments run 命令可为您的环境运行 Airflow CLI 命令。某些命令被屏蔽,无法运行。如需查看受支持的 Airflow CLI 命令的列表,请参阅相应命令的参考文档

  • (Airflow 3) airflowctl 命令行工具支持部分 Airflow CLI 命令,并使用 Airflow REST API 运行这些命令。Cloud Composer 支持 airflowctl 中的所有可用命令。确保用户的 Airflow 账号具有可运行特定命令的 Airflow 角色

准备工作

通过 Google Cloud CLI 执行 Airflow CLI 命令:

  • 您的 Google 账号必须具有权限才能将 Google Cloud CLI 与 Cloud Composer 搭配使用并运行 Airflow CLI 命令。

  • 通过 Google Cloud CLI 执行的 Airflow CLI 命令会消耗 environments.executeAirflowCommand 配额

通过 airflowctl 命令行工具(在 Airflow 3 中)执行 Airflow CLI 命令:

使用 gcloud CLI 运行 Airflow CLI 命令

如需在您的环境中运行 Airflow CLI 命令,请使用 gcloud CLI:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • SUBCOMMAND受支持的 Airflow CLI 命令之一。
  • SUBCOMMAND_ARGUMENTS 替换为 Airflow CLI 命令的参数。

子命令实参分隔符

使用 -- 分隔指定的 Airflow CLI 命令的参数:

  • 将复合 CLI 命令指定为子命令。
  • -- 分隔符后面指定复合命令的任何参数,作为子命令参数。

示例:

gcloud composer environments run example-environment \
    dags list -- --output=json

默认位置

大多数 gcloud composer 命令需要位置信息。您可以使用 --location 标志指定位置,也可以设置默认位置

例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart 且 ID 为 5077 的 DAG,请使用以下命令:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

使用 airflowctl 运行 Airflow CLI 命令

airflowctl 命令行工具是 Airflow 提供的一种命令行实用程序,用于运行 Airflow CLI 命令。它支持部分 Airflow CLI 命令,并使用 Airflow REST API 运行这些命令。Cloud Composer 支持 airflowctl 中的所有可用命令。

在 Cloud Composer 中使用 airflowctl 进行授权和身份验证的方式如下:

  • 由于 airflowctl 使用 Airflow REST API,请确保与您的 Google 账号或服务账号相关联的 Airflow 用户账号具有一个 Airflow 角色,该角色具有运行命令所需的足够权限。例如,某些命令只能由具有 Admin 角色的 Airflow 用户执行。

  • airflowctl 使用的 API 令牌通过 gcloud auth 命令提供。默认情况下,令牌的有效期为 1 小时。您可以使用为此命令提供的 --lifetime 实参来更改此设置。

如需使用 airflowctl 进行身份验证,请运行以下命令:

airflowctl auth login \
  --api-url WEB_SERVER_URL \
  --api-token $(gcloud auth print-access-token)

替换以下内容:

示例:

airflowctl auth login \
  --api-url https://example-dot-us-central1.composer.googleusercontent.com \
  --api-token $(gcloud auth print-access-token)

通过 airflowctl 进行身份验证后,您可以运行 Airflow CLI 命令:

airflowctl dags list

通过 Cloud Composer API 运行 Airflow CLI 命令

在 Cloud Composer 3 中,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。

执行一项命令

构建 environments.executeAirflowCommand API 请求:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

替换以下内容:

  • PROJECT_ID项目 ID
  • LOCATION:环境所在的区域。
  • ENVIRONMENT_NAME:您的环境的名称。
  • AIRFLOW_COMMAND:您要执行的 Airflow CLI 命令,例如 dags
  • AIRFLOW_SUBCOMMAND:您要执行的 Airflow CLI 命令的子命令,例如 list
  • (可选)SUBCOMMAND_PARAMETER:子命令的参数。如果您想使用多个形参,请向列表中添加更多项。

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

轮询命令状态

通过 Cloud Composer API 执行 Airflow CLI 命令后,请通过发出 PollAirflowCommand 请求并检查 exitInfo 中的字段是否存在错误和状态代码,来确认该命令是否已成功完成。output 字段包含日志行。

如需获取命令执行状态并提取日志,请提供 ExecuteAirflowCommandRequest 返回的 executionIdpodpodNamespace 值:

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "a117da94-355d-4ad4-839e-ac39ccb0bf48",
  "pod": "airflow-webserver-66d96b858f-tn96b",
  "podNamespace": "airflow-2-10-2-build-13-226523e4",
  "nextLineNumber": 1
}

后续步骤