Menggunakan konektor Spark BigQuery

spark-bigquery-connector digunakan dengan Apache Spark untuk membaca dan menulis data dari dan ke BigQuery. Konektor ini memanfaatkan BigQuery Storage API saat membaca data dari BigQuery.

Tutorial ini memberikan informasi tentang ketersediaan konektor yang telah diinstal sebelumnya, dan menunjukkan cara membuat versi konektor tertentu tersedia untuk tugas Spark. Kode contoh menunjukkan cara menggunakan konektor Spark BigQuery dalam aplikasi Spark.

Menggunakan konektor yang telah diinstal sebelumnya

Konektor Spark BigQuery telah diinstal sebelumnya dan tersedia untuk tugas Spark yang dijalankan di cluster Managed Service untuk Apache Spark yang dibuat dengan versi image 2.1 dan yang lebih baru. Versi konektor yang telah diinstal sebelumnya tercantum di setiap halaman rilis versi image. Misalnya, baris BigQuery Connector di halaman versi rilis image 2.2.x menampilkan versi konektor yang diinstal pada rilis image 2.2 terbaru.

Membuat versi konektor tertentu tersedia untuk tugas Spark

Jika Anda ingin menggunakan versi konektor yang berbeda dari versi yang telah diinstal sebelumnya di cluster versi image 2.1 atau yang lebih baru, atau jika Anda ingin menginstal konektor di cluster versi image sebelum 2.1, ikuti petunjuk di bagian ini.

Penting: Versi spark-bigquery-connector harus kompatibel dengan versi image cluster Managed Service untuk Apache Spark. Lihat Matriks Kompatibilitas Image Konektor ke Managed Service untuk Apache Spark.

Cluster versi image 2.1 dan yang lebih baru

Saat Anda membuat cluster Managed Service untuk Apache Spark dengan versi image 2.1 atau yang lebih baru, tentukan versi konektor sebagai metadata cluster.

Contoh gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Catatan:

  • SPARK_BQ_CONNECTOR_VERSION: Tentukan versi konektor. Versi konektor Spark BigQuery tercantum di halaman spark-bigquery-connector/releases di GitHub.

    Contoh:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: Tentukan URL yang mengarah ke jar di Cloud Storage. Anda dapat menentukan URL konektor yang tercantum di kolom link di bagian Mendownload dan Menggunakan Konektor di GitHub atau jalur ke lokasi Cloud Storage tempat Anda menempatkan jar konektor kustom.

    Contoh:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

Cluster versi image 2.0 dan yang lebih lama

Anda dapat membuat konektor Spark BigQuery tersedia untuk aplikasi Anda dengan salah satu cara berikut:

  1. Instal spark-bigquery-connector di direktori jar Spark setiap node menggunakan tindakan inisialisasi konektor Managed Service untuk Apache Spark saat Anda membuat cluster.

  2. Berikan URL jar konektor saat Anda mengirimkan tugas ke cluster menggunakan konsol Google Cloud , gcloud CLI, atau Managed Service untuk Apache Spark API.

    Konsol

    Gunakan item Jars files tugas Spark di halaman Submit a job Managed Service untuk Apache Spark.

    gcloud

    Gunakan gcloud dataproc jobs submit spark --jars flag.

    API

    Gunakan SparkJob.jarFileUris kolom.

    Cara menentukan jar konektor saat menjalankan tugas Spark di cluster versi image sebelum 2.0

    • Tentukan jar konektor dengan mengganti informasi versi Scala dan konektor dalam string URI berikut:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Gunakan Scala 2.12 dengan versi image Managed Service untuk Apache Spark 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Contoh gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Gunakan Scala 2.11 dengan versi image Managed Service untuk Apache Spark 1.4 dan yang lebih lama:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Contoh gcloud CLI:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Sertakan jar konektor dalam aplikasi Spark Scala atau Java sebagai dependensi (lihat Mengompilasi terhadap konektor).

Menghitung biaya

Dalam dokumen ini, Anda akan menggunakan komponen yang dapat ditagih sebagai berikut Google Cloud:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis. Google Cloud

