BigQuery へのデータの読み込み
このドキュメントでは、Managed Service for Apache Spark を使用して、Cloud Storage から処理済みデータを BigQuery テーブルに読み込む Spark ジョブを実行する方法について説明します。Managed Service for Apache Spark は、Spark 環境と必要なコネクタを管理することで、このプロセスを効率化します。
始める前に
- アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、 実際のシナリオでプロダクトがどのように機能するかを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイに利用できる $300 分の無料クレジットも提供されます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Cloud Storage バケットを作成します。
- イメージ バージョン
2.1以降を使用する Managed Service for Apache Spark クラスタを作成します。 - BigQuery データセットを作成します。
PySpark スクリプトを準備する
load_analytics_data.pyという名前のローカル Python ファイルを作成します。ファイルに次のコードを追加し、このスクリプトは、Cloud Storage パスからデータを読み取り、集計を実行して、結果を BigQuery に書き込みます。
from pyspark.sql import SparkSession from pyspark.sql.functions import sum as _sum import sys # --- Configuration --- gcs_bucket = "BUCKET_NAME" bq_project = "PROJECT_ID" bq_dataset = "DATASET" bq_table = "corpus_word_counts" # --- Paths --- processed_path = f"gs://{gcs_bucket}/processed/processed_data" temp_gcs_path = f"{gcs_bucket}" # --- Spark Session Initialization --- spark = SparkSession.builder \ .appName("Dataproc-BigQuery-Load") \ .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \ .getOrCreate() # --- Read Processed Data from Cloud Storage --- processed_df = spark.read.parquet(processed_path) # --- Final Aggregation for Analytics-Ready Stage --- analytics_df = processed_df.groupBy("corpus") \ .agg(_sum("word_count_int").alias("total_word_count")) \ .orderBy("corpus") print("Aggregated Analytics-Ready data:") analytics_df.show() # --- Write DataFrame to BigQuery --- print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}") analytics_df.write \ .format("bigquery") \ .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \ .mode("append") \ .save() print("Successfully wrote data to BigQuery.") # --- Stop Spark Session --- spark.stop()各プレースホルダを次のように置き換えます。
BUCKET_NAME: Cloud Storage バケットの名前。PROJECT_ID: 実際の Google Cloud プロジェクト ID。
load_analytics_data.pyスクリプトを Cloud Storage バケットにアップロードします。
Managed Service for Apache Spark ジョブを送信する
PySpark スクリプトをジョブとして Managed Service for Apache Spark クラスタに送信します。
ターミナルで、
gcloud dataproc jobs submit pysparkコマンドを実行します。gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \ --cluster=CLUSTER_NAME \ --region=REGION各プレースホルダを次のように置き換えます。
BUCKET_NAME: Cloud Storage バケットの名前。CLUSTER_NAME: Managed Service for Apache Spark クラスタの名前。REGION: クラスタが配置されているリージョン。
このコマンドは、PySpark ジョブを Managed Service for Apache Spark サービスに送信します。Managed Service for Apache Spark ワーカーは、指定された Cloud Storage パスからスクリプトを取得し、クラスタで実行します。
データの読み込みを確認する
コンソールで、Managed Service for Apache Spark の [ジョブ] ページに移動して、 ジョブの実行をモニタリングし、ドライバの出力ログを表示します。 Google Cloud
ジョブが完了したら、[BigQuery] ページに移動します。
[エクスプローラ] パネルで、プロジェクトとデータセットを見つけて、
corpus_word_countsテーブルを選択します。[プレビュー] タブをクリックして、読み込まれたデータを確認します。
Spark-BigQuery コネクタの仕組み
Spark-BigQuery コネクタを使用すると、Spark アプリケーションで BigQuery の読み取りと書き込みを行うことができます。イメージ バージョン 2.1 以降の Managed Service for Apache Spark クラスタでは、コネクタがプリインストールされています。
コネクタは、間接書き込みメソッドを使用してデータを読み込みます。このメソッドは、Managed Service for Apache Spark と BigQuery の両方を活用して、高いパフォーマンスを実現します。
Managed Service for Apache Spark クラスタの Spark ジョブは、最終的な DataFrame を Cloud Storage バケットの一時的な場所に書き込みます。
Cloud Storage への書き込みが完了すると、コネクタは BigQuery 読み込みジョブをトリガーします。
BigQuery は、一時的な Cloud Storage の場所からターゲット テーブルにデータを取り込みます。
この間接的なアプローチにより、Spark の計算と BigQuery の取り込みが分離されます。この分離により、各サービスを効率的に運用でき、大量のデータ読み込みで高スループットを確保できます。
次のステップ
- Spark-BigQuery コネクタの詳細を確認する。
- BigQuery でデータをクエリして可視化する方法を学習する。
- Looker を使用して BigQuery データを可視化する方法を確認する。