Spark Spanner コネクタを使用する

このページでは、 Spark Spanner コネクタ を使用して、Apache SparkSpanner との間でデータを読み取り / 書き込みを行う Managed Service for Apache Spark クラスタを作成する方法について説明します。

Spanner コネクタは Spark と連携して、Spanner Java ライブラリを使用して Spanner データベースとの間でデータを読み取り / 書き込みを行います。 Spanner コネクタは、Spanner テーブルグラフを Spark DataFramesGraphFramesに読み込むこと、 および DataFrame データを Spanner テーブルに書き込むことをサポートしています。

費用

このドキュメントでは、課金対象である次のコンポーネントを使用します。 Google Cloud

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. アカウントにログインします Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. 必要なロールを付与します
  9. Managed Service for Apache Spark クラスタを設定します
  10. `Singers` データベース テーブルを使用して Spanner インスタンスを設定します

必要なロールを付与する

このページの例を実行するには、特定の IAM ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロールの付与を確認するには、 ロールを付与する必要がありますか?をご覧ください。

ロールの付与については、 プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

Compute Engine のデフォルト サービス アカウントに、Managed Service for Apache Spark クラスタを作成するために必要な 権限を付与するには、プロジェクトに対する次の IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼します。

Managed Service for Apache Spark クラスタを設定する

Managed Service for Apache Spark クラスタを作成する か、 2.1 以降の Managed Service for Apache Spark イメージで作成された既存の Managed Service for Apache Spark クラスタを使用します。クラスタが 2.0 以前のイメージで作成された場合は、 scope プロパティが cloud-platform スコープに設定されている必要があります

`Singers` データベース テーブルを使用して Spanner インスタンスを設定する

Singers テーブルを含むデータベースを使用して Spanner インスタンスを作成します。Spanner インスタンス ID とデータベース ID をメモします。

Spark で Spanner コネクタを使用する

Spanner コネクタは、Spark バージョン 3.1+ で使用できます。ジョブを Managed Service for Apache Spark クラスタに 送信するときに、Cloud Storage コネクタ JAR ファイル仕様の一部として コネクタ バージョン を 指定します。

例: Spanner コネクタを使用した gcloud CLI Spark ジョブの送信。

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

次のように置き換えます。

CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

Spanner テーブルを読み取る

Python または Scala を使用して、Spark データソース API を使用して Spanner テーブルデータを Spark Dataframe に読み取ることが可能です。

PySpark

このセクションの PySpark コードの例をクラスタで実行するには、Managed Service for Apache Spark にジョブを送信するか、クラスタ マスターノードの spark-submit REPL からジョブを実行します。

Managed Service for Apache Spark ジョブ

  1. ローカル テキスト エディタを使用するか、Cloud Shell でプリインストールされている vivim、または nano テキスト エディタを使用して に singers.py ファイルを作成します。
    1. プレースホルダ変数に値を入力したら、次のコードを singers.py ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.py ファイルを保存します。
  2. ジョブを送信 するには、Managed Service for Apache Spark で Google Cloud コンソール、gcloud CLI、または REST API を使用します。

    例: Spanner コネクタを使用した gcloud CLI ジョブの送信。

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    次のように置き換えます。

    1. CLUSTER_NAME: 新しいクラスタの名前。
    2. REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
    3. CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

spark-submit ジョブ

  1. SSH を使用して Managed Service for Apache Spark クラスタ マスターノードに接続します。
    1. コンソールで、Managed Service for Apache Spark の [Clusters] ページに移動し、クラスタ名をクリックします。 Google Cloud
    2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある SSH をクリックします。
       Google Cloud コンソールの Dataproc クラスタの詳細ページのスクリーンショット。クラスタ マスターノードに接続するために使用される SSH ボタンが表示されています。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 事前にインストールされている vivim、または nano テキスト エディタを使用して、マスターノードに singers.py ファイルを作成します。
    1. プレースホルダ変数を singers.py ファイルに入力したら、次のコードを singers.py ファイルに貼り付けます。Spanner の Data Boost 機能が 有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.py ファイルを保存します。
  3. spark-submitsingers.py を実行して、Spanner の Singers テーブルを作成します。
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    次のように置き換えます。

    1. CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

    次のように出力されます。

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

クラスタで Scala サンプルコードを実行する手順は次のとおりです。

  1. SSH を使用して Managed Service for Apache Spark クラスタ マスターノードに接続します。
    1. コンソールで、Managed Service for Apache Spark の [Clusters] ページに移動し、クラスタ名をクリックします。 Google Cloud
    2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある SSH をクリックします。 Google Cloud コンソールの Dataproc の [クラスタの詳細] ページ。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 事前にインストールされている vivimnano テキスト エディタを使用して、マスターノードに singers.scala ファイルを作成します。
    1. 次のコードを singers.scala ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.scala ファイルを保存します。
  3. spark-shell REPL を起動します。
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    次のように置き換えます。

    CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

  4. :load singers.scala コマンドで singers.scala を実行して、Spanner の Singers テーブルを作成します。出力リストには、Singers 出力の例が表示されます。
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Spanner グラフを読み取る

Spanner コネクタは、グラフを個別の ノードとエッジの DataFrames にエクスポートするだけでなく、 GraphFrames に直接エクスポートすることもサポートしています。

次の例では、Spanner を GraphFrame にエクスポートします。Spanner コネクタ jar に含まれている Python SpannerGraphConnector クラスを使用して、Spanner Graph を読み取ります。

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

次のように置き換えます。

  • CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • INSTANCE_IDDATABASE_IDTABLE_NAME インスタンス ID、データベース ID、グラフ ID を挿入します。

GraphFrames ではなくノードとエッジ DataFrames をエクスポートするには、代わりに load_dfs を使用します。

df_vertices, df_edges, df_id_map = connector.load_dfs()

Spanner テーブルを書き込む

Spanner コネクタは、Spark データソース APIを使用して、Spark Dataframe を Spanner テーブルに書き込むことをサポートしています。

DataFrame を Spanner テーブルに書き込む例

コードを保存して実行する前に、変数に値を入力します。

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID Google Cloud プロジェクト ID は、[Project info] セクションに 表示されます。 Google Cloud [Dashboard]
  • INSTANCE_IDDATABASE_IDTABLE_NAME インスタンス ID、データベース ID、テーブル ID を挿入します。

クリーンアップ

アカウントに継続的に課金されないようにするには、 Managed Service for Apache Spark クラスタを停止または 削除し、 Spanner インスタンスを削除します。 Google Cloud

次のステップ