Membaca dan menulis data dari dan ke BigQuery

Contoh ini membaca data dari BigQuery ke dalam DataFrame Spark untuk melakukan penghitungan kata menggunakan API sumber data standar.

Konektor menulis data ke BigQuery dengan terlebih dahulu melakukan buffering semua data ke dalam tabel sementara Cloud Storage. Kemudian, konektor akan menyalin semua data dari ke BigQuery dalam satu operasi. Konektor mencoba menghapus file sementara setelah operasi pemuatan BigQuery berhasil dan sekali lagi saat aplikasi Spark dihentikan. Jika tugas gagal, hapus file Cloud Storage sementara yang tersisa. Biasanya, file BigQuery sementara terletak di gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Mengonfigurasi penagihan

Secara default, project yang terkait dengan kredensial atau akun layanan akan ditagih untuk penggunaan API. Untuk menagih project lain, tetapkan konfigurasi berikut: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Konfigurasi ini juga dapat ditambahkan ke operasi baca atau tulis, sebagai berikut: .option("parentProject", "<BILLED-GCP-PROJECT>").

Menjalankan kode

Sebelum menjalankan contoh ini, buat set data bernama "wordcount_dataset" atau ubah set data output dalam kode ke set data BigQuery yang ada di Google Cloud project Anda.

Gunakan perintah bq untuk membuat wordcount_dataset:

bq mk wordcount_dataset

Gunakan perintah Google Cloud CLI untuk membuat bucket Cloud Storage, yang akan digunakan untuk diekspor ke BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. Jalankan kode di cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Managed Service untuk Apache Spark
      1. Buka halaman Clusters Managed Service untuk Apache Spark di Google Cloud konsol, lalu klik nama cluster Anda Halaman cluster Dataproc di Konsol Cloud.
      2. Di halaman >Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster> Halaman detail Cluster Dataproc di Konsol Cloud.
        Jendela browser akan terbuka di direktori beranda Anda di node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat wordcount.scala dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempelkan kode Scala dari listing kode Scala
      nano wordcount.scala
        
    3. Luncurkan REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Jalankan wordcount.scala dengan perintah :load wordcount.scala untuk membuat tabel wordcount_output BigQuery. Listing output menampilkan 20 baris dari output wordcount.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Untuk melihat pratinjau tabel output, buka BigQuery halaman, pilih tabel wordcount_output, lalu klik Preview. Pratinjau tabel di halaman BigQuery Explorer di Konsol Cloud.

PySpark

  1. Periksa kode dan ganti placeholder [bucket] dengan bucket Cloud Storage yang Anda buat sebelumnya.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. Jalankan kode di cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Managed Service untuk Apache Spark
      1. Buka halaman Clusters Managed Service untuk Apache Spark di Google Cloud konsol, lalu klik nama cluster Anda Halaman cluster di Konsol Cloud.
      2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster Pilih SSH di baris nama cluster pada halaman Cluster details di Konsol Cloud.
        Jendela browser akan terbuka di direktori beranda Anda di node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat wordcount.py dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempelkan kode PySpark dari listing kode PySpark
      nano wordcount.py
      
    3. Jalankan wordcount dengan spark-submit untuk membuat tabel BigQuerywordcount_output. Listing output menampilkan 20 baris dari output wordcount.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Untuk melihat pratinjau tabel output, buka BigQuery halaman, pilih tabel wordcount_output lalu klik Preview. Pratinjau tabel di halaman BigQuery Explorer di Konsol Cloud.

Tips pemecahan masalah

Anda dapat memeriksa log tugas di Cloud Logging dan di BigQuery Jobs Explorer untuk memecahkan masalah tugas Spark yang menggunakan konektor BigQuery.

  • Log driver Managed Service untuk Apache Spark berisi entri BigQueryClient dengan metadata BigQuery yang menyertakan jobId:

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • Tugas BigQuery berisi Managed Service for Apache Spark_job_id dan Managed Service for Apache Spark_job_uuid label:

    • Logging:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • BigQuery Jobs Explorer: Klik ID tugas untuk melihat detail tugas di bagian Labels di Job information.

Langkah berikutnya