Le connecteur spark-bigquery est utilisé avec Apache Spark pour lire et écrire des données depuis et vers BigQuery. Le connecteur tire profit de l' API BigQuery Storage lors de la lecture des données de BigQuery.
Ce tutoriel fournit des informations sur la disponibilité du connecteur préinstallé et explique comment mettre une version spécifique du connecteur à la disposition des tâches Spark. L'exemple de code montre comment utiliser le connecteur Spark BigQuery dans une application Spark.
Utiliser le connecteur préinstallé
Le connecteur Spark BigQuery est préinstallé sur et est disponible pour les tâches Spark exécutées sur des clusters Dataproc créés avec les versions d'image 2.1 et ultérieures. La version préinstallée du connecteur est répertoriée sur chaque page de version d'image. Par exemple, la ligne BigQuery Connector (Connecteur BigQuery) de la page des
versions d'image 2.2.x
affiche la version du connecteur installée sur les dernières
versions d'image 2.2.
Mettre une version spécifique du connecteur à la disposition des tâches Spark
Si vous souhaitez utiliser une version de connecteur différente d'une version préinstallée sur un cluster de version d'image 2.1 ou ultérieure, ou si vous souhaitez installer le connecteur sur un cluster de version d'image antérieure à 2.1, suivez les instructions de cette section.
Important : La version spark-bigquery-connector doit être compatible avec la version d'image du cluster Dataproc. Consultez la
matrice de compatibilité entre le connecteur et l'image Dataproc.
Clusters de version d'image 2.1 et ultérieure
Lorsque vous créez un cluster Dataproc
avec une version d'image 2.1 ou ultérieure, spécifiez la
version du connecteur en tant que métadonnées de cluster.
Exemple gcloud CLI :
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Remarques :
SPARK_BQ_CONNECTOR_VERSION : spécifiez une version du connecteur. Les versions du connecteur Spark BigQuery sont répertoriées sur la page spark-bigquery-connector/releases de GitHub.
Exemple :
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL : spécifiez une URL qui pointe vers le fichier JAR dans Cloud Storage. Vous pouvez spécifier l'URL d'un connecteur répertorié dans la colonne link de la section Télécharger et utiliser le connecteur dans GitHub ou le chemin d'accès à un emplacement Cloud Storage où vous avez placé un fichier JAR de connecteur personnalisé.
Exemples :
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
Clusters de version d'image 2.0 et antérieure
Vous pouvez mettre le connecteur Spark BigQuery à la disposition de votre application de l'une des manières suivantes :
Installez le connecteur Spark BigQuery dans le répertoire de fichiers JAR Spark de chaque nœud en effectuant l' action d'initialisation des connecteurs Dataproc lors de la création de votre cluster.
Fournissez l'URL du fichier JAR du connecteur lorsque vous envoyez votre tâche au cluster à l'aide de la Google Cloud console, gcloud CLI ou de l'API Dataproc.
Console
Utilisez l'élément Jars files (Fichiers JAR) de la tâche Spark sur la page Submit a job (Envoyer une tâche) de Dataproc.
gcloud
Utilisez l'option
gcloud dataproc jobs submit spark --jarsflag.API
Utilisez le
SparkJob.jarFileUrischamp.Comment spécifier le fichier JAR du connecteur lors de l'exécution de tâches Spark sur des clusters de version d'image antérieure à 2.0
- Spécifiez le fichier JAR du connecteur en remplaçant les informations sur la version Scala et du connecteur dans la chaîne d'URI suivante :
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Utilisez Scala
2.12avec les versions d'image Dataproc1.5+. Exemple gcloud CLI :gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Utilisez Scala
2.11avec les versions d'image Dataproc1.4et antérieures : Exemple gcloud CLI :gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Spécifiez le fichier JAR du connecteur en remplaçant les informations sur la version Scala et du connecteur dans la chaîne d'URI suivante :
Incluez le fichier JAR du connecteur dans votre application Scala ou Java Spark en tant que dépendance (consultez la section Compiler sur le connecteur).
Calculer les coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Obtenez une estimation des coûts en fonction de votre utilisation prévue,
utilisez le simulateur de coût.
Lire et écrire des données depuis et vers BigQuery
Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.
Le connecteur écrit les données dans BigQuery en mettant d'abord toutes les données en mémoire tampon dans une table temporaire Cloud Storage. Ensuite, il copie toutes les données dans BigQuery en une seule opération. Le connecteur tente de supprimer les fichiers temporaires une fois l'opération de chargement BigQuery terminée, puis effectue une nouvelle tentative lorsque l'application Spark se termine.
Si la tâche échoue, supprimez tous les fichiers Cloud Storage temporaires restants. En règle générale, les fichiers BigQuery temporaires se trouvent dans gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
Configurer la facturation
Par défaut, le projet qui est facturé pour l'utilisation de l'API est le projet associé aux identifiants ou au compte de service. Pour facturer un autre projet, définissez la configuration suivante
: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
Vous pouvez également l'ajouter à une opération de lecture ou d'écriture, comme suit :
.option("parentProject", "<BILLED-GCP-PROJECT>").
Exécuter le code
Avant d'exécuter cet exemple, créez un ensemble de données nommé "wordcount_dataset" ou remplacez l'ensemble de données de sortie dans le code par un ensemble de données BigQuery existant dans votre Google Cloud projet.
Utilisez la
bq command pour créer
l'élément wordcount_dataset :
bq mk wordcount_dataset
Utilisez la commande Google Cloud CLI pour créer un bucket Cloud Storage qui servira à l'exportation vers BigQuery :
gcloud storage buckets create gs://[bucket]
Scala
- Examinez le code et remplacez l'espace réservé [bucket] par
le bucket Cloud Storage que vous avez créé précédemment.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare") .cache() wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .save("wordcount_dataset.wordcount_output"))
- Exécutez le code sur votre cluster.
- Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc
- Accédez à la
page Clusters de Dataproc dans la Google Cloud console, puis cliquez sur le nom de votre cluster
- Sur la page >Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSHà droite du nom du nœud maître du cluster>
Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Accédez à la
page Clusters de Dataproc dans la Google Cloud console, puis cliquez sur le nom de votre cluster
- Créez
wordcount.scalaavec l'éditeur de textevi,vim, ounanopréinstallé, puis collez le code Scala à partir de la liste de codes Scala.nano wordcount.scala
- Lancez le REPL
spark-shell.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Exécutez "WordCount.scala" avec la commande
:load wordcount.scalapour créer la table BigQuerywordcount_output. La liste de sortie affiche 20 lignes du résultat de la commande "wordcount".:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Pour prévisualiser la table de sortie, ouvrez laBigQuerypage, sélectionnez lawordcount_outputtable, puis cliquez sur Aperçu.
- Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc
PySpark
- Examinez le code et remplacez l'espace réservé [bucket] par
le bucket Cloud Storage que vous avez créé précédemment.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data:samples.shakespeare') \ words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- Exécutez le code sur votre cluster
- Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc.
- Accédez à la
page Clusters de Dataproc dans la Google Cloud console, puis cliquez sur le nom de votre cluster
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSHà droite du nom du nœud maître du cluster
Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Accédez à la
page Clusters de Dataproc dans la Google Cloud console, puis cliquez sur le nom de votre cluster
- Créez
wordcount.pyavec l'éditeur de textevi,vim, ounanopréinstallé, puis collez le code PySpark à partir de la liste de code PySpark.nano wordcount.py
- Exécutez "wordcount" avec
spark-submitpour créer la table BigQuerywordcount_output. La liste de sortie affiche 20 lignes du résultat de la commande "wordcount".spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Pour prévisualiser la table de sortie, ouvrez laBigQuerypage, sélectionnez lawordcount_outputtable, puis cliquez sur Aperçu.
- Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc.
Conseils de dépannage
Vous pouvez examiner les journaux de tâches dans Cloud Logging et dans l'explorateur de tâches BigQuery pour résoudre les problèmes liés aux tâches Spark qui utilisent le connecteur BigQuery.
Les journaux de pilote Dataproc contiennent une entrée
BigQueryClientavec des métadonnées BigQuery qui incluent lejobId:ClassNotFoundException
INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION} Les tâches BigQuery contiennent les libellés
Dataproc_job_idetDataproc_job_uuid:- Journalisation avec Logging :
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID" protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID" protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
- Explorateur de tâches BigQuery : cliquez sur un ID de tâche pour afficher les détails de la tâche sous Libellés dans Informations sur la tâche.
- Journalisation avec Logging :
Étape suivante
- Consultez Stockage BigQuery et Spark SQL - Python.
- Découvrez comment créer un fichier de définition de table pour une source de données externe.
- Découvrez comment interroger des données partitionnées en externe.
- Consultez Conseils pour régler des tâches Spark.