本頁說明如何建立 Dataproc 叢集,並使用 Spark Spanner 連接器,透過 Apache Spark 從 Spanner 讀取資料
Spanner 連接器可與 Spark 搭配使用,透過 Spanner Java 程式庫從 Spanner 資料庫讀取資料。Spanner 連接器支援將 Spanner 資料表和圖表讀取至 Spark DataFrame 和 GraphFrame。
費用
在本文件中,您會使用下列 Google Cloud的計費元件:
- Dataproc
- Spanner
- Cloud Storage
您可以使用 Pricing Calculator,根據預測用量估算費用。
事前準備
在本教學課程中使用 Spanner 連接器之前,請設定 Dataproc 叢集,以及Spanner 執行個體和資料庫。
設定 Dataproc 叢集
建立 Dataproc 叢集,或使用具有下列設定的現有 Dataproc 叢集:
VM 服務帳戶權限。叢集VM 服務帳戶必須指派適當的 Spanner 權限。如果您使用 Data Boost (範例程式碼中已啟用 Data Boost,請參閱「匯出 Spanner 表格」),VM 服務帳戶也必須具備必要的 Data Boost IAM 權限。
存取權範圍。建立叢集時,必須啟用
cloud-platform範圍或適當的spanner範圍。使用映像檔版本 2.1 以上版本建立的叢集,預設會啟用cloud-platform範圍。下列操作說明會示範如何使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API,在叢集建立要求中設定
cloud-platform範圍。如需其他叢集建立操作說明,請參閱「建立叢集」。Google Cloud 控制台
- 在 Google Cloud 控制台中,開啟 Dataproc 的「建立叢集」頁面。
- 在「專案存取權」部分的「管理安全性」面板中,按一下「為這個叢集啟用 cloud-platform 範圍」。
- 填寫或確認其他叢集建立欄位,然後按一下「建立」。
gcloud CLI
您可以執行下列
gcloud dataproc clusters create指令,建立啟用cloud-platform範圍的叢集。gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
您可以指定 GceClusterConfig.serviceAccountScopes,做為 clusters.create 要求的一部分。
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
設定含有 Singers 資料庫資料表的 Spanner 執行個體
建立 Spanner 執行個體,並在其中建立含有 Singers 資料表的資料庫。記下 Spanner 執行個體 ID 和資料庫 ID。
搭配使用 Spanner 連接器和 Spark
Spanner 連接器適用於 Spark 版本 3.1+。將工作提交至 Dataproc 叢集時,您會在 Cloud Storage 連接器 JAR 檔案規格中指定連接器版本。
範例:使用 Spanner 連接器提交 gcloud CLI Spark 工作。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。
讀取 Spanner 資料表
您可以使用 Python 或 Scala,透過 Spark 資料來源 API 將 Spanner 資料表資料讀取至 Spark DataFrame。
PySpark
如要在叢集上執行本節中的範例 PySpark 程式碼,請將工作提交至 Dataproc 服務,或從叢集主要節點上的 spark-submit REPL 執行工作。
Dataproc 工作
- 使用本機文字編輯器或 Cloud Shell 中預先安裝的
vi、vim或nano文字編輯器,建立singers.py檔案。 - 填入預留位置變數後,將下列程式碼貼到
singers.py檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱使用
Singers資料庫表單設定 Spanner 執行個體。
- 儲存
singers.py檔案。 - 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API,將工作提交至 Dataproc 服務。
範例:使用 Spanner 連接器透過 gcloud CLI 提交工作。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar更改下列內容:
- CLUSTER_NAME:新叢集的名稱。
- REGION:可執行工作負載的 Compute Engine區域。
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。
spark-submit 工作
- 使用 SSH 連線至 Dataproc 叢集主節點。
- 使用預先安裝的
vi、vim或nano文字編輯器,在主要節點上建立singers.py檔案。- 將下列程式碼貼入
singers.py檔案。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.py檔案。
- 將下列程式碼貼入
- 執行
singers.py,並使用spark-submit建立 SpannerSingers資料表。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。
輸出內容會如下所示:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
Scala
如要在叢集上執行 Scala 程式碼範例,請完成下列步驟:
- 使用 SSH 連線至 Dataproc 叢集主節點。
- 使用預先安裝的
vi、vim或nano文字編輯器,在主要節點上建立singers.scala檔案。- 將下列程式碼貼入
singers.scala檔案。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.scala檔案。
- 將下列程式碼貼入
- 啟動
spark-shellREPL。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。 - 執行
singers.scala和:load singers.scala指令,建立 SpannerSingers資料表。輸出清單會顯示 Singers 輸出中的範例。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
解讀 Spanner 圖表
Spanner 連接器支援將圖形匯出至個別節點和邊緣 DataFrame,以及直接匯出至 GraphFrames。
以下範例會將 Spanner 匯出至 GraphFrame。
這個程式會使用 Spanner 連接器 JAR 中包含的 Python SpannerGraphConnector 類別,讀取 Spanner Graph。
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。從 GitHub
GoogleCloudDataproc/spark-spanner-connector存放區的版本清單中,選擇 Spanner 連接器版本。 - PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME 插入執行個體、資料庫和圖表 ID。
如要匯出節點和邊緣 DataFrames,而非 GraphFrames,請使用 load_dfs:
df_vertices, df_edges, df_id_map = connector.load_dfs()
清除所用資源
如要避免系統持續向您的 Google Cloud 帳戶收費,您可以停止或刪除 Dataproc 叢集,並刪除 Spanner 執行個體。
後續步驟
- 請參閱
pyspark.sql.DataFrame範例。 - 如需 Spark DataFrame 語言支援資訊,請參閱下列文章:
- 請參閱 GitHub 上的 Spark Spanner 連接器存放區。
- 請參閱 Spark 工作調整提示。