Managed Service for Apache Spark Spark Connect 用戶端是 Apache Spark Connect 用戶端的封裝容器。應用程式可透過 Spark Connect 協定,與遠端 Managed Service for Apache Spark 工作階段通訊。本文說明如何安裝、設定及使用用戶端。
事前準備
確認您具備Identity and Access Management 角色,其中包含管理互動式工作階段和工作階段範本所需的權限。
如果您在 Google Cloud以外的位置執行用戶端,請提供驗證憑證。將
GOOGLE_APPLICATION_CREDENTIALS環境變數設為服務帳戶金鑰檔案的路徑。
安裝或解除安裝用戶端
您可以使用 pip 安裝或解除安裝 dataproc-spark-connect 套件。
安裝
如要安裝最新版用戶端,請執行下列指令:
pip install -U dataproc-spark-connect
解除安裝
如要解除安裝用戶端,請執行下列指令:
pip uninstall dataproc-spark-connect
設定用戶端
為工作階段指定專案和區域。您可以使用環境變數設定這些值,也可以在程式碼中使用建構工具 API。
環境變數
設定 GOOGLE_CLOUD_PROJECT 和 GOOGLE_CLOUD_REGION 環境變數。
Builder API
請使用 .projectId() 和 .location() 方法。
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
啟動 Spark 工作階段
如要啟動 Spark 工作階段,請將必要的匯入項目新增至 PySpark 應用程式或筆記本,然後呼叫 DataprocSparkSession.builder.getOrCreate() API。
匯入
DataprocSparkSession類別。呼叫
getOrCreate()方法來啟動工作階段。from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
設定 Spark 屬性
如要設定 Spark 屬性,請將一或多個 .config() 方法鏈結至建構工具。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
使用進階設定
如需進階設定,請使用 Session 類別自訂設定,例如子網路或執行階段版本。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
重複使用已命名的工作階段
具名工作階段可讓您在多個筆記本之間共用單一 Spark 工作階段,同時避免重複啟動工作階段而導致延遲。
在第一個筆記本中,建立具有自訂 ID 的工作階段。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()在另一個筆記本中,指定相同的工作階段 ID,即可重複使用該工作階段。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
工作階段 ID 的長度必須介於 4 至 63 個字元之間,只能使用小寫英文字母、數字和連字號,開頭須為小寫英文字母。ID 的結尾不得為連字號。ID 處於 TERMINATED 狀態的工作階段無法重複使用。
使用 Spark SQL Magic 指令
這個套件支援 sparksql-magic 程式庫,可在 Jupyter 筆記本中執行 Spark SQL 查詢。魔法指令是選用功能。
安裝必要的依附元件。
pip install IPython sparksql-magic載入 Magic 擴充功能。
%load_ext sparksql_magic選用:設定預設設定。
%config SparkSql.limit=20執行 SQL 查詢。
%%sparksql SELECT * FROM your_table
如要使用進階選項,請在 %%sparksql 指令中加入旗標。舉例來說,如要快取結果並建立檢視區塊,請執行下列指令:
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
可用的選項如下:
--cache或-c:將 DataFrame 儲存在快取中。--eager或-e:使用預先載入的快取。--view VIEW或-v VIEW:建立臨時檢視畫面。--limit N或-l N:覆寫預設的資料列顯示限制。variable_name:將結果儲存在變數中。