Spark BigQuery-Connector verwenden

Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus und in BigQueryzu lesen und zu schreiben. Der Connector nutzt beim Lesen von Daten aus BigQuery die BigQuery Storage API.

In dieser Anleitung finden Sie Informationen zur Verfügbarkeit des vorinstallierten Connectors und erfahren, wie Sie eine bestimmte Connector-Version für Spark-Jobs verfügbar machen. Der Beispielcode zeigt, wie Sie den Spark-BigQuery-Connector in einer Spark-Anwendung verwenden.

Vorinstallierten Connector verwenden

Der Spark-BigQuery-Connector ist auf vorinstalliert und für Spark-Jobs verfügbar, die in Managed Service for Apache Spark-Clustern mit Image-Versionen 2.1 und höher ausgeführt werden. Die vorinstallierte Connector-Version ist auf der Release-Seite jeder Image-Version aufgeführt. In der Zeile BigQuery-Connector auf der Seite mit den Image-Release-Versionen 2.2.x wird beispielsweise die Connector-Version angezeigt, die in den neuesten 2.2-Image-Releases installiert ist.

Bestimmte Connector-Version für Spark-Jobs verfügbar machen

Wenn Sie eine andere Connector-Version als die vorinstallierte Version in einem Cluster mit Image-Version 2.1 oder höher verwenden oder den Connector in einem Cluster mit einer Image-Version vor 2.1 installieren möchten, folgen Sie der Anleitung in diesem Abschnitt.

Wichtig:Die Version von spark-bigquery-connector muss mit der Image-Version des Managed Service for Apache Spark-Clusters kompatibel sein. Weitere Informationen finden Sie in der Kompatibilitätsmatrix für Connector und Managed Service for Apache Spark-Image.

Cluster mit Image-Version 2.1 und höher

Wenn Sie einen Managed Service for Apache Spark-Cluster erstellen mit einer 2.1 oder höher Image-Version, geben Sie die Connector-Version als Cluster-Metadaten an.

Beispiel für die gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Hinweise:

  • SPARK_BQ_CONNECTOR_VERSION: Geben Sie eine Connector-Version an. Spark-BigQuery-Connector-Versionen sind auf der Seite spark-bigquery-connector/releases auf GitHub aufgeführt.

    Beispiel:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: Geben Sie eine URL an, die auf die JAR-Datei in Cloud Storage verweist. Sie können die URL eines Connectors angeben, der in der Link-Spalte unter Connector in GitHub herunterladen und verwenden aufgeführt ist, oder den Pfad zu einem Cloud Storage-Speicherort, an dem Sie eine benutzerdefinierte Connector-JAR-Datei platziert haben.

    Beispiele:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

Cluster mit Image-Version 2.0 und früher

Sie können den Spark-BigQuery-Connector auf eine der folgenden Arten für Ihre Anwendung verfügbar machen:

  1. Installieren Sie den Spark-bigquery-Connector im Spark-JAR-Verzeichnis jedes Knotens. Verwenden Sie dazu beim Erstellen des Clusters die Initialisierungsaktion für Managed Service for Apache Spark-Connectors.

  2. Geben Sie die Connector-JAR-URL an, wenn Sie Ihren Job über die Google Cloud Console, die gcloud CLI oder die Managed Service for Apache Spark API an den Cluster senden.

    Console

    Verwenden Sie das Element JAR-Dateien des Spark-Jobs auf der Seite Job senden von Managed Service for Apache Spark.

    gcloud

    Verwenden Sie das gcloud dataproc jobs submit spark --jars Flag.

    API

    Verwenden Sie das SparkJob.jarFileUris Feld.

    Connector-JAR-Datei beim Ausführen von Spark-Jobs in Clustern mit Image-Version vor 2.0 angeben

    • Geben Sie die Connector-JAR-Datei an, indem Sie die Scala- und Connector-Versionsinformationen im folgenden URI-String ersetzen:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Verwenden Sie Scala 2.12 mit Managed Service for Apache Spark-Image-Versionen 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Beispiel für die gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Verwenden Sie Scala 2.11 mit Managed Service for Apache Spark-Image-Versionen 1.4 und früher:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Beispiel für die gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Schließen Sie die Connector-JAR-Datei in Ihre Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Für den Connector kompilieren).

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Verwenden Sie den Preisrechner.

