Utilizzare il connettore Spark BigQuery

Puoi utilizzare il spark-bigquery-connector con Managed Service for Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial mostra un'applicazione PySpark che utilizza spark-bigquery-connector.

Conferma la versione del connettore

Consulta le release del runtime di Managed Service for Apache Spark per determinare la versione del connettore BigQuery installata nella versione del runtime del carico di lavoro batch o della sessione interattiva. Se il connettore non è elencato, consulta Rendere il connettore disponibile per le applicazioni.

Rendi il connettore disponibile per le applicazioni (se necessario)

Il connettore BigQuery è installato in tutte le versioni del runtime di Managed Service for Apache Spark supportate. Se utilizzi una versione del runtime non supportata che non installa il connettore (Spark runtime 1.0), puoi rendere il connettore disponibile per un' applicazione in uno dei due modi seguenti:

  • Utilizza il parametro jars per puntare a un file JAR del connettore quando invii un carico di lavoro batch di Managed Service for Apache Spark o esegui una sessione interattiva. Il seguente esempio di carico di lavoro batch specifica un file JAR del connettore (per un elenco dei file JAR del connettore disponibili, consulta il repository GoogleCloudDataproc/spark-bigquery-connector su GitHub).
    • Esempio di Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=REGION \
          --jars=spark-3.5-bigquery-version.jar \
          ... other args
      

Calcola i costi

Questo tutorial utilizza componenti fatturabili di Google Cloud, tra cui:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto.

I nuovi utenti di Cloud Platform potrebbero essere idonei a usufruire di una prova senza costi.

Configura la fatturazione

Per impostazione predefinita, il progetto associato alle credenziali o all'account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente proprietà di configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Puoi anche aggiungere questa proprietà a un'operazione di lettura o scrittura, come segue: .option("parentProject", "<BILLED-GCP-PROJECT>").

Invia un carico di lavoro batch di conteggio parole PySpark

Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.

Il connettore scrive l'output del conteggio delle parole in BigQuery nella seguente sequenza di operazioni:

  1. Memorizza i dati nel buffer in file temporanei nel bucket Cloud Storage

  2. Copia i dati in un'unica operazione dal bucket Cloud Storage a BigQuery

  3. Elimina i file temporanei in Cloud Storage al termine dell'operazione di caricamento di BigQuery (i file temporanei vengono eliminati anche al termine dell'applicazione Spark). Se l'eliminazione non riesce, dovrai eliminare tutti i file temporanei di Cloud Storage indesiderati, che in genere si trovano in gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUID.

Passaggi per eseguire il carico di lavoro di conteggio delle parole

  1. Apri un terminale locale o Cloud Shell.
  2. Crea wordcount_dataset con lo strumento a riga di comando bq in un terminale locale o in Cloud Shell.
    bq mk wordcount_dataset
    
  3. Crea un bucket Cloud Storage con el Google Cloud CLI.
    gcloud storage buckets create gs://BUCKET_NAME
    
    Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage che hai creato.
  4. Crea il file wordcount.py localmente in un editor di testo copiando il seguente codice PySpark.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Cloud Storage bucket used by the connector for temporary BigQuery
    # export data.
    bucket = "BUCKET_NAME"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data.samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  5. Invia il carico di lavoro batch PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=BUCKET_NAME
    
    Esempio di output del terminale:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Per visualizzare l'anteprima della tabella di output nella Google Cloud console, apri la BigQuery pagina, seleziona la wordcount_output tabella e fai clic su Anteprima.
    Rendering dell&#39;anteprima della tabella BigQuery
    Figura 1: visualizza l'anteprima della tabella di output in BigQuery

Per ulteriori informazioni