Spark BigQuery コネクタを使用する

spark-bigquery-connectorApache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このコネクタは、BigQuery からデータを読み取るときに BigQuery Storage API を使用します。

このチュートリアルでは、プリインストールされたコネクタの可用性について説明し、特定のコネクタ バージョンを Spark ジョブで使用できるようにする方法について説明します。コードの例は、Spark アプリケーション内で Spark BigQuery コネクタを使用する方法を示しています。

プリインストールされているコネクタを使用する

Spark BigQuery コネクタは、イメージ バージョン 2.1 以降で作成された Managed Service for Apache Spark クラスタで実行される Spark ジョブにプリインストールされています。プリインストールされたコネクタのバージョンは、各イメージ バージョンのリリースページに記載されています。たとえば、[2.2.x イメージ リリースのバージョン] ページの [BigQuery Connector] 行には、最新の 2.2 イメージ リリースにインストールされているコネクタ バージョンが表示されます。

特定のコネクタ バージョンを Spark ジョブで使用できるようにする

2.1 以降のイメージ バージョンのクラスタにプリインストールされたバージョンとは異なるコネクタ バージョンを使用する場合、または 2.1 より前のイメージ バージョンのクラスタにコネクタをインストールする場合は、このセクションの手順に沿って操作します。

重要: spark-bigquery-connector のバージョンには、Managed Service for Apache Spark クラスタ イメージ バージョンとの互換性がある必要があります。Managed Service for Apache Spark イメージとのコネクタの互換性マトリックスをご覧ください。

2.1 以降の画像モードのクラスタ

2.1 以降のイメージ バージョンで Managed Service for Apache Spark クラスタを作成する場合は、クラスタ メタデータとしてコネクタ バージョンを指定します。2.1

gcloud CLI の例:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

注:

  • SPARK_BQ_CONNECTOR_VERSION: コネクタのバージョンを指定します。Spark BigQuery コネクタのバージョンは、GitHub の spark-bigquery-connector/releases ページに記載されています。

    例:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: Cloud Storage 内の jar を指す URL を指定します。GitHub の コネクタのダウンロードと使用の [リンク] 列に記載されているコネクタの URL を指定するか、カスタム コネクタ jar を配置した Cloud Storage のロケーションのパスを指定できます。

    例:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

2.0 以前のイメージ バージョンのクラスタ

次のいずれかの方法で、Spark BigQuery コネクタをアプリケーションで使用できるようにします。

  1. クラスタの作成時に Managed Service for Apache Spark コネクタ初期化アクションを使用して、すべての ノードの Spark の jars ディレクトリに spark-bigquery-connector をインストールします。

  2. コンソール、gcloud CLI、または Managed Service for Apache Spark API. を使用してジョブをクラスタに送信するときに、コネクタ jar の URL を指定します。 Google Cloud

    コンソール

    Managed Service for Apache Spark [ジョブの送信] ページで、Spark ジョブの [Jar ファイル] 項目を使用します。

    gcloud

    gcloud dataproc jobs submit spark --jars フラグを使用します。

    API

    SparkJob.jarFileUris フィールドを使用します。

    2.0 より前のバージョンのイメージ クラスタで Spark ジョブを実行するときにコネクタ JAR を指定する方法

    • コネクタ jar は、次の URI 文字列の Scala とコネクタのバージョン情報を置き換えて指定します。
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Managed Service for Apache Spark イメージ バージョン 1.5+ で Scala 2.12 を使用する
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      gcloud CLI の例:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Managed Service for Apache Spark イメージ バージョン 1.4 以前で Scala 2.11 を使用する
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      gcloud CLI の例:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Scala または Java Spark アプリケーションに依存関係としてコネクタ jar を含める ( コネクタのコンパイルをご覧ください)。

費用の計算

このドキュメントでは、課金対象である次の コンポーネントを使用します Google Cloud:

  • Managed Service for Apache Spark
  • BigQuery
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

BigQuery との間でデータを読み書きする

このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。

