spark-bigquery-connector を Managed Service for Apache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このチュートリアルでは、spark-bigquery-connector を使用する PySpark アプリケーションについて説明します。
コネクタのバージョンを確認する
バッチ ワークロードまたはインタラクティブ セッションのランタイム バージョンにインストールされている BigQuery コネクタのバージョンを確認するには、Managed Service for Apache Spark ランタイム リリース をご覧ください。コネクタが 一覧にない場合は、コネクタをアプリケーションで使用できるようにするをご覧ください。
必要に応じて、コネクタをアプリケーションで使用できるようにする
BigQuery コネクタは、サポートされているすべての
Managed Service for Apache Spark ランタイム バージョンにインストールされます。
コネクタがインストールされない
サポート対象外のランタイム バージョン
(Spark runtime 1.0)を使用している場合は、次のいずれかの方法で、コネクタを
アプリケーションで使用できるようにします。
- Managed Service for Apache Spark バッチ ワークロードを送信するか、インタラクティブ セッションを実行するときに、
jarsパラメータを使用してコネクタ jar ファイルを指定します。次のバッチ ワークロードの例 では、コネクタ jar ファイルを指定しています(使用可能なコネクタ jar ファイルの一覧については、GitHub の GoogleCloudDataproc/spark-bigquery-connector リポジトリをご覧ください)。- Google Cloud CLI の例:
gcloud dataproc batches submit pyspark \ --region=REGION \ --jars=spark-3.5-bigquery-version.jar \ ... other args
- Google Cloud CLI の例:
費用の計算
このチュートリアルでは、以下を含む、 Google Cloudの課金対象となるコンポーネントを使用します。
- Managed Service for Apache Spark
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もり を作成できます。
課金の構成
デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに課金するには、次の
構成プロパティを設定します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")。
このプロパティは、読み取りまたは書き込みオペレーションに追加することもできます。
.option("parentProject", "<BILLED-GCP-PROJECT>")
PySpark のワードカウント バッチ ワークロードを送信する
このサンプルでは、 BigQuery から Spark DataFrame にデータを読み込み、標準データソース APIを使用してワード数をカウントします。
コネクタは、次のオペレーション シーケンスでワードカウントの出力を BigQuery に書き込みます。
Cloud Storage バケット内の一時ファイルにデータをバッファリングします
1 回のオペレーションで Cloud Storage バケットから BigQuery にデータをコピーします
BigQuery の読み込みオペレーションが完了すると、Cloud Storage 内の一時ファイルが削除されます(一時ファイルは Spark アプリケーションが終了した後にも削除されます)。削除に失敗した場合は、不要な Cloud Storage 一時ファイルを削除する必要があります。ファイルは通常、
gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUIDにあります。
ワードカウント ワークロードを実行する手順
- ローカル ターミナルまたは Cloud Shell を開きます。
- ローカル ターミナルまたは
Cloud Shell で、bq コマンドライン ツールを使用して
wordcount_datasetを作成します。bq mk wordcount_dataset
- Google Cloud CLI を使用して Cloud Storage バケットを作成します。
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAMEは、 作成した Cloud Storage バケットの 名前に置き換えます。 - 次の PySpark コードをコピーして、テキスト エディタで
wordcount.pyファイルをローカルに作成します。#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Cloud Storage bucket used by the connector for temporary BigQuery # export data. bucket = "BUCKET_NAME" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .load('bigquery-public-data.samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .save('wordcount_dataset.wordcount_output')
- PySpark バッチ ワークロードを送信します。
ターミナル出力例:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=BUCKET_NAME
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
コンソールで出力テーブルをプレビューするには、 Google Cloud BigQuery ページを開き、wordcount_outputテーブルを選択して、 [プレビュー]をクリックします。
図 1: BigQuery で出力テーブルをプレビューする