Memuat data ke BigQuery

Dokumen ini menunjukkan cara menggunakan Managed Service untuk Apache Spark guna menjalankan tugas Spark yang memuat data yang diproses dari Cloud Storage ke tabel BigQuery. Managed Service untuk Apache Spark menyederhanakan proses ini dengan mengelola lingkungan Spark dan konektor yang diperlukan.

Sebelum memulai

  1. Sign in to your Google Cloud account. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Buat bucket Cloud Storage.
  9. Buat cluster Managed Service untuk Apache Spark yang menggunakan versi image 2.1 atau yang lebih baru.
  10. Buat set data BigQuery.

Menyiapkan skrip PySpark

  1. Buat file Python lokal bernama load_analytics_data.py.

  2. Tambahkan kode berikut ke file tersebut. Skrip ini membaca data dari jalur Cloud Storage, melakukan agregasi, dan menulis hasilnya ke BigQuery.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. Ganti placeholder berikut:

    • BUCKET_NAME: nama bucket Cloud Storage Anda.
    • PROJECT_ID: project ID Anda. Google Cloud
  4. Upload skrip load_analytics_data.py ke bucket Cloud Storage Anda.

Mengirimkan tugas Managed Service untuk Apache Spark

Kirimkan skrip PySpark sebagai tugas ke cluster Managed Service untuk Apache Spark Anda.

  1. Di terminal, jalankan perintah gcloud dataproc jobs submit pyspark:

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. Ganti placeholder berikut:

    • BUCKET_NAME: nama bucket Cloud Storage Anda.
    • CLUSTER_NAME: nama cluster Managed Service untuk Apache Spark Anda.
    • REGION: region tempat cluster Anda berada.

    Perintah ini mengirimkan tugas PySpark ke Managed Service untuk Apache Spark. Worker Managed Service untuk Apache Spark mengambil skrip dari jalur Cloud Storage yang ditentukan dan menjalankannya di cluster.

Memverifikasi pemuatan data

  1. Di Google Cloud konsol, buka halaman Jobs Managed Service untuk Apache Spark guna memantau eksekusi tugas dan melihat log output driver.

  2. Setelah tugas selesai, buka halaman BigQuery.

  3. Di panel Explorer, temukan project dan set data Anda, lalu pilih tabel corpus_word_counts.

  4. Klik tab Preview untuk memeriksa data yang dimuat.

Cara kerja konektor Spark-BigQuery

Konektor Spark-BigQuery memungkinkan aplikasi Spark membaca dari dan menulis ke BigQuery. Di cluster Managed Service untuk Apache Spark dengan versi image 2.1 atau yang lebih baru, konektor ini telah diinstal sebelumnya.

Konektor menggunakan metode tulis tidak langsung untuk memuat data. Metode ini memanfaatkan Managed Service untuk Apache Spark dan BigQuery untuk performa tinggi.

  1. Tugas Spark di cluster Managed Service untuk Apache Spark menulis DataFrame akhir ke lokasi sementara di bucket Cloud Storage.

  2. Setelah penulisan ke Cloud Storage selesai, konektor akan memicu Tugas Pemuatan BigQuery.

  3. BigQuery menyerap data dari lokasi Cloud Storage sementara ke tabel target.

Pendekatan tidak langsung ini memisahkan komputasi Spark dari penyerapan BigQuery. Pemisahan ini memungkinkan setiap layanan beroperasi secara efisien dan memastikan throughput tinggi untuk pemuatan data besar.

Langkah berikutnya