コネクタは、最初にデータを Cloud Storage の一時テーブルにバッファリングしてから、BigQuery にすべてのデータを書き込みます。次に、1 回のオペレーションですべてのデータを BigQuery にコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合、残っている Cloud Storage の一時ファイルを削除します。通常、一時的な BigQuery ファイルは gs://[bucket]/.spark-bigquery-[jobid]-[UUID] にあります。

課金を構成する

デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します。spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

また、読み取り / 書き込みオペレーションに次のように追加することもできます。.option("parentProject", "<BILLED-GCP-PROJECT>")

コードを実行する

このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットをGoogle Cloud プロジェクトの既存の BigQuery データセットに変更します。

次のように bq コマンドを使用して wordcount_dataset を作成します。

bq mk wordcount_dataset

Google Cloud CLI コマンドを使用して、BigQuery にエクスポートするために使用する Cloud Storage バケットを作成します。

gcloud storage buckets create gs://[bucket]

Scala

  1. コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      spark.read.bigquery("bigquery-public-data:samples.shakespeare")
      .cache()
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .save("wordcount_dataset.wordcount_output"))
  2. クラスタ上でコードを実行します
    1. SSH を使用して Managed Service for Apache Spark クラスタ マスター ノード
        に接続します。
      1. コンソールで Managed Service for Apache Spark [Clusters] ページに移動し、クラスタの名前をクリックします。 Cloud コンソールの Dataproc [クラスタ] ページ。 Google Cloud
      2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある SSH をクリックします。 Cloud コンソールの Dataproc [クラスタの詳細] ページ。
        マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivimnano テキスト エディタで wordcount.scala を作成し、Scala コードの一覧から Scala コードを貼り付けます。
      nano wordcount.scala
        
    3. spark-shell REPL を起動します。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. :load wordcount.scala コマンドで wordcount.scala を実行して、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、BigQuery ページを開いて wordcount_output テーブルを選択し、[プレビュー] をクリックします。Cloud コンソールの BigQuery Explorer ページでテーブルをプレビューする。

PySpark

  1. コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data:samples.shakespeare') \
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  2. クラスタ上でコードを実行します
    1. SSH を使用して Managed Service for Apache Spark クラスタ マスター ノードに接続します。
      1. コンソールで Managed Service for Apache Spark [Clusters] ページに移動し、クラスタの名前をクリックします Cloud コンソールの [クラスタ] ページ。 Google Cloud
      2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある SSH をクリックします。 Cloud コンソールの [クラスタの詳細] ページで、クラスタ名の行の [SSH] を選択します。
        マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivim、または nano テキスト エディタで wordcount.py を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。
      nano wordcount.py
      
    3. spark-submit で wordcount を実行し、BigQuery の wordcount_output テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      出力テーブルをプレビューするには、BigQuery ページを開いて wordcount_output テーブルを選択し、[プレビュー] をクリックします。 Cloud コンソールの BigQuery Explorer ページでテーブルをプレビューする。

トラブルシューティングのヒント

Cloud Logging と BigQuery ジョブ エクスプローラでジョブログを調べて、BigQuery コネクタを使用する Spark ジョブのトラブルシューティングを行うことができます。

  • Managed Service for Apache Spark ドライバログ には、jobId などの BigQuery メタデータを含む BigQueryClient エントリが含まれています。

    ClassNotFoundException INFO BigQueryClient:.. jobId: JobId{project=PROJECT_ID, job=JOB_ID, location=LOCATION}
    
  • BigQuery ジョブ には、Managed Service for Apache Spark_job_id ラベルと Managed Service for Apache Spark_job_uuid ラベルが含まれています。

    • ロギング:
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_id="JOB_ID"
      protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.dataproc_job_uuid="JOB_UUID"
      protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId="JOB_NAME"
      
    • BigQuery ジョブ エクスプローラ: ジョブ ID をクリックすると、[ジョブ情報] の [ラベル] にジョブの詳細が表示されます。

次のステップ