Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。
支持的 Airflow CLI 命令
Airflow 使用 Airflow CLI 语法,该语法在 Airflow 文档中进行了说明。
如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run command's
参考文档。
准备工作
如需通过 Google Cloud CLI 执行 Airflow CLI 命令,请执行以下操作:
您的 Google 账号必须具有 权限 才能使用 Google Cloud CLI 和 Managed Airflow 以及运行 Airflow CLI 命令。
通过 Google Cloud CLI 执行的 Airflow CLI 命令会消耗
通过 Google Cloud CLI 触发 DAG 可能会导致您的环境达到配额上限,从而导致 Airflow CLI 命令无法再执行。environments.executeAirflowCommand配额。在 2.4.0 之前的 Managed Airflow 版本中,您需要 访问环境集群的控制平面 才能运行 Airflow CLI 命令。
使用 gcloud CLI 运行 Airflow CLI 命令
如需在您的环境中运行 Airflow CLI 命令,请使用 gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
替换以下内容:
ENVIRONMENT_NAME:您的环境的名称。LOCATION:环境所在的区域。SUBCOMMAND:支持的 Airflow CLI 命令之一。- 将
SUBCOMMAND_ARGUMENTS替换为 Airflow CLI 命令的参数。
子命令参数分隔符
使用 -- 分隔指定的 Airflow CLI 命令的参数:
- 将复合 CLI 命令指定为子命令。
- 在
--分隔符后面指定复合命令的任何参数,作为子命令参数。
示例:
gcloud composer environments run example-environment \
dags list -- --output=json
默认位置
大多数 gcloud composer 命令需要位置信息。您可以使用
位置和 --location 标志指定位置,也可以
设置默认位置。
例如,如需在 Managed Airflow 环境中触发名为 sample_quickstart 且 ID 为 5077 的 DAG,请使用以下命令:
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
在专用 IP 环境中运行命令
在 2.4.0 之前的 Managed Airflow 版本中:
如需在专用 IP 环境中运行 Airflow CLI 命令,请在可访问 GKE 集群的控制平面端点的机器上运行这些命令。您的选项可能会因您的 专用集群配置而有所不同。
如果在环境的集群中停用了公共端点访问权限,则无法使用 gcloud composer 命令运行 Airflow CLI。
如需能够运行 Airflow CLI 命令,请执行以下步骤:
- 在 VPC 网络中创建虚拟机
获取集群凭据。运行以下命令:
gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
使用 kubectl 运行 Airflow 命令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
将 COMPOSER_NAMESPACE 替换为类似于 composer-2-0-28-airflow-2-3-394zxc12411 的命名空间。您可以在工作负载列表中找到 Managed Airflow,也可以使用 kubectl get namespaces 命令找到它。
如果在环境的集群中启用了公共端点访问权限,您还可以从其外部 IP 地址已添加到已获授权的网络的机器运行 Airflow CLI 命令。如需从您的机器启用访问权限,请将您的机器的 外部地址添加到环境的 已获授权的网络列表中。
运行 gcloud composer environments run 或 kubectl 命令时,您可能会遇到以下错误:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
症状:此错误消息表示您运行这些命令的计算机没有网络 连接。
解决方案:按照在
专用 IP 环境中运行命令
部分中提供的指南操作,或使用
kubectl 命令超时部分中提供的说明。
通过 Cloud Composer API 运行 Airflow CLI 命令
从 Managed Airflow 2.4.0 版开始,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。
执行一项命令
构建 environments.executeAirflowCommand API 请求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
替换以下内容:
PROJECT_ID:项目 ID。LOCATION:环境所在的区域。ENVIRONMENT_NAME:您的环境的名称。AIRFLOW_COMMAND:您要执行的 Airflow CLI 命令,例如dags。AIRFLOW_SUBCOMMAND:您要执行的 Airflow CLI 命令的子命令,例如list。- (可选)
SUBCOMMAND_PARAMETER:子命令的参数。如果您要使用多个参数,请向列表中添加更多项。
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
轮询命令状态
通过 Cloud Composer API 执行 Airflow CLI 命令后,请发出
PollAirflowCommand请求并检查
exitInfo中的字段是否存在错误和状态代码,以检查
命令是否已成功完成。output 字段包含日志行。
如需获取命令执行状态并提取日志,请提供 executionId,
pod 和 podNamespace 值,这些值由 ExecuteAirflowCommandRequest 返回:
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}