使用来自 AWS 的数据在 Google Cloud 中运行数据分析 DAG

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本教程是对 在 中运行数据分析 DAG Google Cloud 的修改,介绍了如何将 Managed Airflow 环境连接到 Amazon Web Services,以利用存储在那里的数据。它介绍了如何使用 Managed Airflow 创建 Apache Airflow DAG。该 DAG 会联接来自 BigQuery 公共数据集和存储 在 Amazon Web Services (AWS) S3 存储桶中的 CSV 文件的数据,然后运行 Managed Service for Apache Spark 批量作业来处理联接的数据。

本教程中的 BigQuery 公共数据集是 ghcn_d,这是一个全球气候摘要的集成数据库。CSV 文件包含 1997 年至 2021 年美国节假日的日期和名称信息 。

我们希望使用 DAG 回答的问题是:“过去 25 年来,芝加哥感恩节的天气有多暖和?”

目标

  • 在默认配置中创建 Managed Airflow 环境
  • 在 AWS S3 中创建存储桶
  • 创建空的 BigQuery 数据集
  • 创建新的 Cloud Storage 存储桶
  • 创建并运行包含以下任务的 DAG:
    • 将外部数据集从 S3 加载到 Cloud Storage
    • 将外部数据集从 Cloud Storage 加载到 BigQuery
    • 在 BigQuery 中联接两个数据集
    • 运行数据分析 PySpark 作业

准备工作

在 AWS 中管理权限

  1. 创建一个 AWS 账号

  2. 按照 创建 IAM 政策 AWS 教程的“使用可视化编辑器创建政策”部分,为 AWS S3 创建自定义 IAM 政策,并使用以下配置:

    • 服务: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets),用于查看 S3 存储桶
    • CreateBucket (s3:CreateBucket),用于创建存储桶
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls),用于创建存储桶
    • ListBucket (s3:ListBucket),用于授予列出 S3 存储桶中对象的权限
    • PutObject (s3:PutObject),用于将文件上传到存储桶
    • GetBucketVersioning (s3:GetBucketVersioning),用于删除存储桶中的对象
    • DeleteObject (s3:DeleteObject),用于删除存储桶中的对象
    • ListBucketVersions (s3:ListBucketVersions),用于删除存储桶
    • DeleteBucket (s3:DeleteBucket),用于删除存储桶
    • 资源:选择“存储桶”和“对象”旁边的“任意”,以向该类型的任何资源授予权限。
    • 代码: 无
    • 名称: TutorialPolicy

    如需详细了解每项配置,请参阅 Amazon S3 中支持的 操作列表

  3. TutorialPolicy IAM 政策添加到您的身份

启用 API

启用以下 API:

控制台

启用 Managed Service for Apache Spark、Managed Airflow、BigQuery、Cloud Storage API。

启用 API 所需的角色

如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予 角色

启用 API

gcloud

启用 Managed Service for Apache Spark、Managed Airflow、BigQuery、Cloud Storage API:

启用 API 所需的角色

如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予 角色

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予权限

向您的用户账号授予以下角色和权限:

创建并准备 Managed Airflow 环境

  1. 使用默认 参数创建 Managed Airflow 环境:

  2. 向 Managed Airflow 环境中使用的服务帐号授予以下角色,以便 Airflow 工作器成功运行 DAG 任务:

    • BigQuery User (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)

在 中创建和修改相关资源 Google Cloud

  1. 在 Managed Airflow 环境中安装 apache-airflow-providers-amazon PyPI 软件包

  2. 创建空的 BigQuery 数据集 使用以下参数:

    • 名称: holiday_weather
    • 区域US
  3. 创建新的 Cloud Storage 存储桶US 多区域中。

  4. 运行以下命令,在您想要运行 Managed Service for Apache Spark 的区域中的默认子网上启用专用 Google 访问通道,以满足 网络要求。我们建议您使用与 Managed Airflow 环境相同的区域。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

在 AWS 中创建相关资源

在首选区域中 使用默认设置创建 S3 存储桶。

从 Managed Airflow 连接到 AWS

  1. 获取 AWS 访问密钥 ID 和密钥
  2. 使用 Airflow 界面添加 AWS S3 连接

    1. 前往管理 > 连接
    2. 使用以下配置创建新连接:

      • 连接 IDaws_s3_connection
      • 连接类型Amazon S3
      • 额外字段(或额外字段 JSON){"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

使用 Managed Service for Apache Spark 进行数据处理

本部分介绍了如何使用 Managed Service for Apache Spark 处理数据。

探索示例 PySpark 作业

以下代码是一个示例 PySpark 作业,用于将温度从摄氏度十分之一转换为摄氏度。此作业会将数据集中的温度数据转换为不同的格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

将 PySpark 文件上传到 Cloud Storage

如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:

  1. data_analytics_process.py 保存到本地机器。

  2. 在 Google Cloud 控制台中,前往 Cloud Storage 浏览器 页面:

    前往 Cloud Storage 浏览器

  3. 点击您之前创建的存储桶的名称。

  4. 在存储桶的对象 标签页中,点击上传文件 按钮,在随即显示的对话框中选择 data_analytics_process.py,然后点击打开

将 CSV 文件上传到 AWS S3

如需上传 holidays.csv 文件,请执行以下操作:

  1. holidays.csv 保存到本地机器。
  2. 按照 AWS 指南 将文件上传到您的 存储桶。

数据分析 DAG

本部分介绍了如何配置和使用数据分析 DAG。

探索示例 DAG

该 DAG 使用多个运算符来转换和统一数据:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow 界面添加变量

在 Airflow 中,变量是一种通用方式,用于将任意设置或配置作为简单的键值存储来存储和检索。此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境,请执行以下操作:

  1. Google Cloud 控制台访问 Airflow 界面。

  2. 前往管理 > 变量

  3. 添加以下变量:

    • s3_bucket:您之前创建的 S3 存储桶的名称。

    • gcp_project:您的项目 ID。

    • gcs_bucket:您之前创建的存储桶的名称(不带 gs:// 前缀)。

    • gce_region:您希望 Managed Service for Apache Spark 作业所在的区域,该作业满足 Managed Service for Apache Spark 网络要求。这是您之前启用专用 Google 访问通道的区域。

    • dataproc_service_account:Managed Airflow 环境的服务帐号。您可以在 Managed Airflow 环境的环境配置标签页中找到此服务账号。

将 DAG 上传到环境的存储桶

Managed Airflow 会安排位于环境存储桶的 /dags 文件夹中的 DAG。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:

  1. 在本地机器上,保存 s3togcsoperator_tutorial.py

  2. 在 Google Cloud 控制台中,前往环境 页面。

    前往“环境”

  3. 在环境列表中,点击 DAG 文件夹 列中的 DAG 链接。系统会打开您环境的 DAG 文件夹。

  4. 点击上传文件

  5. 在本地机器上选择 s3togcsoperator_tutorial.py,然后点击打开

触发 DAG

  1. 在 Managed Airflow 环境中,点击 DAG 标签页。

  2. 点击 DAG ID s3_to_gcs_dag

  3. 点击 Trigger DAG

  4. 等待大约 5 到 10 分钟,直到看到绿色对勾,表示任务已成功完成。

验证 DAG 是否成功

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. Explorer 面板中,点击您的项目名称。

  3. 点击 holidays_weather_joined

  4. 点击“预览”以查看结果表。请注意,值列中的数字以摄氏度十分之一为单位。

  5. 点击 holidays_weather_normalized

  6. 点击“预览”以查看结果表。请注意,值列中的数字以摄氏度为单位。

清理

删除您为本教程创建的各个资源:

后续步骤