Dataproc Spark Connect 用戶端是 Apache Spark Connect 用戶端的包裝函式。應用程式可透過 Spark Connect 協定,與遠端 Serverless for Apache Spark 工作階段通訊。本文說明如何安裝、設定及使用用戶端。
事前準備
確認您具備Identity and Access Management 角色,其中包含管理互動式工作階段和工作階段範本所需的權限。
如果您在 Google Cloud以外的位置執行用戶端,請提供驗證憑證。將
GOOGLE_APPLICATION_CREDENTIALS環境變數設為服務帳戶金鑰檔案的路徑。
安裝或解除安裝用戶端
您可以使用 pip 安裝或解除安裝 dataproc-spark-connect 套件。
安裝
如要安裝最新版用戶端,請執行下列指令:
pip install -U dataproc-spark-connect
解除安裝
如要解除安裝用戶端,請執行下列指令:
pip uninstall dataproc-spark-connect
設定用戶端
為工作階段指定專案和區域。您可以使用環境變數設定這些值,也可以在程式碼中使用建構工具 API。
環境變數
設定 GOOGLE_CLOUD_PROJECT 和 GOOGLE_CLOUD_REGION 環境變數。
Builder API
請使用 .projectId() 和 .location() 方法。
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
啟動 Spark 工作階段
如要啟動 Spark 工作階段,請將必要的匯入項目新增至 PySpark 應用程式或筆記本,然後呼叫 DataprocSparkSession.builder.getOrCreate() API。
匯入
DataprocSparkSession類別。呼叫
getOrCreate()方法即可開始工作階段。from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
設定 Spark 屬性
如要設定 Spark 屬性,請將一或多個 .config() 方法鏈結至建構工具。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
使用進階設定
如需進階設定,請使用 Session 類別自訂設定,例如子網路或執行階段版本。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
重複使用已命名的工作階段
具名工作階段可讓您在多個筆記本之間共用單一 Spark 工作階段,同時避免重複啟動工作階段而導致延遲。
在第一個筆記本中,使用自訂 ID 建立工作階段。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()在另一個筆記本中,指定相同的工作階段 ID,即可重複使用該工作階段。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
工作階段 ID 的長度必須介於 4 至 63 個字元之間,只能使用小寫英文字母、數字和連字號,開頭須為小寫英文字母。ID 結尾不得為連字號。ID 處於 TERMINATED 狀態的工作階段無法重複使用。
使用 Spark SQL 神奇指令
這個套件支援 sparksql-magic 程式庫,可在 Jupyter 筆記本中執行 Spark SQL 查詢。魔法指令是選用功能。
安裝必要的依附元件。
pip install IPython sparksql-magic載入 Magic 擴充功能。
%load_ext sparksql_magic選用:設定預設設定。
%config SparkSql.limit=20執行 SQL 查詢。
%%sparksql SELECT * FROM your_table
如要使用進階選項,請在 %%sparksql 指令中加入旗標。舉例來說,如要快取結果並建立檢視區塊,請執行下列指令:
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
可用的選項如下:
--cache或-c:將 DataFrame 儲存在快取中。--eager或-e:使用預先載入的快取。--view VIEW或-v VIEW:建立臨時檢視畫面。--limit N或-l N:覆寫預設的資料列顯示限制。variable_name:將結果儲存在變數中。