Utilizzo del connettore BigQuery con Google Cloud Serverless per Apache Spark

Utilizza il spark-bigquery-connector con Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial mostra un'applicazione PySpark che utilizza il spark-bigquery-connector.

Utilizzare il connettore BigQuery con il carico di lavoro

Consulta le release di runtime di Serverless per Apache Spark per determinare la versione del connettore BigQuery installata nella versione di runtime del carico di lavoro batch. Se il connettore non è elencato, consulta la sezione successiva per istruzioni su come renderlo disponibile per le applicazioni.

Come utilizzare il connettore con la versione di runtime di Spark 2.0

Il connettore BigQuery non è installato nella versione di runtime di Spark 2.0. Quando utilizzi la versione di runtime di Spark 2.0, puoi rendere disponibile il connettore per la tua applicazione in uno dei seguenti modi:

  • Utilizza il parametro jars per puntare a un file JAR del connettore quando invii il carico di lavoro batch Google Cloud Serverless per Apache Spark. L'esempio seguente specifica un file JAR del connettore (per un elenco dei file JAR del connettore disponibili, consulta il repository GoogleCloudDataproc/spark-bigquery-connector su GitHub).
    • Esempio di Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Includi il file JAR del connettore nell'applicazione Spark come dipendenza (vedi Compilazione con il connettore)

Calcolare i costi

Questo tutorial utilizza componenti fatturabili di Google Cloud, tra cui:

  • Serverless per Apache Spark
  • BigQuery
  • Cloud Storage

Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto.

I nuovi utenti di Cloud Platform potrebbero essere idonei a usufruire di una prova senza costi.

I/O di BigQuery

Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.

Il connettore scrive l'output di wordcount in BigQuery nel seguente modo:

  1. Memorizzazione dei dati in file temporanei nel bucket Cloud Storage

  2. Copia dei dati in un'unica operazione dal bucket Cloud Storage a BigQuery

  3. Eliminazione dei file temporanei in Cloud Storage al termine dell'operazione di caricamento di BigQuery (i file temporanei vengono eliminati anche al termine dell'applicazione Spark). Se l'eliminazione non riesce, dovrai eliminare tutti i file temporanei di Cloud Storage indesiderati, che in genere si trovano in gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

Configurare la fatturazione

Per impostazione predefinita, il progetto associato alle credenziali o all'account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Puoi anche aggiungere un'operazione di lettura o scrittura, come segue: .option("parentProject", "<BILLED-GCP-PROJECT>").

Inviare un carico di lavoro batch di conteggio delle parole PySpark

Esegui un carico di lavoro batch di Spark che conta il numero di parole in un set di dati pubblico.

  1. Apri un terminale locale o Cloud Shell
  2. Crea wordcount_dataset con lo strumento a riga di comando bq in un terminale locale o in Cloud Shell.
    bq mk wordcount_dataset
    
  3. Crea un bucket Cloud Storage con el Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Sostituisci YOUR_BUCKET con il nome del bucket Cloud Storage che hai creato.
  4. Crea il file wordcount.py localmente in un editor di testo copiando il seguente codice PySpark.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Invia il carico di lavoro batch PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Esempio di output del terminale:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Per visualizzare l'anteprima della tabella di output nella Google Cloud console, apri la pagina BigQuery
    del progetto, seleziona la tabella wordcount_output e fai clic su Anteprima.

Per ulteriori informazioni