Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Cloud Storage バケットに保存されている CSV ファイルのデータを結合し、Google Cloud Apache Spark 向け Serverless バッチジョブを実行して結合されたデータを処理します。
このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の祝日の日付と名前に関する情報が含まれています。
DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。
目標
- デフォルト構成で Cloud Composer 環境を作成する
- 空の BigQuery データセットを作成する
- Cloud Storage バケットを新規作成する
- 次のタスクを含む DAG を作成、実行します。
- 外部データセットを Cloud Storage から BigQuery に読み込む
- BigQuery で 2 つのデータセットを結合する
- データ分析 PySpark ジョブを実行する
準備
API を有効にする
次の API を有効にします。
コンソール
Dataproc、Cloud Composer、BigQuery、Cloud Storage API を有効にします。
API を有効にするために必要なロール
API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。
gcloud
Dataproc、Cloud Composer、BigQuery、Cloud Storage API を有効にします。
API を有効にするために必要なロール
API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
権限を付与する
ユーザー アカウントに次のロールと権限を付与します。
BigQuery データオーナー(
roles/bigquery.dataOwner)のロールを付与して、BigQuery データセットを作成します。ストレージ管理者(
roles/storage.admin)のロールを付与して、Cloud Storage バケットを作成します。
Cloud Composer 環境の作成と準備
デフォルト パラメータを使用して Cloud Composer 環境を作成します。
- 米国のリージョンを選択します。
- 最新の Cloud Composer バージョンを選択します。
Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。
- BigQuery ユーザー(
roles/bigquery.user) - BigQuery データオーナー(
roles/bigquery.dataOwner) - サービス アカウント ユーザー(
roles/iam.serviceAccountUser) - Dataproc 編集者(
roles/dataproc.editor) - Dataproc ワーカー(
roles/dataproc.worker)
- BigQuery ユーザー(
関連リソースを作成する
次のパラメータを使用して空の BigQuery データセットを作成します。
- 名前:
holiday_weather - リージョン:
US
- 名前:
USマルチリージョンで新しい Cloud Storage バケットを作成します。次のコマンドを実行して、ネットワーキングの要件を満たすためにGoogle Cloud Apache Spark 用サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Google Cloud Serverless for Apache Spark を使用したデータ処理
サンプル PySpark のジョブを確認する
次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットの温度データを別の形式に変換します。
サポート ファイルを Cloud Storage にアップロードする
PySpark ファイルと holidays.csv に保存されているデータセットをアップロードするには:
data_analytics_process.py をローカルマシンに保存します。
holidays.csv をローカルマシンに保存します。
Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。
前に作成したバケットの名前をクリックします。
バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで
data_analytics_process.pyとholidays.csvを選択し、[開く] をクリックします。
データ分析 DAG
DAG の例を確認する
DAG は複数の演算子を使用してデータを変換し、統合します。
GCSToBigQueryOperatorは、Cloud Storage からの holidays.csv ファイルを、前の手順で作成した BigQueryholidays_weatherデータセット内の新しいテーブルに取り込みます。DataprocCreateBatchOperatorは、Serverless for Apache Spark を使用して PySpark バッチジョブを作成して実行します。BigQueryInsertJobOperatorは、[日付] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperatorタスクは for ループを使用して動的に生成されます。また、これらのタスクはTaskGroupにあるため、Airflow UI のグラフビューで読みやすくなります。
Airflow UI を使用して変数を追加する
Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG は Airflow 変数を使用して共通値を保存します。これらの変数を環境に追加するには、次のようにします。
[管理] > [変数] に移動します。
次の変数を追加します。
gcp_project: プロジェクト ID。gcs_bucket: 前の手順で作成したバケットの名前(gs://接頭辞は付けない)。gce_region: Google Cloud Apache Spark 用サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。dataproc_service_account: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。
DAG を環境のバケットにアップロードする
Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:
ローカルマシンに data_analytics_dag.py を保存します。
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、[DAG フォルダ] 列の [DAG] リンクをクリックします。環境の DAG フォルダが開きます。
[ファイルをアップロード] をクリックします。
ローカルマシン上の
data_analytics_dag.pyを選択して、[開く] をクリックします。
DAG をトリガーする
Cloud Composer 環境で [DAG] タブをクリックします。
DAG ID
data_analytics_dagをクリックします。[DAG をトリガー] をクリックします。
タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。
DAG の成功を確認する
Google Cloud コンソールで、[BigQuery] ページに移動します。
[エクスプローラ] パネルでプロジェクト名をクリックします。
[
holidays_weather_joined] をクリックします。プレビューをクリックして、生成されたテーブルを表示します。値列の数値は、10 分の 1 の摂氏度数です。
[
holidays_weather_normalized] をクリックします。プレビューをクリックして、生成されたテーブルを表示します。値列の数値は、摂氏度数です。
Google Cloud Apache Spark 向け Serverless の詳細(省略可)
より複雑な PySpark データ処理フローを使用して、この DAG の高度なバージョンを試すことができます。GitHub でデータ分析の例の Dataproc 拡張機能をご覧ください。
クリーンアップ
このチュートリアル用に作成した個々のリソースを削除します。
このチュートリアル用に作成した Cloud Storage バケットを削除します。
Cloud Composer 環境を削除します(環境のバケットを手動で削除します)。