使用 Spark Spanner 连接器

本页面介绍了如何创建 Managed Service for Apache Spark 集群,以使用 Apache Spark 通过 Spark Spanner 连接器Spanner 读取数据和向其写入数据。

Spanner 连接器可与 Spark 搭配使用,以通过 Spanner Java 库从 Spanner 数据库读取数据以及向其中写入数据。Spanner 连接器支持将 Spanner 读入 Spark DataFramesGraphFrames,以及将 DataFrame 数据写入 Spanner 表。

费用

在本文档中,您将使用 Google Cloud的以下收费组件:

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

如需根据您的预计使用情况来估算费用,请使用价格计算器

新 Google Cloud 用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. 授予必需的角色
  9. 设置 Managed Service for Apache Spark 集群
  10. 设置包含 Singers 数据库表的 Spanner 实例

授予必需的角色

您需要拥有某些 IAM 角色才能运行本页中的示例。这些角色可能已获授予,具体取决于组织政策。如需检查角色授予情况,请参阅您是否需要授予角色?

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

为确保 Compute Engine 默认服务账号具有创建 Managed Service for Apache Spark 集群所需的权限,请让您的管理员为 Compute Engine 默认服务账号授予项目的以下 IAM 角色:

设置 Managed Service for Apache Spark 集群

创建 Managed Service for Apache Spark 集群,或使用已创建的 Managed Service for Apache Spark 集群(该集群是使用 2.1 或更高版本的 Managed Service for Apache Spark 映像创建的;如果该集群是使用 2.0 或更低版本的映像创建的,则必须使用设置为 cloud-platform 范围scope 属性创建)。

设置包含 Singers 数据库表的 Spanner 实例

使用包含 Singers 表的数据库创建 Spanner 实例。记下 Spanner 实例 ID 和数据库 ID。

将 Spanner 连接器与 Spark 搭配使用

Spanner 连接器适用于 Spark 版本 3.1+。在向 Managed Service for Apache Spark 集群提交作业时,您可以在 Cloud Storage 连接器 JAR 文件规范中指定连接器版本

示例:使用 Spanner 连接器提交 gcloud CLI Spark 作业。

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

替换以下内容:

CONNECTOR_VERSION:Spanner 连接器版本。从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

读取 Spanner 表

您可以使用 Python 或 Scala 通过 Spark 数据源 API 将 Spanner 表数据读入 Spark Dataframe 中。

PySpark

您可以通过以下方式在集群上运行本部分中的示例 PySpark 代码:将作业提交到 Managed Service for Apache Spark,或从集群主服务器上的 spark-submit REPL 运行作业。

Managed Service for Apache Spark 作业

  1. 使用本地文本编辑器或使用预安装的 vivimnano 文本编辑器在 Cloud Shell 中创建 singers.py 文件。
    1. 填充占位变量后,将以下代码粘贴到 singers.py 文件中。请注意,Spanner Data Boost 功能处于启用状态,对主 Spanner 实例的影响接近于零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.py 文件。
  2. 使用 Google Cloud 控制台、gcloud CLI 或 REST API 向 Managed Service for Apache Spark 提交作业

    示例:使用 Spanner 连接器提交 gcloud CLI 作业。

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    替换以下内容:

    1. CLUSTER_NAME:新集群的名称。
    2. REGION:用于运行工作负载的可用 Compute Engine 区域
    3. CONNECTOR_VERSION:Spanner 连接器版本。从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

spark-submit 作业

  1. 使用 SSH 连接到 Managed Service for Apache Spark 集群主服务器节点。
    1. 在 Google Cloud 控制台中前往 Managed Service for Apache Spark 集群页面,然后点击集群的名称。
    2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
       Google Cloud 控制台中的“Dataproc 集群详细信息”页面屏幕截图,其中显示了用于连接到集群主服务器节点的 SSH 按钮。

      此时会打开一个浏览器窗口并显示主节点上的主目录。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用预安装的 vivimnano 文本编辑器在主节点上创建 singers.py 文件。
    1. 在将占位变量填充到 singers.py 文件中后,将以下代码粘贴到 singers.py 文件中。请注意,Spanner Data Boost 功能处于启用状态,对主 Spanner 实例的影响接近于零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.py 文件。
  3. 使用 spark-submit 运行 singers.py 以创建 Spanner Singers 表。
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    替换以下内容:

    1. CONNECTOR_VERSION:Spanner 连接器版本。从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

    输出为:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

如需在集群上运行示例 Scala 代码,请完成以下步骤:

  1. 使用 SSH 连接到 Managed Service for Apache Spark 集群主服务器节点。
    1. 在 Google Cloud 控制台中前往 Managed Service for Apache Spark 集群页面,然后点击集群的名称。
    2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主节点名称右侧的 SSH Google Cloud 控制台中的 Dataproc 集群详情页面。

      此时会打开一个浏览器窗口并显示主节点上的主目录。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用预安装的 vivimnano 文本编辑器在主节点上创建 singers.scala 文件。
    1. 将以下代码粘贴到 singers.scala 文件中。请注意,Spanner Data Boost 功能处于启用状态,对主 Spanner 实例的影响接近于零。
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.scala 文件。
  3. 启动 spark-shell REPL。
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    替换以下内容:

    CONNECTOR_VERSION:Spanner 连接器版本。从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

  4. 使用 :load singers.scala 命令运行 singers.scala 以创建 Spanner Singers 表。输出列表会显示来自 Singers 输出的示例。
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

读取 Spanner 图

Spanner 连接器支持将该图导出到单独的节点和边缘 DataFrames,以及直接将其导出到 GraphFrames

以下示例将 Spanner 导出到 GraphFrame。该示例使用 Spanner 连接器 jar 中包含的 Python SpannerGraphConnector 类来读取 Spanner 图

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

替换以下内容:

  • CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。
  • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
  • INSTANCE_IDDATABASE_IDTABLE_NAME:插入实例、数据库和图表 ID。

如需导出节点和边缘 DataFrames(而不是 GraphFrame),请改用 load_dfs

df_vertices, df_edges, df_id_map = connector.load_dfs()

写入 Spanner 表

Spanner 连接器支持使用 Spark 数据源 API 将 Spark Dataframe 写入 Spanner 表。

将 DataFrame 写入 Spanner 表的示例

在保存并运行代码之前,请先填充变量。

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

替换以下内容。

  • PROJECT_ID: Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
  • INSTANCE_IDDATABASE_IDTABLE_NAME:插入实例、数据库和表 ID。

清理

为避免您的 Google Cloud 账号持续产生费用,您可以停止删除 Managed Service for Apache Spark 集群,并删除 Spanner 实例

后续步骤