BigQuery へのデータの読み込み

このドキュメントでは、Managed Service for Apache Spark を使用して、処理されたデータを Cloud Storage から BigQuery テーブルに読み込む Spark ジョブを実行する方法について説明します。Managed Service for Apache Spark は、Spark 環境と必要なコネクタを管理することで、このプロセスを効率化します。

始める前に

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Cloud Storage バケットを作成します。
  9. イメージ バージョン 2.1 以降を使用する Managed Service for Apache Spark クラスタを作成します。
  10. BigQuery データセットを作成します。

PySpark スクリプトを準備する

  1. load_analytics_data.py という名前のローカル Python ファイルを作成します。

  2. ファイルに次のコードを追加し、このスクリプトは、Cloud Storage パスからデータを読み取り、集計を実行して、結果を BigQuery に書き込みます。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. 各プレースホルダを次のように置き換えます。

    • BUCKET_NAME: Cloud Storage バケットの名前。
    • PROJECT_ID: 実際の Google Cloudプロジェクト ID。
  4. load_analytics_data.py スクリプトを Cloud Storage バケットにアップロードします。

Managed Service for Apache Spark ジョブを送信する

PySpark スクリプトをジョブとして Managed Service for Apache Spark クラスタに送信します。

  1. ターミナルで gcloud dataproc jobs submit pyspark コマンドを実行します。

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. 各プレースホルダを次のように置き換えます。

    • BUCKET_NAME: Cloud Storage バケットの名前。
    • CLUSTER_NAME: Managed Service for Apache Spark クラスタの名前。
    • REGION: クラスタが配置されているリージョン。

    このコマンドは、PySpark ジョブを Managed Service for Apache Spark に送信します。Managed Service for Apache Spark ワーカーは、指定された Cloud Storage パスからスクリプトを取得し、クラスタで実行します。

データ読み込みを確認する

  1. Google Cloud コンソールで、Managed Service for Apache Spark の [ジョブ] ページに移動して、ジョブの実行をモニタリングし、ドライバ出力ログを表示します。

  2. ジョブが完了したら、[BigQuery] ページに移動します。

  3. [エクスプローラ] パネルで、プロジェクトとデータセットを見つけて、corpus_word_counts テーブルを選択します。

  4. [プレビュー] タブをクリックして、読み込まれたデータを調べます。

Spark-BigQuery コネクタの仕組み

Spark-BigQuery コネクタを使用すると、Spark アプリケーションで BigQuery との間で読み取りと書き込みを行うことができます。イメージ バージョン 2.1 以降の Managed Service for Apache Spark クラスタでは、コネクタはプリインストールされています。

コネクタは、間接書き込みメソッドを使用してデータを読み込みます。この方法では、Managed Service for Apache Spark と BigQuery の両方を活用して、高いパフォーマンスを実現します。

  1. Managed Service for Apache Spark クラスタの Spark ジョブは、最終的な DataFrame を Cloud Storage バケットの一時的な場所に書き込みます。

  2. Cloud Storage への書き込みが完了すると、コネクタは BigQuery 読み込みジョブをトリガーします。

  3. BigQuery は、一時的な Cloud Storage の場所からターゲット テーブルにデータを取り込みます。

この間接的なアプローチにより、Spark コンピューティングが BigQuery 取り込みから切り離されます。この切り離しにより、各サービスが効率的に動作し、大量のデータ読み込みで高スループットが確保されます。

次のステップ