访问 Airflow 命令行界面

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。

支持的 Airflow CLI 命令

Airflow 使用 Airflow CLI 语法,该语法在 Airflow 文档中进行了说明。

如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run 命令的参考文档。

准备工作

通过 Google Cloud CLI 执行 Airflow CLI 命令:

  • 您的 Google 账号必须具有权限才能将 Google Cloud CLI 与 Cloud Composer 搭配使用并运行 Airflow CLI 命令。

  • 通过 Google Cloud CLI 执行的 Airflow CLI 命令会消耗 environments.executeAirflowCommand 配额

  • 在 2.4.0 之前的 Cloud Composer 版本中,您需要访问环境集群的控制平面才能运行 Airflow CLI 命令。

使用 gcloud CLI 运行 Airflow CLI 命令

如需在您的环境中运行 Airflow CLI 命令,请使用 gcloud CLI:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • SUBCOMMAND受支持的 Airflow CLI 命令之一。
  • SUBCOMMAND_ARGUMENTS 替换为 Airflow CLI 命令的参数。

子命令实参分隔符

使用 -- 分隔指定的 Airflow CLI 命令的参数:

  • 将复合 CLI 命令指定为子命令。
  • -- 分隔符后面指定复合命令的任何参数,作为子命令参数。

示例:

gcloud composer environments run example-environment \
    dags list -- --output=json

默认位置

大多数 gcloud composer 命令需要位置信息。您可以使用 --location 标志指定位置,也可以设置默认位置

例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart 且 ID 为 5077 的 DAG,请使用以下命令:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

在专用 IP 环境中运行命令

在 2.4.0 之前的 Cloud Composer 版本中:

如需在专用 IP 环境中运行 Airflow CLI 命令,请在可访问 GKE 集群控制平面端点的机器上运行这些命令。您的选项可能会因专用集群配置而有所不同。

如果环境的集群中停用了公共端点访问权限,则无法使用 gcloud composer 命令运行 Airflow CLI。如需能够运行 Airflow CLI 命令,请执行以下步骤:

  1. 在 VPC 网络中创建虚拟机
  2. 获取集群凭据。运行以下命令:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

使用 kubectl 运行 Airflow 命令。例如:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACE 替换为类似于 composer-2-0-28-airflow-2-3-394zxc12411 的命名空间。您可以在工作负载列表中找到 Cloud Composer,也可以使用 kubectl get namespaces 命令找到它。

如果环境集群中启用了公共端点访问权限,您还可以从具有已添加到授权网络的外部 IP 地址的机器运行 Airflow CLI 命令。如需允许从您的机器进行访问,请将您机器的外部地址添加到环境的授权网络列表中。

运行 gcloud composer environments runkubectl 命令时,您可能会遇到以下错误:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

症状:此错误消息表示运行这些命令的计算机没有网络连接。

解决方案:请按照在专用 IP 环境中运行命令部分中的准则操作,或使用kubectl 命令超时部分中的说明。

通过 Cloud Composer API 运行 Airflow CLI 命令

从 Cloud Composer 版本 2.4.0 开始,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。

执行一项命令

构建 environments.executeAirflowCommand API 请求:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

替换以下内容:

  • PROJECT_ID项目 ID
  • LOCATION:环境所在的区域。
  • ENVIRONMENT_NAME:您的环境的名称。
  • AIRFLOW_COMMAND:您要执行的 Airflow CLI 命令,例如 dags
  • AIRFLOW_SUBCOMMAND:您要执行的 Airflow CLI 命令的子命令,例如 list
  • (可选)SUBCOMMAND_PARAMETER:子命令的参数。如果您想使用多个形参,请向列表中添加更多项。

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

轮询命令状态

通过 Cloud Composer API 执行 Airflow CLI 命令后,请通过发出 PollAirflowCommand 请求并检查 exitInfo 中的字段是否存在错误和状态代码,来确认该命令是否已成功完成。output 字段包含日志行。

如需获取命令执行状态并提取日志,请提供 ExecuteAirflowCommandRequest 返回的 executionIdpodpodNamespace 值:

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

后续步骤