在 Managed Airflow(第 2 代)中运行 Apache Airflow DAG (Google Cloud CLI)
Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
本快速入门指南将向您介绍如何在 Managed Service for Apache Airflow 环境中创建 Managed Airflow(第 2 代)环境并运行 Apache Airflow DAG。
如果您刚接触 Airflow,请参阅 Apache Airflow 文档中的 Airflow 概念教程,详细了解 Airflow 概念、对象及其 用法。
如果您想改用控制台 Google Cloud ,请参阅 在 Managed Service for Apache Airflow 中运行 Apache Airflow DAG。
如果您想使用 Terraform 创建环境,请参阅 创建环境 (Terraform)。
准备工作
- 登录您的 Google Cloud 账号。如果您刚接触 Google Cloud, 请创建一个账号,以评估我们的产品在 真实场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已被授予角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色
(
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予 角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
-
验证是否已为您的 Google Cloud 项目启用结算功能。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已被授予角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色
(
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予 角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
-
验证是否已为您的 Google Cloud 项目启用结算功能。
-
启用 Managed Airflow API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予 角色。gcloud services enable composer.googleapis.com
-
如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
-
分配 IAM 角色和权限:
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
为 Managed Airflow 环境创建服务帐号:
Create Service Accounts (
roles/iam.serviceAccountCreator) -
查看、创建和管理 Managed Airflow 环境:
- Environment and Storage Object Administrator (
roles/composer.environmentAndStorageObjectAdmin) - Service Account User (
roles/iam.serviceAccountUser)
- Environment and Storage Object Administrator (
-
查看日志:
Logs Viewer (
roles/logging.viewer)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
-
分配 IAM 角色和权限:
Project IAM Admin (
创建环境的服务帐号
创建环境时,您需要指定服务帐号。此服务 账号称为“环境的服务账号”。您的环境使用此服务帐号执行大多数操作。
您的环境的服务帐号不是用户账号。服务帐号是由应用或虚拟机 (VM) 实例(而非个人)使用的特殊账号。
如需为您的环境创建服务帐号,请执行以下操作:
按照 Identity and Access Management 文档中的说明向其授予角色。所需角色为 Composer Worker (
composer.worker)。
创建环境
如果这是您项目中的第一个环境,请将 Managed Airflow Service Agent 账号作为新正文添加到环境的服务帐号中,并向其授予 roles/composer.ServiceAgentV2Ext 角色。
默认情况下,您的环境使用 默认 Compute Engine 服务账号,以下 示例展示了如何向其添加此权限。
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
ENVIRONMENT_SERVICE_ACCOUNT \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
将 ENVIRONMENT_SERVICE_ACCOUNT 替换为您之前创建的环境的服务帐号。
在 us-central1
区域中创建一个名为 example-environment 的新环境,该环境使用最新版本的 Managed Airflow(第 2 代)version。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.17.3-airflow-2.11.1 \
--service-account ENVIRONMENT_SERVICE_ACCOUNT
将 ENVIRONMENT_SERVICE_ACCOUNT 替换为您之前创建的环境的服务帐号。
创建 DAG 文件
Airflow DAG 是您要安排和运行的有序任务的集合 。DAG 在标准 Python 文件中定义。
本指南使用 quickstart.py 文件中定义的示例 Airflow DAG。
此文件中的 Python 代码执行以下操作:
- 创建一个 DAG
composer_sample_dag。此 DAG 每天运行一次。 - 执行一项任务
print_dag_run_conf。该任务使用 bash 运算符输出 DAG 运行的配置。
在本地机器上保存 quickstart.py 文件的副本:
将 DAG 文件上传到环境的存储桶
每个 Managed Airflow 环境都有一个与之关联的 Cloud Storage 存储桶。Managed Airflow 中的 Airflow 仅调度位于此存储桶的 /dags 文件夹中的 DAG。
如需安排 DAG,请将 quickstart.py 从本地机器上传到您的环境的 /dags 文件夹:
如需使用 Google Cloud CLI 上传 quickstart.py,请在 quickstart.py 文件所在的文件夹中运行以下命令:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
查看 DAG
上传 DAG 文件后,Airflow 会执行以下操作:
- 解析您上传的 DAG 文件。Airflow 可能需要几分钟才能使用 DAG。
- 将 DAG 添加到可用 DAG 列表中。
- 根据您在 DAG 文件中提供的时间表执行 DAG。
通过在 DAG 界面中查看 DAG,检查 DAG 是否已处理完毕且无错误,以及是否可在 Airflow 中使用。DAG 界面是 Managed Airflow 界面,用于在 控制台中查看 Google Cloud DAG 信息。Managed Airflow 还提供 对 Airflow 界面的访问权限,后者是原生 Airflow 网页 界面。
等待大约 5 分钟,以便 Airflow 处理您之前上传的 DAG 文件,并完成第一次 DAG 运行(稍后会对此进行说明)。
在 Google Cloud CLI 中运行以下命令。此命令会执行
dags listAirflow CLI 命令,该命令会列出您的 环境中的 DAG。gcloud composer environments run example-environment \ --location us-central1 \ dags list检查
composer_quickstartDAG 是否列在命令的输出中。输出示例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
查看 DAG 运行详情
DAG 的单次执行称为“DAG 运行”。 Airflow 会立即为示例 DAG 执行 DAG 运行,因为 DAG 文件中的开始日期设置为昨天。这样,Airflow 就会赶上指定 DAG 的时间表。
示例 DAG 包含一项任务 print_dag_run_conf,该任务会在控制台中运行 echo 命令。此命令会输出有关 DAG 的元信息(DAG 运行的数字标识符)。
在 Google Cloud CLI 中运行以下命令。此命令会列出 composer_quickstart DAG 的 DAG 运行:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
输出示例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI 不提供用于查看任务日志的命令。您可以使用 其他方法查看 Airflow 任务日志: Managed Airflow DAG 界面、Airflow 界面或 Cloud Logging。本指南展示了如何查询 Cloud Logging 以获取特定 DAG 运行的日志。
在 Google Cloud CLI 中运行以下命令。此命令会从 Cloud Logging 中读取 composer_quickstart DAG 的特定 DAG 运行的日志。--format 实参会设置输出格式,以便仅显示日志消息的文本。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
替换:
- 将
RUN_ID替换为您之前运行的tasks states-for-dag-run命令的输出中的run_id值。例如,2024-02-17T15:38:38.969307+00:00。
输出示例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除本教程中使用的资源 :
删除 Managed Airflow 环境:
在 Google Cloud 控制台中,前往环境页面。
选择
example-environment,并点击删除。等待环境删除完成。
删除环境的存储桶。删除 Managed Airflow 环境不会删除其存储桶。
在 Google Cloud 控制台中,前往 存储 > 浏览器 页面。
选择环境的存储桶,然后点击删除。例如,此存储桶可以命名为
us-central1-example-environ-c1616fe8-bucket。
后续步骤