Load data into BigQuery

This document shows how to use Dataproc to run a Spark job that loads processed data from Cloud Storage into a BigQuery table. Dataproc streamlines this process by managing the Spark environment and the necessary connectors.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Create a Cloud Storage bucket.
  6. Create a Dataproc cluster that uses image version 2.1 or later.
  7. Create a BigQuery dataset.

Prepare the PySpark script

  1. Create a local Python file named load_analytics_data.py.

  2. Add the following code to the file. This script reads data from a Cloud Storage path, performs an aggregation, and writes the result to BigQuery.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum

    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"

    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = gcs_bucket

    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("temporaryGcsBucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. Replace the following placeholders:

    • BUCKET_NAME: the name of your Cloud Storage bucket.
    • PROJECT_ID: your Google Cloud project ID.
    • DATASET: the name of your BigQuery dataset.
  4. Upload the load_analytics_data.py script to your Cloud Storage bucket.
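Before uploading, you can sanity-check the aggregation logic locally without a Spark cluster. The following pure-Python sketch mirrors the groupBy-and-sum that the script performs; the sample rows are hypothetical and only illustrate the expected schema (`corpus`, `word_count_int`).

```python
from collections import defaultdict

# Hypothetical sample rows with the schema the script expects.
rows = [
    {"corpus": "hamlet", "word_count_int": 100},
    {"corpus": "hamlet", "word_count_int": 50},
    {"corpus": "macbeth", "word_count_int": 75},
]

# Equivalent of groupBy("corpus").agg(sum("word_count_int")).orderBy("corpus").
totals = defaultdict(int)
for row in rows:
    totals[row["corpus"]] += row["word_count_int"]

analytics = sorted(totals.items())
print(analytics)  # [('hamlet', 150), ('macbeth', 75)]
```

If the totals here match what you expect from your real data's schema, the Spark job should produce the same per-corpus sums at scale.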

Submit the Dataproc job

Submit the PySpark script as a job to your Dataproc cluster.

  1. In a terminal, run the gcloud dataproc jobs submit pyspark command:

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. Replace the following placeholders:

    • BUCKET_NAME: the name of your Cloud Storage bucket.
    • CLUSTER_NAME: the name of your Dataproc cluster.
    • REGION: the region where your cluster is located.

    The command submits a PySpark job to the Dataproc service. Dataproc workers fetch the script from the specified Cloud Storage path and execute it on the cluster.
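If you submit jobs from a script or CI pipeline, the command above can be assembled programmatically. This is a sketch only: `build_submit_command` is a hypothetical helper, not part of any Google Cloud SDK, and it simply reproduces the gcloud invocation shown earlier as an argv list.

```python
def build_submit_command(bucket: str, cluster: str, region: str) -> list[str]:
    """Assemble the argv for the gcloud dataproc jobs submit pyspark command.

    The returned list can be passed to subprocess.run() on a machine where
    the gcloud CLI is installed and authenticated.
    """
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        f"gs://{bucket}/scripts/load_analytics_data.py",
        f"--cluster={cluster}",
        f"--region={region}",
    ]

# Example with placeholder values:
cmd = build_submit_command("my-bucket", "my-cluster", "us-central1")
print(" ".join(cmd))
```

Passing the command as a list (rather than a single shell string) avoids quoting issues when bucket or cluster names are parameterized.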

Verify the data load

  1. In the Google Cloud console, go to the Dataproc Jobs page to monitor the job's execution and view driver output logs.

  2. After the job completes, go to the BigQuery page.

  3. In the Explorer panel, find your project and dataset, then select the corpus_word_counts table.

  4. Click the Preview tab to inspect the loaded data.

How the Spark-BigQuery connector works

The Spark-BigQuery connector enables Spark applications to read from and write to BigQuery. On Dataproc clusters with image version 2.1 or later, the connector is preinstalled.

The connector uses an indirect write method to load data. Rather than streaming rows directly into BigQuery, it stages the output in Cloud Storage and then hands ingestion off to a BigQuery load job:

  1. The Spark job on the Dataproc cluster writes the final DataFrame to a temporary location in a Cloud Storage bucket.

  2. After the write to Cloud Storage completes, the connector triggers a BigQuery Load Job.

  3. BigQuery ingests the data from the temporary Cloud Storage location into the target table.

This indirect approach decouples the Spark computation from the BigQuery ingestion. This decoupling allows each service to operate efficiently and ensures high throughput for large data loads.
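The three stages above can be illustrated with a small, self-contained Python sketch. Here `stage_to_storage` and `load_job` are hypothetical stand-ins for the connector's staging step and the BigQuery load job, with a local temporary directory playing the role of the Cloud Storage bucket:

```python
import csv
import tempfile
from pathlib import Path

def stage_to_storage(rows, staging_dir: Path) -> Path:
    """Step 1: the Spark job writes its output to temporary storage."""
    part = staging_dir / "part-00000.csv"
    with part.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return part

def load_job(part: Path, table: list) -> None:
    """Steps 2-3: a load job ingests the staged file into the target table."""
    with part.open(newline="") as f:
        for corpus, total in csv.reader(f):
            table.append((corpus, int(total)))

table = []  # stand-in for the BigQuery target table
with tempfile.TemporaryDirectory() as tmp:
    staged = stage_to_storage([("hamlet", 150), ("macbeth", 75)], Path(tmp))
    load_job(staged, table)
    # The temporary staging files are deleted once the load completes.

print(table)  # [('hamlet', 150), ('macbeth', 75)]
```

Note how the "Spark" side finishes its work entirely before the "BigQuery" side starts reading: that hand-off through storage is what lets the two services scale independently.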

What's next