Spark BigQuery-Connector verwenden

Sie können den spark-bigquery-connector mit Managed Service for Apache Spark verwenden, um Daten aus BigQuery zu lesen und in BigQuery zu schreiben. In dieser Anleitung wird eine PySpark-Anwendung gezeigt, die den spark-bigquery-connector verwendet.

Connector-Version bestätigen

Unter Managed Service for Apache Spark-Laufzeitversionen finden Sie die BigQuery-Connector-Version, die in der Laufzeitversion Ihrer Batcharbeitslast oder interaktiven Sitzung installiert ist. Wenn der Connector nicht aufgeführt ist, lesen Sie den Abschnitt Connector für Anwendungen verfügbar machen.

Connector für Anwendungen verfügbar machen (falls erforderlich)

Der BigQuery-Connector ist in allen unterstützten Managed Service for Apache Spark-Laufzeitversionen installiert. Wenn Sie eine nicht unterstützte Laufzeitversion verwenden, in der der Connector nicht installiert ist (Spark runtime 1.0), können Sie den Connector auf eine der folgenden beiden Arten für eine Anwendung verfügbar machen:

  • Verwenden Sie den Parameter jars, um auf eine Connector-JAR-Datei zu verweisen, wenn Sie eine Managed Service for Apache Spark-Batcharbeitslast senden oder eine interaktive Sitzung ausführen. Im folgenden Beispiel für eine Batcharbeitslast wird eine Connector-JAR-Datei angegeben. Eine Liste der verfügbaren Connector-JAR-Dateien finden Sie im GitHub-Repository GoogleCloudDataproc/spark-bigquery-connector .
    • Google Cloud CLI-Beispiel:
      gcloud dataproc batches submit pyspark \
          --region=REGION \
          --jars=spark-3.5-bigquery-version.jar \
          ... other args
      

Kosten berechnen

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloudverwendet, darunter:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren.

Neuen Cloud Platform-Nutzern steht gegebenenfalls eine kostenlose Testversion zur Verfügung.

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Dienstkonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt abrechnen möchten, legen Sie die folgende Konfigurationseigenschaft fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie können diese Eigenschaft auch einem Lese- oder Schreibvorgang hinzufügen: .option("parentProject", "<BILLED-GCP-PROJECT>").

PySpark-Batcharbeitslast für die Wortzählung senden

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen APIeiner Wortzählung unterzogen.

Der Connector schreibt die Ausgabe der Wortzählung in der folgenden Reihenfolge von Vorgängen in BigQuery:

  1. Puffert die Daten in temporären Dateien in Ihrem Cloud Storage-Bucket

  2. Kopiert die Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket nach BigQuery

  3. Löscht die temporären Dateien in Cloud Storage, nachdem der BigQuery-Ladevorgang abgeschlossen ist. Temporäre Dateien werden auch gelöscht, nachdem die Spark-Anwendung beendet wurde. Wenn das Löschen fehlschlägt, müssen Sie alle unerwünschten temporären Cloud Storage-Dateien löschen, die sich in der Regel in gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUID befinden.

Schritte zum Ausführen der Arbeitslast für die Wortzählung

  1. Öffnen Sie ein lokales Terminal oder Cloud Shell.
  2. Erstellen Sie das wordcount_dataset mit dem bq Befehlszeilentool in einem lokalen Terminal oder in Cloud Shell.
    bq mk wordcount_dataset
    
  3. Erstellen Sie einen Cloud Storage-Bucket mit der Google Cloud CLI.
    gcloud storage buckets create gs://BUCKET_NAME
    
    Ersetzen Sie BUCKET_NAME durch den Namen des von Ihnen erstellten Cloud Storage-Bucket.
  4. Erstellen Sie die Datei wordcount.py lokal in einem Texteditor, indem Sie den folgenden PySpark-Code kopieren.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Cloud Storage bucket used by the connector for temporary BigQuery
    # export data.
    bucket = "BUCKET_NAME"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data.samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  5. Senden Sie die PySpark-Batcharbeitslast:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=BUCKET_NAME
    
    Beispiel für die Terminalausgabe:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Wenn Sie eine Vorschau der Ausgabetabelle in der Google Cloud Console aufrufen möchten, öffnen Sie die BigQuery Seite, wählen Sie die wordcount_output Tabelle aus und klicken Sie dann auf Vorschau.
    Rendering der BigQuery-Tabellenvorschau
    Abbildung 1: Vorschau der Ausgabetabelle in BigQuery

Weitere Informationen