Neuen Google Cloud Nutzern vonsteht möglicherweise eine kostenlose Testversion zur Verfügung.

Daten aus und in BigQuery lesen und schreiben

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen APIeiner Wortzählung unterzogen.

Der Connector schreibt die Daten in BigQuery, indem er zuerst alle Daten in einer temporären Cloud Storage-Tabelle zwischenspeichert. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen. Dies geschieht auch, wenn die Spark-Anwendung beendet wird. Wenn der Job fehlschlägt, entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Dienstkonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch einem Lese- oder Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

Code ausführen

Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen „wordcount_dataset“ oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in Ihrem Google Cloud Projekt.

Verwenden Sie den bq-Befehl, um wordcount_dataset zu erstellen:

bq mk wordcount_dataset

Verwenden Sie den Google Cloud CLI-Befehl zum Erstellen eines Cloud Storage-Bucket, der für den Export nach BigQuery verwendet wird:

gcloud storage buckets create gs://[bucket]

Scala

  1. Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. Führen Sie den Code in Ihrem Cluster aus.
    1. Stellen Sie eine SSH-Verbindung zum Masterknoten des Managed Service for Apache Spark-Clusters her. Masterknoten
      1. Rufen Sie in der Console die Seite Managed Service for Apache Spark Cluster auf und klicken Sie dann auf den Namen Ihres ClustersSeite „Dataproc-Cluster“ in der Cloud Console.. Google Cloud
      2. Wählen Sie auf der Seite >Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Masterknotens des Clusters auf SSH> Seite „Dataproc-Clusterdetails“ in der Cloud Console
        Ein Browserfenster wird in Ihrem Home-Verzeichnis auf dem Masterknoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie wordcount.scala mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den Scala Code aus der Scala Code Liste ein.
      nano wordcount.scala
        
    3. Starten Sie die spark-shell-REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Führen Sie wordcount.scala mit dem :load wordcount.scala-Befehl aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die BigQuery Seite, wählen Sie die wordcount_output Tabelle aus und klicken Sie dann auf Vorschau. Rufen Sie in der Cloud Console auf der Seite „BigQuery Explorer“ eine Vorschau der Tabelle auf.

PySpark

  1. Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. Führen Sie den Code in Ihrem Cluster aus.
    1. Stellen Sie eine SSH-Verbindung zum Masterknoten des Managed Service for Apache Spark-Clusters her.
      1. Rufen Sie in der Console die Seite Managed Service for Apache Spark Cluster auf und klicken Sie dann auf den Namen Ihres ClustersSeite „Cluster“ in der Cloud Console. Google Cloud
      2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Masterknotens des Clusters auf SSH Wählen Sie auf der Seite „Clusterdetails“ in der Cloud Console in der Zeile mit dem Clusternamen „SSH“ aus.
        Ein Browserfenster wird in Ihrem Home-Verzeichnis auf dem Masterknoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie wordcount.py mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den PySpark Code aus der PySpark-Codeliste ein.
      nano wordcount.py
      
    3. Führen Sie Wordcount mit spark-submit aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die BigQuery Seite, wählen Sie die wordcount_output Tabelle aus und klicken Sie dann auf Vorschau. Rufen Sie in der Cloud Console auf der Seite „BigQuery Explorer“ eine Vorschau der Tabelle auf.

Tipps zur Fehlerbehebung

Sie können Joblogs in Cloud Logging und im BigQuery Jobs Explorer untersuchen, um Fehler bei Spark-Jobs zu beheben, die den BigQuery-Connector verwenden.

  • Managed Service for Apache Spark-Treiberlogs enthalten einen BigQueryClient-Eintrag mit BigQuery-Metadaten, einschließlich der jobId:

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • BigQuery-Jobs enthalten die Labels Managed Service for Apache Spark_job_id und Managed Service for Apache Spark_job_uuid:

    • Logging:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • BigQuery Jobs Explorer: Klicken Sie auf eine Job-ID, um Jobdetails unter Labels in Jobinformationen aufzurufen.

Nächste Schritte