Sie können den spark-bigquery-connector
mit Managed Service for Apache Spark verwenden, um Daten aus BigQuery zu lesen und in BigQuery zu schreiben. In dieser Anleitung wird eine PySpark-Anwendung gezeigt, die den spark-bigquery-connector verwendet.
Connector-Version bestätigen
Unter Managed Service for Apache Spark-Laufzeitversionen finden Sie die BigQuery-Connector-Version, die in der Laufzeitversion Ihrer Batcharbeitslast oder interaktiven Sitzung installiert ist. Wenn der Connector nicht aufgeführt ist, lesen Sie den Abschnitt Connector für Anwendungen verfügbar machen.
Connector für Anwendungen verfügbar machen (falls erforderlich)
Der BigQuery-Connector ist in allen
unterstützten Managed Service for Apache Spark-Laufzeitversionen installiert.
Wenn Sie eine
nicht unterstützte Laufzeitversion
verwenden, in der der Connector nicht installiert ist (Spark runtime 1.0), können Sie den Connector auf eine der folgenden beiden Arten für eine
Anwendung verfügbar machen:
- Verwenden Sie den Parameter
jars, um auf eine Connector-JAR-Datei zu verweisen, wenn Sie eine Managed Service for Apache Spark-Batcharbeitslast senden oder eine interaktive Sitzung ausführen. Im folgenden Beispiel für eine Batcharbeitslast wird eine Connector-JAR-Datei angegeben. Eine Liste der verfügbaren Connector-JAR-Dateien finden Sie im GitHub-Repository GoogleCloudDataproc/spark-bigquery-connector .- Google Cloud CLI-Beispiel:
gcloud dataproc batches submit pyspark \ --region=REGION \ --jars=spark-3.5-bigquery-version.jar \ ... other args
- Google Cloud CLI-Beispiel:
Kosten berechnen
In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloudverwendet, darunter:
- Managed Service for Apache Spark
- BigQuery
- Cloud Storage
Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren.
Abrechnung konfigurieren
Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Dienstkonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt abrechnen möchten, legen Sie die folgende
Konfigurationseigenschaft fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
Sie können diese Eigenschaft auch einem Lese- oder Schreibvorgang hinzufügen:
.option("parentProject", "<BILLED-GCP-PROJECT>").
PySpark-Batcharbeitslast für die Wortzählung senden
In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen APIeiner Wortzählung unterzogen.
Der Connector schreibt die Ausgabe der Wortzählung in der folgenden Reihenfolge von Vorgängen in BigQuery:
Puffert die Daten in temporären Dateien in Ihrem Cloud Storage-Bucket
Kopiert die Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket nach BigQuery
Löscht die temporären Dateien in Cloud Storage, nachdem der BigQuery-Ladevorgang abgeschlossen ist. Temporäre Dateien werden auch gelöscht, nachdem die Spark-Anwendung beendet wurde. Wenn das Löschen fehlschlägt, müssen Sie alle unerwünschten temporären Cloud Storage-Dateien löschen, die sich in der Regel in
gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUIDbefinden.
Schritte zum Ausführen der Arbeitslast für die Wortzählung
- Öffnen Sie ein lokales Terminal oder Cloud Shell.
- Erstellen Sie das
wordcount_datasetmit dem bq Befehlszeilentool in einem lokalen Terminal oder in Cloud Shell.bq mk wordcount_dataset
- Erstellen Sie einen Cloud Storage-Bucket mit der
Google Cloud CLI.
Ersetzen Siegcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAMEdurch den Namen des von Ihnen erstellten Cloud Storage-Bucket. - Erstellen Sie die Datei
wordcount.pylokal in einem Texteditor, indem Sie den folgenden PySpark-Code kopieren.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Cloud Storage bucket used by the connector for temporary BigQuery # export data. bucket = "BUCKET_NAME" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data.samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- Senden Sie die PySpark-Batcharbeitslast:
Beispiel für die Terminalausgabe:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=BUCKET_NAME
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie eine Vorschau der Ausgabetabelle in der Google Cloud Console aufrufen möchten, öffnen Sie die BigQuery Seite, wählen Sie diewordcount_outputTabelle aus und klicken Sie dann auf Vorschau.
Abbildung 1: Vorschau der Ausgabetabelle in BigQuery
Weitere Informationen
- BigQuery Storage und Spark SQL – Python
- Tabellendefinitionsdatei für eine externe Datenquelle erstellen
- Extern partitionierte Daten verwenden