The spark-bigquery-connector is used with Apache Spark to read and write data from and to BigQuery. The connector takes advantage of the BigQuery Storage API when reading data from BigQuery.
This tutorial explains where the connector is pre-installed and shows you how to make a specific connector version available to Spark jobs. Example code shows you how to use the Spark BigQuery connector within a Spark application.
Use the pre-installed connector
The Spark BigQuery connector is pre-installed on, and available to Spark jobs run on, Dataproc clusters created with image versions 2.1 and later. The pre-installed connector version is listed on each image version release page. For example, the BigQuery Connector row on the 2.2.x image release versions page shows the connector version that is installed on the latest 2.2 image releases.
Make a specific connector version available to Spark jobs
Follow the instructions in this section in either of these cases:
- You want to use a connector version other than the pre-installed version on a cluster with image version 2.1 or later.
- You want to install the connector on a cluster with an image version earlier than 2.1.
Important: The spark-bigquery-connector version must be compatible with
the Dataproc cluster image version. See the
Connector to Dataproc Image Compatibility Matrix.
2.1 and later image version clusters
When you create a Dataproc cluster
with a 2.1 or later image version, specify the
connector version as cluster metadata.
gcloud CLI example:
gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL \
    other flags
Notes:
- SPARK_BQ_CONNECTOR_VERSION: Specify a connector version. Spark BigQuery connector versions are listed on the spark-bigquery-connector/releases page in GitHub.
  Example:
  --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
- SPARK_BQ_CONNECTOR_URL: Specify a URL that points to the connector jar in Cloud Storage. This can be one of two things:
  - the URL of a connector listed in the link column of the Downloading and Using the Connector section in GitHub
  - the path to a Cloud Storage location where you have placed a custom connector jar
  Examples:
  --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
  --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
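If you create clusters from a script, you can assemble the command above programmatically. The following Python sketch builds the gcloud dataproc clusters create command with exactly one of the two metadata keys described in the notes.
- The function names connector_metadata_flag and create_cluster_command are hypothetical helpers, not part of any Google Cloud library.
- The check that rejects non-gs:// URLs is an assumption based on the notes, which describe the URL as pointing to a jar in Cloud Storage.

```python
import shlex
from typing import Optional


def connector_metadata_flag(version: Optional[str] = None,
                            url: Optional[str] = None) -> str:
    """Return a --metadata flag that pins the connector (hypothetical helper).

    Exactly one of `version` or `url` must be given, mirroring the
    SPARK_BQ_CONNECTOR_VERSION / SPARK_BQ_CONNECTOR_URL choice.
    """
    if (version is None) == (url is None):
        raise ValueError("specify exactly one of version or url")
    if version is not None:
        return f"--metadata=SPARK_BQ_CONNECTOR_VERSION={version}"
    # Assumption: the custom or published jar lives in Cloud Storage.
    if not url.startswith("gs://"):
        raise ValueError("url must be a gs:// Cloud Storage path")
    return f"--metadata=SPARK_BQ_CONNECTOR_URL={url}"


def create_cluster_command(cluster_name: str, region: str,
                           image_version: str = "2.2",
                           version: Optional[str] = None,
                           url: Optional[str] = None) -> str:
    """Build a shell-quoted gcloud command; other flags are omitted."""
    args = [
        "gcloud", "dataproc", "clusters", "create", cluster_name,
        f"--region={region}",
        f"--image-version={image_version}",
        connector_metadata_flag(version=version, url=url),
    ]
    # shlex.join (Python 3.8+) quotes any argument that needs it.
    return shlex.join(args)


if __name__ == "__main__":
    # "my-cluster" and "us-central1" are hypothetical values.
    print(create_cluster_command("my-cluster", "us-central1", version="0.42.1"))
```

With version="0.42.1", the printed command ends in --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1, which matches the example in the notes. Passing both version and url raises a ValueError instead of producing an ambiguous command.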
2.0 and earlier image version clusters
You can make the Spark BigQuery connector available to your application in one of the following ways:
- Install the spark-bigquery-connector in the Spark jars directory of every node by using the Dataproc connectors initialization action when you create your cluster. 
- Provide the connector jar URL when you submit your job to the cluster, using the Google Cloud console, the gcloud CLI, or the Dataproc API:
  - Console: Use the Spark job Jar files item on the Dataproc Submit a job page.
  - gcloud: Use the --jars flag of the gcloud dataproc jobs submit spark command.
  - API: Use the SparkJob.jarFileUris field.
  How to specify the connector jar when running Spark jobs on 2.0 and earlier image version clusters:
  - Specify the connector jar by substituting the Scala and connector version information in the following URI string:
    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
  - Use Scala 2.12 with Dataproc image versions 1.5 and later:
    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    Example:
    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
  - Use Scala 2.11 with Dataproc image versions 1.4 and earlier:
    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    Example:
    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
- Include the connector jar in your Scala or Java Spark application as a dependency (see Compiling against the connector). 
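The Scala-version rule for pre-2.1 clusters is easy to get wrong when you manage several clusters. This Python sketch derives the jar URI from an image version string and a connector version.
- connector_jar_uri is a hypothetical helper.
- It assumes that image version strings start with MAJOR.MINOR (for example, 1.5 or 2.0-debian10).
- It does not check whether the requested connector version was actually published for that Scala version.

```python
import re


def connector_jar_uri(image_version: str, connector_version: str) -> str:
    """Return the connector jar URI for a pre-2.1 Dataproc image (hypothetical helper).

    Scala 2.12 is used for image versions 1.5 and later, and Scala 2.11
    for image versions 1.4 and earlier, following the rules listed above.
    """
    # Assumption: the version string begins with MAJOR.MINOR.
    match = re.match(r"(\d+)\.(\d+)", image_version)
    if match is None:
        raise ValueError(f"unrecognized image version: {image_version!r}")
    major, minor = int(match.group(1)), int(match.group(2))
    scala_version = "2.12" if (major, minor) >= (1, 5) else "2.11"
    return ("gs://spark-lib/bigquery/"
            f"spark-bigquery-with-dependencies_{scala_version}-{connector_version}.jar")


if __name__ == "__main__":
    # The result is the value to pass to: gcloud dataproc jobs submit spark --jars=...
    print(connector_jar_uri("1.5", "0.23.2"))
    print(connector_jar_uri("1.4", "0.23.2"))
```

The two calls reproduce the Scala 2.12 and Scala 2.11 jar URIs used in the gcloud examples above.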
Calculate costs
In this document, you use the following billable components of Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
To generate a cost estimate based on your projected usage, use the pricing calculator.
Read and write data from and to BigQuery
This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.
The connector writes data to BigQuery in two steps:
- It first buffers all of the data in temporary files in Cloud Storage.
- It then copies all of the data from Cloud Storage into BigQuery in a single load operation.
The connector attempts to delete the temporary files once the BigQuery load operation has succeeded, and once again when the Spark application terminates.
If the job fails, remove any remaining temporary Cloud Storage files. Typically, the connector's temporary files are located in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
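After a failed job, you first need to find the leftover temporary prefixes before you can remove them. The following Python sketch picks the distinct top-level .spark-bigquery-* prefixes out of a list of object names in the temporary bucket.
- It assumes the gs://[bucket]/.spark-bigquery-[jobid]-[UUID] layout described above.
- The helper name and the sample object names are hypothetical.
- In practice, you could obtain the names from the Cloud Storage client library (for example, the name attribute of each blob returned by list_blobs).
- You can then delete each prefix, for example with gcloud storage rm --recursive.

```python
from typing import Iterable, List

TEMP_PREFIX = ".spark-bigquery-"


def leftover_temp_uris(bucket: str, object_names: Iterable[str]) -> List[str]:
    """Return gs:// URIs of top-level .spark-bigquery-* prefixes (hypothetical helper)."""
    prefixes = set()
    for name in object_names:
        # Only the first path component identifies the connector's temporary directory.
        top_level = name.split("/", 1)[0]
        if top_level.startswith(TEMP_PREFIX):
            prefixes.add(top_level)
    return [f"gs://{bucket}/{prefix}/" for prefix in sorted(prefixes)]


if __name__ == "__main__":
    # Hypothetical object names; real names depend on the job ID and UUID.
    names = [
        ".spark-bigquery-app-0001-1111/part-00000.parquet",
        ".spark-bigquery-app-0001-1111/_SUCCESS",
        "unrelated/data.csv",
    ]
    for uri in leftover_temp_uris("my-bucket", names):
        print(uri)  # candidate for: gcloud storage rm --recursive URI
```

The sketch only lists candidates. Review them before deleting anything, because a job that is still running may also be using a .spark-bigquery-* prefix.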
Configure billing
By default, the project associated with the credentials or service account is billed for API usage. To bill a different project, set the following configuration:
spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
You can also set it on an individual read or write operation, as follows:
.option("parentProject", "<BILLED-GCP-PROJECT>")
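Because parentProject can be set in two places, the following Python sketch models which project ends up billed. It is a simplified model, not the connector's implementation.
- It assumes that a value set with .option on an individual read or write takes precedence over the session-wide spark.conf value.
- It uses the project associated with the credentials only when neither value is set.
- The function name billed_project and the project IDs below are hypothetical.

```python
from typing import Mapping


def billed_project(operation_options: Mapping[str, str],
                   session_conf: Mapping[str, str],
                   credentials_project: str) -> str:
    """Return the project billed for BigQuery API usage (hypothetical model).

    Assumed lookup order: per-operation .option("parentProject", ...),
    then spark.conf "parentProject", then the credentials' own project.
    """
    if "parentProject" in operation_options:
        return operation_options["parentProject"]
    if "parentProject" in session_conf:
        return session_conf["parentProject"]
    return credentials_project


if __name__ == "__main__":
    print(billed_project({"parentProject": "proj-a"},
                         {"parentProject": "proj-b"},
                         "proj-c"))
```

Under the stated assumption, the call above returns proj-a. With both mappings empty, it falls back to proj-c.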
Run the code
Before running this example, create a dataset named "wordcount_dataset" or change the output dataset in the code to an existing BigQuery dataset in your Google Cloud project.
Use the bq command to create the wordcount_dataset:
bq mk wordcount_dataset
Use the following Google Cloud CLI command to create a Cloud Storage bucket. The connector uses this bucket to stage the data it writes to BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examine the code and replace the [bucket] placeholder with
  the Cloud Storage bucket you created earlier.
/*
 * Remove comment if you are not running in spark-shell.
 *
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("spark-bigquery-demo")
  .getOrCreate()
*/

// Use the Cloud Storage bucket for temporary data that the connector
// stages before loading it into BigQuery.
val bucket = "[bucket]"
spark.conf.set("temporaryGcsBucket", bucket)

// Load data in from BigQuery. See
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
// for option information.
val wordsDF = spark.read.format("bigquery").load("bigquery-public-data:samples.shakespeare").cache()
wordsDF.createOrReplaceTempView("words")

// Perform word count.
val wordCountDF = spark.sql(
  "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
wordCountDF.show()
wordCountDF.printSchema()

// Save the data to BigQuery.
(wordCountDF.write.format("bigquery")
  .save("wordcount_dataset.wordcount_output"))
- Run the code on your cluster
  - Use SSH to connect to the Dataproc cluster master node:
    - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
    - On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node.
      A browser window opens at your home directory on the master node:
      Connected, host fingerprint: ssh-rsa 2048 ...
      ...
      user@clusterName-m:~$
  - Create wordcount.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing:
    nano wordcount.scala
  - Launch the spark-shell REPL:
    $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
  - Run wordcount.scala with the :load wordcount.scala command to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
    :load wordcount.scala
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.
PySpark
- Examine the code and replace the [bucket] placeholder with
  the Cloud Storage bucket you created earlier.
#!/usr/bin/env python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-demo') \
  .getOrCreate()

# Use the Cloud Storage bucket for temporary data that the connector
# stages before loading it into BigQuery.
bucket = "[bucket]"
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery.
words = spark.read.format('bigquery') \
  .load('bigquery-public-data:samples.shakespeare')
words.createOrReplaceTempView('words')

# Perform word count.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
word_count.show()
word_count.printSchema()

# Save the data to BigQuery.
word_count.write.format('bigquery') \
  .save('wordcount_dataset.wordcount_output')
- Run the code on your cluster
  - Use SSH to connect to the Dataproc cluster master node:
    - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
    - On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node.
      A browser window opens at your home directory on the master node:
      Connected, host fingerprint: ssh-rsa 2048 ...
      ...
      user@clusterName-m:~$
  - Create wordcount.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing:
    nano wordcount.py
  - Run wordcount.py with spark-submit to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
    spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.
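Both listings compute the word count with SUM(word_count) ... GROUP BY word. The source table stores one count per word per corpus (play), so the query adds those counts together for each word across all corpora. The following plain-Python sketch mirrors that aggregation on a few made-up rows, without Spark or BigQuery. The rows and corpus names are hypothetical and are not taken from the real table.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def word_count(rows: Iterable[Tuple[str, int, str]]) -> Dict[str, int]:
    """Sum word_count per word, like SELECT word, SUM(word_count) ... GROUP BY word.

    Each row is (word, word_count, corpus), a simplified stand-in for the
    bigquery-public-data:samples.shakespeare schema.
    """
    totals: Dict[str, int] = defaultdict(int)
    for word, count, _corpus in rows:
        totals[word] += count
    return dict(totals)


if __name__ == "__main__":
    sample_rows = [
        ("hope", 3, "corpus_a"),
        ("hope", 4, "corpus_b"),
        ("ransom", 2, "corpus_a"),
    ]
    print(word_count(sample_rows))  # {'hope': 7, 'ransom': 2}
```

As in the Spark SQL query, words are compared exactly, so "By" and "by" would be counted separately.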
    
Troubleshooting tips
You can examine job logs in Cloud Logging and in the BigQuery Jobs Explorer to troubleshoot Spark jobs that use the BigQuery connector.
- Dataproc driver logs contain a BigQueryClient entry with BigQuery metadata that includes the jobId:
  INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
- BigQuery jobs contain dataproc_job_id and dataproc_job_uuid labels:
  - Logging:
    protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
    protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
    protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
  - BigQuery Jobs Explorer: Click a job ID to view job details under Labels in Job information.
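When you correlate many jobs, it can help to extract the BigQuery job ID from the driver logs and to build the matching Logging query. The following Python sketch does both.
- It assumes the driver log line contains the JobId{project=..., job=..., location=...} text shown above.
- The helper names and the sample log line are hypothetical.
- The exact log format may vary between connector versions.

```python
import re
from typing import Dict, Optional

# Matches the JobId{project=..., job=..., location=...} part of the
# BigQueryClient driver log entry. The exact spacing is an assumption.
_JOB_ID_PATTERN = re.compile(
    r"JobId\{project=([^,}]+),\s*job=([^,}]+),\s*location=([^,}]+)\}")

_LABELS = "protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels"


def parse_bigquery_job_id(log_line: str) -> Optional[Dict[str, str]]:
    """Extract project, job, and location from a driver log line, or return None."""
    match = _JOB_ID_PATTERN.search(log_line)
    if match is None:
        return None
    project, job, location = (part.strip() for part in match.groups())
    return {"project": project, "job": job, "location": location}


def dataproc_job_filter(dataproc_job_id: str) -> str:
    """Build a Cloud Logging filter for BigQuery jobs labeled with a Dataproc job ID."""
    return f'{_LABELS}.dataproc_job_id="{dataproc_job_id}"'


if __name__ == "__main__":
    line = ("INFO BigQueryClient: ... jobId: "
            "JobId{project=my-project, job=job_123, location=US}")  # hypothetical
    print(parse_bigquery_job_id(line))
    print(dataproc_job_filter("my-dataproc-job"))
```

You can paste the generated filter into the Logs Explorer query field to find the BigQuery jobs that a given Dataproc job started.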
What's next
- See BigQuery Storage & Spark SQL - Python.
- Learn how to create a table definition file for an external data source.
- Learn how to query externally partitioned data.
- See Spark job tuning tips.