Puoi utilizzare spark-bigquery-connector
con Managed Service for Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial mostra un'applicazione PySpark che utilizza
spark-bigquery-connector.
Conferma la versione del connettore
Consulta Versioni del runtime di Managed Service for Apache Spark per determinare la versione del connettore BigQuery installata nella versione del runtime del carico di lavoro batch o della sessione interattiva. Se il connettore non è elencato, consulta Rendere disponibile il connettore per le applicazioni.
Rendi disponibile il connettore per le applicazioni (se necessario)
Il connettore BigQuery è installato in tutte le
versioni del runtime di Managed Service for Apache Spark supportate.
Se utilizzi una
versione del runtime non supportata
che non installa il connettore (Spark runtime 1.0), puoi rendere disponibile il connettore a un'applicazione
in uno dei due modi seguenti:
- Utilizza il parametro
jarsper indicare un file JAR del connettore quando invii un workload batch Managed Service for Apache Spark o esegui una sessione interattiva. Il seguente esempio di workload batch specifica un file JAR del connettore (consulta il repository GoogleCloudDataproc/spark-bigquery-connector su GitHub per un elenco dei file JAR del connettore disponibili).- Esempio di Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=REGION \ --jars=spark-3.5-bigquery-version.jar \ ... other args
- Esempio di Google Cloud CLI:
Calcola i costi
Questo tutorial utilizza componenti fatturabili di Google Cloud, tra cui:
- Managed Service for Apache Spark
- BigQuery
- Cloud Storage
Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto.
Configura la fatturazione
Per impostazione predefinita, il progetto associato alle credenziali o al account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente
proprietà di configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
Puoi anche aggiungere questa proprietà a un'operazione di lettura o scrittura, come segue:
.option("parentProject", "<BILLED-GCP-PROJECT>").
Invia un carico di lavoro batch di conteggio parole PySpark
Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.
Il connettore scrive l'output del conteggio parole in BigQuery nella seguente sequenza di operazioni:
Memorizza i dati in file temporanei nel bucket Cloud Storage
Copia i dati in un'unica operazione dal bucket Cloud Storage in BigQuery.
Elimina i file temporanei in Cloud Storage al termine dell'operazione di caricamento di BigQuery (i file temporanei vengono eliminati anche al termine dell'applicazione Spark). Se l'eliminazione non va a buon fine, dovrai eliminare tutti i file temporanei di Cloud Storage indesiderati, che in genere si trovano in
gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUID.
Passaggi per eseguire il carico di lavoro wordcount
- Apri un terminale locale o Cloud Shell.
- Crea
wordcount_datasetcon lo strumento a riga di comando bq in un terminale locale o in Cloud Shell.bq mk wordcount_dataset
- Crea un bucket Cloud Storage con
Google Cloud CLI.
Sostituiscigcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAMEcon il nome del bucket Cloud Storage che hai creato. - Crea il file
wordcount.pylocalmente in un editor di testo copiando il seguente codice PySpark.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Cloud Storage bucket used by the connector for temporary BigQuery # export data. bucket = "BUCKET_NAME" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data.samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- Invia il carico di lavoro batch PySpark:
Output di esempio del terminale:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=BUCKET_NAME
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output nella console Google Cloud , apri la pagina BigQuery, seleziona la tabellawordcount_outpute poi fai clic su Anteprima.
Figura 1: visualizza l'anteprima della tabella di output in BigQuery
Per ulteriori informazioni
- BigQuery Storage e Spark SQL - Python
- Creazione di un file di definizione della tabella per un'origine dati esterna
- Utilizzare dati partizionati esternamente