Utilizzare il connettore Spark BigQuery

Il spark-bigquery-connector viene utilizzato con Apache Spark per leggere e scrivere dati da e in BigQuery. Quando legge i dati da BigQuery, il connettore sfrutta l' API BigQuery Storage.

Questo tutorial fornisce informazioni sulla disponibilità del connettore preinstallato, e mostra come rendere disponibile una versione specifica del connettore per i job Spark. Il codice di esempio mostra come utilizzare il connettore Spark BigQuery all'interno di un'applicazione Spark.

Utilizzare il connettore preinstallato

Il connettore Spark BigQuery è preinstallato ed è disponibile per i job Spark eseguiti sui cluster Dataproc creati con le versioni immagine 2.1 e successive. La versione del connettore preinstallato è elencata nella pagina di rilascio di ogni versione immagine. Ad esempio, la riga Connettore BigQuery nella pagina delle versioni di rilascio dell'immagine 2.2.x mostra la versione del connettore installata nelle ultime release dell'immagine 2.2.

Rendere disponibile una versione specifica del connettore per i job Spark

Se vuoi utilizzare una versione del connettore diversa da una versione preinstallata su un cluster con versione immagine 2.1 o successiva oppure se vuoi installare il connettore su un cluster con versione immagine precedente alla 2.1, segui le istruzioni riportate in questa sezione.

Importante: La versione di spark-bigquery-connector deve essere compatibile con la versione immagine del cluster Dataproc. Consulta la matrice di compatibilità tra il connettore e l'immagine Dataproc.

Cluster con versione immagine 2.1 e successive

Quando crei un cluster Dataproc con una versione immagine 2.1 o successiva, specifica la versione del connettore come metadati del cluster.

Esempio di gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Note:

  • SPARK_BQ_CONNECTOR_VERSION: specifica una versione del connettore. Le versioni del connettore Spark BigQuery sono elencate nella pagina spark-bigquery-connector/releases su GitHub.

    Esempio:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: specifica un URL che rimanda al file JAR in Cloud Storage. Puoi specificare l'URL di un connettore elencato nella colonna link in Download e utilizzo del connettore in GitHub o il percorso di una località Cloud Storage in cui hai inserito un file JAR del connettore personalizzato.

    Esempi:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

Cluster con versione immagine 2.0 e precedenti

Puoi rendere disponibile il connettore Spark BigQuery per la tua applicazione in uno dei seguenti modi:

  1. Installa spark-bigquery-connector nella directory dei file JAR di Spark di ogni nodo utilizzando l' azione di inizializzazione dei connettori Dataproc quando crei il cluster.

  2. Fornisci l'URL del file JAR del connettore quando invii il job al cluster utilizzando la Google Cloud console, gcloud CLI o l'API Dataproc.

    Console

    Utilizza l'elemento File JAR del job Spark nella pagina Invia un job di Dataproc.

    gcloud

    Utilizza il gcloud dataproc jobs submit spark --jars flag.

    API

    Utilizza il SparkJob.jarFileUris campo.

    Come specificare il file JAR del connettore quando esegui job Spark su cluster con versione immagine precedente alla 2.0

    Le versioni del connettore Spark-BigQuery sono elencate nel repository GitHub

    • Specifica il file JAR del connettore sostituendo le informazioni sulla versione di Scala e del connettore nella seguente stringa URI:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Utilizza Scala 2.12 con le versioni immagine di Dataproc 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Esempio di gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Utilizza Scala 2.11 con le versioni immagine di Dataproc 1.4 e precedenti:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Esempio di gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Includi il file JAR del connettore nell'applicazione Spark Scala o Java come dipendenza (vedi Compilazione rispetto al connettore).

Calcolare i costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova senza costi.

Leggere e scrivere dati da e in BigQuery

Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.

Il connettore scrive i dati in BigQuery eseguendo prima il buffering di tutti i dati in una tabella temporanea di Cloud Storage. Quindi, copia tutti i dati in BigQuery in un'unica operazione. Il connettore tenta di eliminare i file temporanei una volta che l'operazione di caricamento di BigQuery è riuscita e di nuovo quando l'applicazione Spark termina. Se il job non riesce, rimuovi tutti i file temporanei di Cloud Storage rimanenti. In genere, i file BigQuery temporanei si trovano in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configurare la fatturazione

Per impostazione predefinita, il progetto associato alle credenziali o all'account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Può anche essere aggiunto a un'operazione di lettura o scrittura, come segue: .option("parentProject", "<BILLED-GCP-PROJECT>").

Esegui il codice

Prima di eseguire questo esempio, crea un set di dati denominato "wordcount_dataset" o modifica il set di dati di output nel codice in un set di dati BigQuery esistente nel tuo Google Cloud progetto.

Utilizza il bq comando per creare il wordcount_dataset:

bq mk wordcount_dataset

Utilizza il comando Google Cloud CLI per creare un bucket Cloud Storage, che verrà utilizzato per l'esportazione in BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. Esegui il codice sul cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina **Cluster** di Dataproc nella Google Cloud console, quindi fai clic sul nome del cluster Pagina dei cluster Dataproc nella console Cloud.
      2. Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster> Pagina dei dettagli del cluster Dataproc nella console Cloud.
        Si apre una finestra del browser nella directory home del nodo master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.scala con l'editor di testo preinstallato vi, vim, o nano, quindi incolla il codice Scala dall' elenco del codice Scala
      nano wordcount.scala
        
    3. Avvia la shell REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Esegui wordcount.scala con il comando :load wordcount.scala per creare la tabella BigQuery wordcount_output. L'elenco di output mostra 20 righe dell'output del conteggio delle parole.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Per visualizzare l'anteprima della tabella di output, apri la BigQuery pagina, seleziona la wordcount_output tabella e poi fai clic su Anteprima. Visualizza l&#39;anteprima della tabella nella pagina Explorer di BigQuery nella console Cloud.

PySpark

  1. Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. Esegui il codice sul cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina **Cluster** di Dataproc nella Google Cloud console, quindi fai clic sul nome del cluster Pagina Cluster nella console Cloud.
      2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster Seleziona SSH nella riga del nome del cluster nella pagina Dettagli cluster della console Cloud.
        Si apre una finestra del browser nella directory home del nodo master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.py con l'editor di testo preinstallato vi, vim, o nano, quindi incolla il codice PySpark dall' elenco del codice PySpark
      nano wordcount.py
      
    3. Esegui wordcount con spark-submit per creare la tabella BigQuery wordcount_output. L'elenco di output mostra 20 righe dell'output del conteggio delle parole.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Per visualizzare l'anteprima della tabella di output, apri la BigQuery pagina, seleziona la tabella wordcount_output e poi fai clic su Anteprima. Visualizza l&#39;anteprima della tabella nella pagina Explorer di BigQuery nella console Cloud.

Suggerimenti per la risoluzione dei problemi

Puoi esaminare i log dei job in Cloud Logging e in Esplora job BigQuery per risolvere i problemi relativi ai job Spark che utilizzano il connettore BigQuery.

  • I log dei driver Dataproc contengono una voce BigQueryClient con i metadati di BigQuery che includono jobId:

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • I job BigQuery contengono le etichette Dataproc_job_id e Dataproc_job_uuid:

    • Logging:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • Esplora job BigQuery: fai clic su un ID job per visualizzare i dettagli del job in Etichette in Informazioni job.

Passaggi successivi