Google Cloud でデータ分析 DAG を実行する

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Cloud Storage バケットに保存されている CSV ファイルのデータを結合し、Google Cloud Apache Spark 向け Serverless バッチジョブを実行して結合されたデータを処理します。

このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の祝日の日付と名前に関する情報が含まれています。

DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。

目標

  • デフォルト構成で Cloud Composer 環境を作成する
  • 空の BigQuery データセットを作成する
  • Cloud Storage バケットを新規作成する
  • 次のタスクを含む DAG を作成、実行します。
    • 外部データセットを Cloud Storage から BigQuery に読み込む
    • BigQuery で 2 つのデータセットを結合する
    • データ分析 PySpark ジョブを実行する

準備

API を有効にする

次の API を有効にします。

コンソール

Dataproc、Cloud Composer、BigQuery、Cloud Storage API を有効にします。

API を有効にするために必要なロール

API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。

API を有効にする

gcloud

Dataproc、Cloud Composer、BigQuery、Cloud Storage API を有効にします。

API を有効にするために必要なロール

API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

権限を付与する

ユーザー アカウントに次のロールと権限を付与します。

Cloud Composer 環境の作成と準備

  1. デフォルト パラメータを使用して Cloud Composer 環境を作成します。

  2. Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。

    • BigQuery ユーザーroles/bigquery.user
    • BigQuery データオーナーroles/bigquery.dataOwner
    • サービス アカウント ユーザーroles/iam.serviceAccountUser
    • Dataproc 編集者roles/dataproc.editor
    • Dataproc ワーカーroles/dataproc.worker
  1. 次のパラメータを使用して空の BigQuery データセットを作成します。

    • 名前: holiday_weather
    • リージョン: US
  2. US マルチリージョンで新しい Cloud Storage バケットを作成します。

  3. 次のコマンドを実行して、ネットワーキングの要件を満たすためにGoogle Cloud Apache Spark 用サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Google Cloud Serverless for Apache Spark を使用したデータ処理

サンプル PySpark のジョブを確認する

次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットの温度データを別の形式に変換します。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

サポート ファイルを Cloud Storage にアップロードする

PySpark ファイルと holidays.csv に保存されているデータセットをアップロードするには:

  1. data_analytics_process.py をローカルマシンに保存します。

  2. holidays.csv をローカルマシンに保存します。

  3. Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。

    Cloud Storage ブラウザに移動

  4. 前に作成したバケットの名前をクリックします。

  5. バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで data_analytics_process.pyholidays.csv を選択し、[開く] をクリックします。

データ分析 DAG

DAG の例を確認する

DAG は複数の演算子を使用してデータを変換し、統合します。

  • GCSToBigQueryOperatorは、Cloud Storage からの holidays.csv ファイルを、前の手順で作成した BigQuery holidays_weather データセット内の新しいテーブルに取り込みます。

  • DataprocCreateBatchOperator は、Serverless for Apache Spark を使用して PySpark バッチジョブを作成して実行します。

  • BigQueryInsertJobOperator は、[日付] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperator タスクは for ループを使用して動的に生成されます。また、これらのタスクは TaskGroup にあるため、Airflow UI のグラフビューで読みやすくなります。

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

Airflow UI を使用して変数を追加する

Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG は Airflow 変数を使用して共通値を保存します。これらの変数を環境に追加するには、次のようにします。

  1. Cloud Composer コンソールから Airflow UI にアクセスします

  2. [管理] > [変数] に移動します。

  3. 次の変数を追加します。

    • gcp_project: プロジェクト ID。

    • gcs_bucket: 前の手順で作成したバケットの名前(gs:// 接頭辞は付けない)。

    • gce_region: Google Cloud Apache Spark 用サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。

    • dataproc_service_account: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。

DAG を環境のバケットにアップロードする

Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:

  1. ローカルマシンに data_analytics_dag.py を保存します。

  2. Google Cloud コンソールで、[環境] ページに移動します。

    [環境] に移動

  3. 環境のリストで、[DAG フォルダ] 列の [DAG] リンクをクリックします。環境の DAG フォルダが開きます。

  4. [ファイルをアップロード] をクリックします。

  5. ローカルマシン上の data_analytics_dag.py を選択して、[開く] をクリックします。

DAG をトリガーする

  1. Cloud Composer 環境で [DAG] タブをクリックします。

  2. DAG ID data_analytics_dag をクリックします。

  3. [DAG をトリガー] をクリックします。

  4. タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。

DAG の成功を確認する

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。

    BigQuery に移動

  2. [エクスプローラ] パネルでプロジェクト名をクリックします。

  3. [holidays_weather_joined] をクリックします。

  4. プレビューをクリックして、生成されたテーブルを表示します。値列の数値は、10 分の 1 の摂氏度数です。

  5. [holidays_weather_normalized] をクリックします。

  6. プレビューをクリックして、生成されたテーブルを表示します。値列の数値は、摂氏度数です。

Google Cloud Apache Spark 向け Serverless の詳細(省略可)

より複雑な PySpark データ処理フローを使用して、この DAG の高度なバージョンを試すことができます。GitHub でデータ分析の例の Dataproc 拡張機能をご覧ください。

クリーンアップ

このチュートリアル用に作成した個々のリソースを削除します。

次のステップ