Use the Spark BigQuery connector

You can use the spark-bigquery-connector with Managed Service for Apache Spark to read data from and write data to BigQuery. This tutorial demonstrates a PySpark application that uses the spark-bigquery-connector.

Confirm the connector version

See Managed Service for Apache Spark runtime releases to determine the BigQuery connector version that is installed in your batch workload or interactive session runtime version. If the connector is not listed, see Make the connector available to applications.

Make the connector available to applications (if needed)

The BigQuery connector is installed in all supported Managed Service for Apache Spark runtime versions. If you are using an unsupported runtime version that does not install the connector (such as Spark runtime 1.0), you can make the connector available to an application in the following way:

  • Use the jars parameter to point to a connector jar file when you submit a Managed Service for Apache Spark batch workload or run an interactive session. The following batch workload example specifies a connector jar file (see the GoogleCloudDataproc/spark-bigquery-connector repository on GitHub for a list of available connector jar files).
    • Google Cloud CLI example:
      gcloud dataproc batches submit pyspark \
          --region=REGION \
          --jars=spark-3.5-bigquery-VERSION.jar \
          ... other args
      

Calculate costs

This tutorial uses billable components of Google Cloud, including:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

Use the Pricing Calculator to generate a cost estimate based on your projected usage.

New Google Cloud users might be eligible for a free trial.

Configure billing

By default, the project associated with the credentials or service account is billed for API usage. To bill a different project, set the following configuration property: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

You can also add this property to a read or write operation, as follows: .option("parentProject", "<BILLED-GCP-PROJECT>").
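
The following minimal PySpark sketch shows both approaches; the project ID my-billing-project is a hypothetical placeholder, not a value from this tutorial:

    #!/usr/bin/python
    """Sketch: bill BigQuery API usage to a different project."""
    from pyspark.sql import SparkSession

    spark = SparkSession \
      .builder \
      .appName('billing-config-demo') \
      .getOrCreate()

    # Bill all BigQuery operations in this session to the given project.
    # "my-billing-project" is a hypothetical placeholder project ID.
    spark.conf.set("parentProject", "my-billing-project")

    # Alternatively, bill a single read (or write) operation.
    df = spark.read.format('bigquery') \
      .option("parentProject", "my-billing-project") \
      .load('bigquery-public-data.samples.shakespeare')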

Submit a PySpark wordcount batch workload

This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.

The connector writes the wordcount output to BigQuery in the following sequence of operations (a sketch of the write call follows this list):

  1. Buffers the data into temporary files in your Cloud Storage bucket

  2. Copies the data in one operation from your Cloud Storage bucket into BigQuery

  3. Deletes the temporary files in Cloud Storage after the BigQuery load operation completes (temporary files are also deleted after the Spark application terminates). If deletion fails, you will need to delete any unwanted temporary Cloud Storage files, which typically are placed in gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUID.
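
For reference, here is a minimal sketch of such an indirect write. It assumes a DataFrame named word_count and the BUCKET_NAME placeholder used elsewhere in this tutorial, and it sets temporaryGcsBucket per operation rather than on the Spark session as the full example below does:

    # Sketch: indirect write through Cloud Storage into BigQuery.
    # The connector stages the DataFrame in BUCKET_NAME, then loads it
    # into the target table in a single BigQuery load operation.
    word_count.write.format('bigquery') \
      .option('temporaryGcsBucket', 'BUCKET_NAME') \
      .save('wordcount_dataset.wordcount_output')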

Steps to run the wordcount workload

  1. Open a local terminal or Cloud Shell.
  2. Create the wordcount_dataset with the bq command-line tool.
    bq mk wordcount_dataset
    
  3. Create a Cloud Storage bucket with the Google Cloud CLI.
    gcloud storage buckets create gs://BUCKET_NAME
    
    Replace BUCKET_NAME with a name for your new Cloud Storage bucket.
  4. Create the file wordcount.py locally in a text editor by copying the following PySpark code.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Cloud Storage bucket used by the connector for temporary BigQuery
    # export data.
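    # Replace BUCKET_NAME with the name of your Cloud Storage bucket.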
    bucket = "BUCKET_NAME"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data.samples.shakespeare')
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  5. Submit the PySpark batch workload:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=BUCKET_NAME
    
    Sample terminal output:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    To preview the output table in the Google Cloud console, open the BigQuery page, select the wordcount_output table, and then click Preview.
    Figure 1: Preview the output table in BigQuery
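
    If you prefer to verify the result from PySpark rather than the console, a minimal read-back sketch using the same connector (run in a Spark session where the connector is available) might look like this:

    # Sketch: read the output table back through the connector and
    # show a few rows to confirm the write succeeded.
    spark.read.format('bigquery') \
      .load('wordcount_dataset.wordcount_output') \
      .show(10)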

For more information