本頁說明如何建立 Managed Service for Apache Spark 叢集,並使用 Spark Spanner 連接器,透過 Apache Spark 從 Spanner 讀取資料,以及將資料寫入 Spanner。
Spanner 連接器可搭配 Spark 使用,透過 Spanner Java 程式庫讀取 Spanner 資料庫中的資料,以及將資料寫入 Spanner 資料庫。Spanner 連接器支援將 Spanner資料表和圖形讀取至 Spark DataFrames 和 GraphFrames,以及將 DataFrame 資料寫入 Spanner 資料表。
費用
在本文件中,您會使用下列 Google Cloud的計費元件:
- Managed Service for Apache Spark
- Spanner
- Cloud Storage
如要根據預測用量估算費用,請使用 Pricing Calculator。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 授予必要角色。
- 設定 Managed Service for Apache Spark 叢集。
- 設定 Spanner 執行個體和 Singers 資料庫資料表。
授予必要角色
如要執行本頁的範例,您必須具備特定 IAM 角色。視組織政策而定,系統可能已授予這些角色。如要檢查角色授予情形,請參閱「是否需要授予角色?」一節。
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
如要確保 Compute Engine 預設服務帳戶具備建立 Managed Service for Apache Spark 叢集所需的權限,請要求管理員在專案中,將下列 IAM 角色授予 Compute Engine 預設服務帳戶:
-
Dataproc Worker (
roles/dataproc.worker) -
Cloud Spanner 資料庫使用者 (
roles/spanner.databaseUser) -
Cloud Spanner 資料庫讀取者 (使用 Data Boost) (
roles/spanner.databaseReaderWithDataBoost)
設定 Managed Service for Apache Spark 叢集
建立 Managed Service for Apache Spark 叢集,或使用以 2.1 以上版本的 Managed Service for Apache Spark 映像檔建立的現有叢集。如果叢集是以 2.0 以下版本的映像檔建立,則必須以 scope 屬性設為 cloud-platform 範圍建立。
設定含有 Singers 資料庫資料表的 Spanner 執行個體
建立 Spanner 執行個體,並在其中建立包含 Singers 資料表的資料庫。記下 Spanner 執行個體 ID 和資料庫 ID。
搭配使用 Spanner 連接器和 Spark
Spanner 連接器適用於 Spark 版本 3.1+。將工作提交至 Managed Service for Apache Spark 叢集時,您可以在 Cloud Storage 連接器 JAR 檔案規格中指定連接器版本。
範例:使用 Spanner 連接器提交 gcloud CLI Spark 作業。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。
讀取 Spanner 資料表
您可以使用 Python 或 Scala,透過 Spark 資料來源 API 將 Spanner 資料表資料讀取至 Spark DataFrame。
PySpark
如要在叢集上執行本節的 PySpark 程式碼範例,請將工作提交至 Managed Service for Apache Spark 服務,或在叢集主要執行個體上透過 spark-submit REPL 執行工作。
Managed Service for Apache Spark 工作
- 使用本機文字編輯器或 Cloud Shell 中預先安裝的
vi、vim或nano文字編輯器,建立singers.py檔案。 - 填入預留位置變數後,將下列程式碼貼到
singers.py檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info」(專案資訊) 部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.py檔案。 - 使用 Google Cloud 控制台、gcloud CLI 或 REST API,將工作提交至 Managed Service for Apache Spark。
範例:使用 Spanner 連接器提交 gcloud CLI 作業。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar更改下列內容:
- CLUSTER_NAME:新叢集的名稱。
- REGION:可執行工作負載的 Compute Engine 區域。
- CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。
spark-submit 工作
- 使用 SSH 連線至 Managed Service for Apache Spark 叢集主要執行個體。
- 前往 Google Cloud 控制台的 Managed Service for Apache Spark「Clusters」(叢集) 頁面,然後按一下叢集名稱。
- 在「Cluster details」(叢集詳細資料) 頁面中,選取「VM Instances」(VM 執行個體) 分頁標籤。然後點選叢集主要執行個體節點名稱右側的
SSH。
瀏覽器視窗會開啟主要節點上的主目錄。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用預先安裝的
vi、vim或nano文字編輯器,在主要節點上建立singers.py檔案。- 在
singers.py檔案中填入預留位置變數後,將下列程式碼貼到singers.py檔案中。 請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info」(專案資訊) 部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.py檔案。
- 在
- 使用
spark-submit執行singers.py,以建立 SpannerSingers資料表。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。
輸出內容會如下所示:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
Scala
如要在叢集上執行 Scala 程式碼範例,請完成下列步驟:
- 使用 SSH 連線至 Managed Service for Apache Spark 叢集主要執行個體。
- 前往 Google Cloud 控制台的「Managed Service for Apache Spark」叢集頁面,然後按一下叢集名稱。
- 在「Cluster details」(叢集詳細資料) 頁面中,選取「VM Instances」(VM 執行個體) 分頁標籤。然後點選叢集主要執行個體節點名稱右側的
SSH。
瀏覽器視窗會開啟主要節點上的主目錄。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用預先安裝的
vi、vim或nano文字編輯器,在主要節點上建立singers.scala檔案。- 將下列程式碼貼入
singers.scala檔案。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info」(專案資訊) 部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.scala檔案。
- 將下列程式碼貼入
- 啟動
spark-shellREPL。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。 - 使用
:load singers.scala指令執行singers.scala,以建立 SpannerSingers資料表。輸出清單會顯示 Singers 輸出中的範例。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
解讀 Spanner 圖表
Spanner 連接器支援將圖形匯出至個別的節點和邊緣 DataFrames,以及直接匯出至 GraphFrames。
以下範例會將 Spanner 匯出至 GraphFrame。這個範例會使用 Spanner 連接器 JAR 中包含的 Python SpannerGraphConnector 類別,讀取 Spanner Graph。
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。 - PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info (專案資訊) 部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME 插入執行個體、資料庫和圖表 ID。
如要匯出節點和邊緣 DataFrames,而非 GraphFrames,請改用 load_dfs:
df_vertices, df_edges, df_id_map = connector.load_dfs()
寫入 Spanner 資料表
Spanner 連接器支援使用 Spark 資料來源 API,將 Spark DataFrame 寫入 Spanner 資料表。
將 DataFrame 寫入 Spanner 資料表範例
請先填入變數,再儲存及執行程式碼。
"""Spanner PySpark write example.""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Spanner Write App').getOrCreate() columns = ['id', 'name', 'email'] data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')] df = spark.createDataFrame(data, columns) df.write.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .mode("append") \ .save()
請替換下列項目。
- PROJECT_ID: Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info (專案資訊) 部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME 插入執行個體、資料庫和資料表 ID。
清除所用資源
如要避免系統持續向您的 Google Cloud 帳戶收取費用,您可以停止或刪除 Managed Service for Apache Spark 叢集,並刪除 Spanner 執行個體。
後續步驟
- 請參閱
pyspark.sql.DataFrame範例。 - 如需 Spark DataFrame 語言支援資訊,請參閱下列文章:
- 請參閱 GitHub 上的 Spark Spanner 連接器存放區。
- 請參閱 Spark 工作調整提示。