將資料載入 BigQuery

本文說明如何使用 Managed Service for Apache Spark 執行 Spark 工作,將 Cloud Storage 中經過處理的資料載入 BigQuery 資料表。Managed Service for Apache Spark 會管理 Spark 環境和必要連接器,簡化這個程序。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. 建立 Cloud Storage bucket。
  9. 建立 Managed Service for Apache Spark 叢集,並使用 2.1 以上版本的映像檔。
  10. 建立 BigQuery 資料集

準備 PySpark 指令碼

  1. 建立名為 load_analytics_data.py 的本機 Python 檔案。

  2. 在檔案新增下列程式碼:這個指令碼會從 Cloud Storage 路徑讀取資料、執行彙整作業,然後將結果寫入 BigQuery。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. 替換下列預留位置:

    • BUCKET_NAME:Cloud Storage bucket 的名稱。
    • PROJECT_ID:您的 Google Cloud專案 ID。
  4. load_analytics_data.py 指令碼上傳至 Cloud Storage bucket。

提交 Managed Service for Apache Spark 工作

將 PySpark 指令碼以工作形式提交至 Managed Service for Apache Spark 叢集。

  1. 在終端機中執行 gcloud dataproc jobs submit pyspark 指令:

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. 替換下列預留位置:

    • BUCKET_NAME:Cloud Storage bucket 的名稱。
    • CLUSTER_NAME:Managed Service for Apache Spark 叢集的名稱。
    • REGION:叢集所在的區域。

    這個指令會將 PySpark 工作提交至 Managed Service for Apache Spark 服務。Managed Service for Apache Spark 工作人員會從指定的 Cloud Storage 路徑擷取指令碼,並在叢集上執行。

驗證資料載入

  1. 前往 Google Cloud 控制台的 Managed Service for Apache Spark「Jobs」(工作) 頁面,監控工作執行情況並查看驅動程式輸出記錄。

  2. 工作完成後,請前往「BigQuery」BigQuery頁面。

  3. 在「Explorer」面板中找出專案和資料集,然後選取 corpus_word_counts 資料表。

  4. 按一下「預覽」分頁標籤,檢查載入的資料。

Spark-BigQuery 連接器的運作方式

Spark-BigQuery 連接器可讓 Spark 應用程式從 BigQuery 讀取資料,以及將資料寫入 BigQuery。在映像檔版本為 2.1 以上的 Managed Service for Apache Spark 叢集上,系統會預先安裝連接器。

連接器會使用間接寫入方法載入資料。這個方法會同時運用 Managed Service for Apache Spark 和 BigQuery,以提升效能。

  1. Managed Service for Apache Spark 叢集上的 Spark 工作會將最終 DataFrame 寫入 Cloud Storage bucket 中的暫時位置。

  2. 寫入 Cloud Storage 完成後,連接器會觸發 BigQuery 載入工作。

  3. BigQuery 會將臨時 Cloud Storage 位置的資料擷取至目標資料表。

這種間接方式可將 Spark 運算與 BigQuery 擷取作業分離。這種解耦方式可讓每項服務有效運作,並確保大量資料負載的高總處理量。

後續步驟