このページでは、 Spark Spanner コネクタ を使用して、Apache SparkでSpanner との間でデータを読み取り / 書き込みを行う Managed Service for Apache Spark クラスタを作成する方法について説明します。
Spanner コネクタは Spark と連携して、Spanner Java ライブラリを使用して Spanner データベースとの間でデータを読み取り / 書き込みを行います。 Spanner コネクタは、Spanner テーブルと グラフを Spark DataFrames と GraphFramesに読み込むこと、 および DataFrame データを Spanner テーブルに書き込むことをサポートしています。
費用
このドキュメントでは、課金対象である次のコンポーネントを使用します。 Google Cloud
- Managed Service for Apache Spark
- Spanner
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- アカウントにログインします Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 必要なロールを付与します。
- Managed Service for Apache Spark クラスタを設定します。
- `Singers` データベース テーブルを使用して Spanner インスタンスを設定します。
必要なロールを付与する
このページの例を実行するには、特定の IAM ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロールの付与を確認するには、 ロールを付与する必要がありますか?をご覧ください。
ロールの付与については、 プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
Compute Engine のデフォルト サービス アカウントに、Managed Service for Apache Spark クラスタを作成するために必要な 権限を付与するには、プロジェクトに対する次の IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼します。
正しいプリンシパルにロールを付与しないと、権限エラーが発生する可能性があります。- Dataproc ワーカー (
roles/dataproc.worker) - Cloud Spanner データベース ユーザー (
roles/spanner.databaseUser) - Cloud Spanner データベース読み取り(DataBoost 付き) (
roles/spanner.databaseReaderWithDataBoost)
Managed Service for Apache Spark クラスタを設定する
Managed Service for Apache Spark クラスタを作成する
か、
2.1 以降の Managed Service for Apache Spark イメージで作成された既存の Managed Service for Apache Spark クラスタを使用します。クラスタが 2.0 以前のイメージで作成された場合は、
scope プロパティが
cloud-platform スコープに設定されている必要があります。
`Singers` データベース テーブルを使用して Spanner インスタンスを設定する
Singers テーブルを含むデータベースを使用して Spanner インスタンスを作成します。Spanner インスタンス ID とデータベース ID をメモします。
Spark で Spanner コネクタを使用する
Spanner コネクタは、Spark バージョン 3.1+ で使用できます。ジョブを
Managed Service for Apache Spark クラスタに
送信するときに、Cloud Storage コネクタ JAR ファイル仕様の一部として
コネクタ バージョン
を
指定します。
例: Spanner コネクタを使用した gcloud CLI Spark ジョブの送信。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
次のように置き換えます。
CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub
GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
Spanner テーブルを読み取る
Python または Scala を使用して、Spark データソース API を使用して Spanner テーブルデータを Spark Dataframe に読み取ることが可能です。
PySpark
このセクションの PySpark コードの例をクラスタで実行するには、Managed Service for Apache Spark にジョブを送信するか、クラスタ マスターノードの spark-submit REPL からジョブを実行します。
Managed Service for Apache Spark ジョブ
- ローカル
テキスト エディタを使用するか、Cloud Shell でプリインストールされている
vi、vim、またはnanoテキスト エディタを使用して にsingers.pyファイルを作成します。 - プレースホルダ変数に値を入力したら、次のコードを
singers.pyファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singersデータベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.pyファイルを保存します。- ジョブを送信
するには、Managed Service for Apache Spark で Google Cloud コンソール、gcloud CLI、または
REST API を使用します。
例: Spanner コネクタを使用した gcloud CLI ジョブの送信。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar次のように置き換えます。
- CLUSTER_NAME: 新しいクラスタの名前。
- REGION: ワークロードを実行できる利用可能な Compute Engine リージョン。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connectorリポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
spark-submit ジョブ
- SSH を使用して Managed Service for Apache Spark クラスタ マスターノードに接続します。
- コンソールで、Managed Service for Apache Spark の [Clusters] ページに移動し、クラスタ名をクリックします。 Google Cloud
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSHをクリックします。
マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされている
vi、vim、またはnanoテキスト エディタを使用して、マスターノードにsingers.pyファイルを作成します。- プレースホルダ変数を
singers.pyファイルに入力したら、次のコードをsingers.pyファイルに貼り付けます。Spanner の Data Boost 機能が 有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singersデータベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.pyファイルを保存します。
- プレースホルダ変数を
spark-submitでsingers.pyを実行して、Spanner のSingersテーブルを作成します。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
次のように置き換えます。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connectorリポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
次のように出力されます。
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
Scala
クラスタで Scala サンプルコードを実行する手順は次のとおりです。
- SSH を使用して Managed Service for Apache Spark クラスタ マスターノードに接続します。
- コンソールで、Managed Service for Apache Spark の [Clusters] ページに移動し、クラスタ名をクリックします。 Google Cloud
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSHをクリックします。
マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされている
vi、vim、nanoテキスト エディタを使用して、マスターノードにsingers.scalaファイルを作成します。- 次のコードを
singers.scalaファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singersデータベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.scalaファイルを保存します。
- 次のコードを
spark-shellREPL を起動します。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
次のように置き換えます。
CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の
GoogleCloudDataproc/spark-spanner-connectorリポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。:load singers.scalaコマンドでsingers.scalaを実行して、Spanner のSingersテーブルを作成します。出力リストには、Singers 出力の例が表示されます。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Spanner グラフを読み取る
Spanner コネクタは、グラフを個別の
ノードとエッジの
DataFrames
にエクスポートするだけでなく、
GraphFrames
に直接エクスポートすることもサポートしています。
次の例では、Spanner を GraphFrame にエクスポートします。Spanner コネクタ jar に含まれている Python SpannerGraphConnector クラスを使用して、Spanner Graph を読み取ります。
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
次のように置き換えます。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connectorリポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。 - PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME インスタンス ID、データベース ID、グラフ ID を挿入します。
GraphFrames ではなくノードとエッジ DataFrames をエクスポートするには、代わりに load_dfs を使用します。
df_vertices, df_edges, df_id_map = connector.load_dfs()
Spanner テーブルを書き込む
Spanner コネクタは、Spark データソース APIを使用して、Spark Dataframe を Spanner テーブルに書き込むことをサポートしています。
DataFrame を Spanner テーブルに書き込む例
コードを保存して実行する前に、変数に値を入力します。
"""Spanner PySpark write example.""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Spanner Write App').getOrCreate() columns = ['id', 'name', 'email'] data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')] df = spark.createDataFrame(data, columns) df.write.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .mode("append") \ .save()
次のように置き換えます。
- PROJECT_ID: プロジェクト ID Google Cloud プロジェクト ID は、[Project info] セクションに 表示されます。 Google Cloud [Dashboard]
- INSTANCE_ID、DATABASE_ID、TABLE_NAME インスタンス ID、データベース ID、テーブル ID を挿入します。
クリーンアップ
アカウントに継続的に課金されないようにするには、 Managed Service for Apache Spark クラスタを停止または 削除し、 Spanner インスタンスを削除します。 Google Cloud
次のステップ
pyspark.sql.DataFrameの例を見る。- Spark DataFrame 言語のサポートについては、以下をご覧ください。
- GitHub の Spark Spanner Connector リポジトリを参照する。
- Spark ジョブ調整のヒントを確認する。