El spark-bigquery-connector se usa con Apache Spark para leer y escribir datos desde y hacia BigQuery. El conector aprovecha la API de BigQuery Storage cuando lees datos de BigQuery.
En este instructivo, se proporciona información sobre la disponibilidad del conector preinstalado y se muestra cómo hacer que una versión específica del conector esté disponible para los trabajos de Spark. El código de ejemplo muestra cómo usar el conector de BigQuery de Spark dentro de una aplicación de Spark.
Usa el conector preinstalado
El conector de BigQuery de Spark está preinstalado en y está disponible para los trabajos de Spark que se ejecutan en clústeres de Dataproc creados con las versiones de imagen 2.1 y posteriores. La versión del conector preinstalado aparece en la página de lanzamiento de cada versión de imagen. Por ejemplo, la fila Conector de BigQuery en la página
de versiones de lanzamiento de imágenes 2.2.x
muestra la versión del conector que está instalada en los lanzamientos de imágenes 2.2 más recientes.
Haz que una versión específica del conector esté disponible para los trabajos de Spark
Si deseas usar una versión del conector que sea diferente de una versión preinstalada en un clúster de versión de imagen 2.1 o posterior, o si deseas instalar el conector en un clúster de versión de imagen anterior a 2.1, sigue las instrucciones de esta sección.
Importante: La versión spark-bigquery-connector debe ser compatible con la versión de imagen del clúster de Dataproc. Consulta la
Matriz de compatibilidad de la imagen del conector con Dataproc.
Clústeres de versión de imagen 2.1 y posteriores
Cuando creas un clúster de Dataproc
con una versión de imagen 2.1 o posterior, especifica la
versión del conector como metadatos del clúster.
Ejemplo de gcloud CLI:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Notas:
SPARK_BQ_CONNECTOR_VERSION: Especifica una versión del conector. Las versiones del conector de BigQuery de Spark se enumeran en la página spark-bigquery-connector/releases en GitHub.
Ejemplo:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: Especifica una URL que apunte al archivo jar en Cloud Storage. Puedes especificar la URL de un conector que aparece en la columna vínculo en Descarga y uso del conector en GitHub o la ruta de acceso a una ubicación de Cloud Storage en la que colocaste un archivo jar del conector personalizado.
Ejemplos:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
Clústeres de versión de imagen 2.0 y anteriores
Puedes hacer que el conector de BigQuery de Spark esté disponible para tu aplicación de una de las siguientes maneras:
Instala el spark-bigquery-connector en el directorio de los archivos jar de Spark de cada nodo mediante la acción de inicialización de conectores de Dataproc cuando crees tu clúster.
Proporciona la URL del archivo jar del conector cuando envíes tu trabajo al clúster con la Google Cloud consola, gcloud CLI o la API de Dataproc.
Console
Usa el elemento Archivos jar del trabajo de Spark en la página Enviar un trabajo de Dataproc .
gcloud
API
Usa el
SparkJob.jarFileUriscampo.Cómo especificar el archivo jar del conector cuando se ejecutan trabajos de Spark en clústeres de versión de imagen anteriores a 2.0
- Para especificar el archivo jar del conector, sustituye la información de Scala y la versión del conector en la siguiente cadena de URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Usa Scala
2.12con las versiones de imagen1.5+de Dataproc. Ejemplo de gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Usa Scala
2.11con las versiones de imagen1.4y anteriores de Dataproc: Ejemplo de gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Para especificar el archivo jar del conector, sustituye la información de Scala y la versión del conector en la siguiente cadena de URI:
Incluye el archivo jar del conector en tu aplicación de Scala o Java Spark como una dependencia (consulta Compila con el conector).
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para generar una estimación de costos en función del uso previsto,
usa la calculadora de precios.
Lee y escribe datos desde y hacia BigQuery
Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.
El conector escribe los datos en BigQuery mediante el almacenamiento en búfer de todos los datos en una tabla temporal de Cloud Storage. Luego, copia todos los datos en BigQuery en una operación. El conector intenta borrar los archivos temporales una vez que la operación de carga de BigQuery se realiza correctamente, y lo vuelve a hacer cuando la aplicación Spark finaliza.
Si el trabajo falla, quita cualquier archivo temporal de Cloud Storage restante. Por lo general, los archivos temporales de BigQuery se encuentran en gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
Configura la facturación
De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente
configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
También se puede agregar a una operación de lectura o escritura, de la siguiente manera:
.option("parentProject", "<BILLED-GCP-PROJECT>").
Ejecuta el código
Antes de ejecutar este ejemplo, crea un conjunto de datos llamado "wordcount_dataset" o cambia el conjunto de datos de salida en el código a un conjunto de datos de BigQuery existente en tu Google Cloud proyecto.
Usa el
bq comando para crear
el wordcount_dataset:
bq mk wordcount_dataset
Usa el comando de Google Cloud CLI para crear un bucket de Cloud Storage, que se usará a fin de exportar a BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examina el código y reemplaza el marcador de posición [bucket] por
el bucket de Cloud Storage que creaste anteriormente.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare") .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .save("wordcount_dataset.wordcount_output"))
- Ejecuta el código en tu clúster
- Usa SSH para conectarte al nodo principal del clúster de Dataproc
de la siguiente manera:
- Ve a la
página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster
- En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSHa la derecha del nombre del nodo instancia principal del clúster>
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Ve a la
página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster
- Crea
wordcount.scalacon el editor de textovi,vim, onanopreinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.nano wordcount.scala
- Inicia el REPL de
spark-shell.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Ejecuta wordcount.scala con el comando
:load wordcount.scalapara crear la tabla de BigQuerywordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para obtener una vista previa de la tabla de resultados, abre laBigQuerypágina, selecciona la tablawordcount_outputy haz clic en Vista previa.
- Usa SSH para conectarte al nodo principal del clúster de Dataproc
de la siguiente manera:
PySpark
- Examina el código y reemplaza el marcador de posición [bucket] por
el bucket de Cloud Storage que creaste anteriormente.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data:samples.shakespeare') \ words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- Ejecuta el código en tu clúster
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc de la siguiente manera:
- Ve a la
página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSHa la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Ve a la
página Clústeres de Dataproc en la Google Cloud consola y, luego, haz clic en el nombre de tu clúster
- Crea
wordcount.pycon el editor de textovi,vim, onanopreinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.nano wordcount.py
- Ejecuta el conteo de palabras con
spark-submitpara crear la tabla de BigQuerywordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para obtener una vista previa de la tabla de resultados, abre laBigQuerypágina, selecciona lawordcount_outputtabla y haz clic en Vista previa.
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc de la siguiente manera:
Sugerencias para solucionar problemas
Puedes examinar los registros de trabajos en Cloud Logging y en el Explorador de trabajos de BigQuery para solucionar problemas de trabajos de Spark que usan el conector de BigQuery.
Los registros del controlador de Dataproc contienen una entrada
BigQueryClientcon metadatos de BigQuery que incluyen eljobId:ClassNotFoundException
INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION} Los trabajos de BigQuery contienen etiquetas
Dataproc_job_idyDataproc_job_uuid:- Registro:
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID" protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID" protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
- Explorador de trabajos de BigQuery: Haz clic en un ID de trabajo para ver los detalles del trabajo en Etiquetas en Información del trabajo.
- Registro:
¿Qué sigue?
- Consulta BigQuery Storage y Spark SQL: Python.
- Aprende a crear un archivo de definición de tablas para una fuente de datos externa.
- Aprende a consultar datos particionados de forma externa.
- Consulta las sugerencias de ajuste de trabajo de Spark.