Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
本教程是对 在 中运行数据分析 DAG Google Cloud 的修改,介绍了如何将 Managed Airflow 环境连接到 Amazon Web Services,以利用存储在那里的数据。它介绍了如何使用 Managed Airflow 创建 Apache Airflow DAG。该 DAG 会联接来自 BigQuery 公共数据集和存储 在 Amazon Web Services (AWS) S3 存储桶中的 CSV 文件的数据,然后运行 Managed Service for Apache Spark 批量作业来处理联接的数据。
本教程中的 BigQuery 公共数据集是 ghcn_d,这是一个全球气候摘要的集成数据库。CSV 文件包含 1997 年至 2021 年美国节假日的日期和名称信息 。
我们希望使用 DAG 回答的问题是:“过去 25 年来,芝加哥感恩节的天气有多暖和?”
目标
- 在默认配置中创建 Managed Airflow 环境
- 在 AWS S3 中创建存储桶
- 创建空的 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建并运行包含以下任务的 DAG:
- 将外部数据集从 S3 加载到 Cloud Storage
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
在 AWS 中管理权限
按照 创建 IAM 政策 AWS 教程的“使用可视化编辑器创建政策”部分,为 AWS S3 创建自定义 IAM 政策,并使用以下配置:
- 服务: S3
- ListAllMyBuckets (
s3:ListAllMyBuckets),用于查看 S3 存储桶 - CreateBucket (
s3:CreateBucket),用于创建存储桶 - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls),用于创建存储桶 - ListBucket (
s3:ListBucket),用于授予列出 S3 存储桶中对象的权限 - PutObject (
s3:PutObject),用于将文件上传到存储桶 - GetBucketVersioning (
s3:GetBucketVersioning),用于删除存储桶中的对象 - DeleteObject (
s3:DeleteObject),用于删除存储桶中的对象 - ListBucketVersions (
s3:ListBucketVersions),用于删除存储桶 - DeleteBucket (
s3:DeleteBucket),用于删除存储桶 - 资源:选择“存储桶”和“对象”旁边的“任意”,以向该类型的任何资源授予权限。
- 代码: 无
- 名称: TutorialPolicy
如需详细了解每项配置,请参阅 Amazon S3 中支持的 操作列表。
启用 API
启用以下 API:
控制台
启用 Managed Service for Apache Spark、Managed Airflow、BigQuery、Cloud Storage API。
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予
角色。
gcloud
启用 Managed Service for Apache Spark、Managed Airflow、BigQuery、Cloud Storage API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM
角色 (roles/serviceusage.serviceUsageAdmin),该角色包含
serviceusage.services.enable 权限。了解如何授予
角色。
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
向您的用户账号授予以下角色和权限:
授予用于 管理 Managed Airflow 环境和环境存储分区的角色。
授予 BigQuery Data Owner (
roles/bigquery.dataOwner) 角色以 创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin) 角色以 创建 Cloud Storage 存储桶。
创建并准备 Managed Airflow 环境
使用默认 参数创建 Managed Airflow 环境:
- 选择美国境内的区域。
- 选择最新的 Managed Airflow 版本。
向 Managed Airflow 环境中使用的服务帐号授予以下角色,以便 Airflow 工作器成功运行 DAG 任务:
- BigQuery User (
roles/bigquery.user) - BigQuery Data Owner (
roles/bigquery.dataOwner) - Service Account User (
roles/iam.serviceAccountUser) - Dataproc Editor (
roles/dataproc.editor) - Dataproc Worker (
roles/dataproc.worker)
- BigQuery User (
在 中创建和修改相关资源 Google Cloud
在 Managed Airflow 环境中安装
apache-airflow-providers-amazonPyPI 软件包。创建空的 BigQuery 数据集 使用以下参数:
- 名称:
holiday_weather - 区域:
US
- 名称:
创建新的 Cloud Storage 存储桶 在
US多区域中。运行以下命令,在您想要运行 Managed Service for Apache Spark 的区域中的默认子网上启用专用 Google 访问通道,以满足 网络要求。我们建议您使用与 Managed Airflow 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 AWS 中创建相关资源
在首选区域中 使用默认设置创建 S3 存储桶。
从 Managed Airflow 连接到 AWS
- 获取 AWS 访问密钥 ID 和密钥
使用 Airflow 界面添加 AWS S3 连接:
- 前往管理 > 连接。
使用以下配置创建新连接:
- 连接 ID:
aws_s3_connection - 连接类型:
Amazon S3 - 额外字段(或额外字段 JSON):
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- 连接 ID:
使用 Managed Service for Apache Spark 进行数据处理
本部分介绍了如何使用 Managed Service for Apache Spark 处理数据。
探索示例 PySpark 作业
以下代码是一个示例 PySpark 作业,用于将温度从摄氏度十分之一转换为摄氏度。此作业会将数据集中的温度数据转换为不同的格式。
将 PySpark 文件上传到 Cloud Storage
如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:
将 data_analytics_process.py 保存到本地机器。
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器 页面:
点击您之前创建的存储桶的名称。
在存储桶的对象 标签页中,点击上传文件 按钮,在随即显示的对话框中选择
data_analytics_process.py,然后点击打开 。
将 CSV 文件上传到 AWS S3
如需上传 holidays.csv 文件,请执行以下操作:
- 将
holidays.csv保存到本地机器。 - 按照 AWS 指南 将文件上传到您的 存储桶。
数据分析 DAG
本部分介绍了如何配置和使用数据分析 DAG。
探索示例 DAG
该 DAG 使用多个运算符来转换和统一数据:
S3ToGCSOperator会将 holidays.csv 文件从您的 AWS S3 存储桶 转移到您的 Cloud Storage 存储桶。GCSToBigQueryOperator会将 holidays.csv 文件从 Cloud Storage 注入到您之前创建的 BigQueryholidays_weather数据集中的新表。DataprocCreateBatchOperator使用 Managed Service for Apache Spark 创建并 运行 PySpark 批量作业。The
BigQueryInsertJobOperator会将数据 从 holidays.csv 中“Date”列的数据与天气数据 从 BigQuery 公共数据集 ghcn_d 联接。BigQueryInsertJobOperator任务是 使用 for 循环动态生成的,这些任务位于TaskGroup中,以便在 Airflow 界面的图表 视图中获得更好的可读性。
使用 Airflow 界面添加变量
在 Airflow 中,变量是一种通用方式,用于将任意设置或配置作为简单的键值存储来存储和检索。此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境,请执行以下操作:
从 Google Cloud 控制台访问 Airflow 界面。
前往管理 > 变量。
添加以下变量:
s3_bucket:您之前创建的 S3 存储桶的名称。gcp_project:您的项目 ID。gcs_bucket:您之前创建的存储桶的名称(不带gs://前缀)。gce_region:您希望 Managed Service for Apache Spark 作业所在的区域,该作业满足 Managed Service for Apache Spark 网络要求。这是您之前启用专用 Google 访问通道的区域。dataproc_service_account:Managed Airflow 环境的服务帐号。您可以在 Managed Airflow 环境的环境配置标签页中找到此服务账号。
将 DAG 上传到环境的存储桶
Managed Airflow 会安排位于环境存储桶的 /dags 文件夹中的 DAG。如需使用
Google Cloud 控制台上传 DAG,请执行以下操作:
在本地机器上,保存 s3togcsoperator_tutorial.py。
在 Google Cloud 控制台中,前往环境 页面。
在环境列表中,点击 DAG 文件夹 列中的 DAG 链接。系统会打开您环境的 DAG 文件夹。
点击上传文件 。
在本地机器上选择
s3togcsoperator_tutorial.py,然后点击打开 。
触发 DAG
在 Managed Airflow 环境中,点击 DAG 标签页。
点击 DAG ID
s3_to_gcs_dag。点击 Trigger DAG 。
等待大约 5 到 10 分钟,直到看到绿色对勾,表示任务已成功完成。
验证 DAG 是否成功
在 Google Cloud 控制台中,前往 BigQuery 页面。
在Explorer 面板中,点击您的项目名称。
点击
holidays_weather_joined。点击“预览”以查看结果表。请注意,值列中的数字以摄氏度十分之一为单位。
点击
holidays_weather_normalized。点击“预览”以查看结果表。请注意,值列中的数字以摄氏度为单位。
清理
删除您为本教程创建的各个资源:
删除 Cloud Storage 存储桶,该存储分区是您 为本教程创建的。
删除 Managed Airflow 环境,包括 手动删除环境的存储桶。
后续步骤
- 在 Google Cloud中运行数据分析 DAG。
- 在 Azure 中运行数据分析 DAG。