使用無伺服器 Spark Connect 用戶端

Dataproc Spark Connect 用戶端是 Apache Spark Connect 用戶端的包裝函式。應用程式可透過 Spark Connect 協定,與遠端 Serverless for Apache Spark 工作階段通訊。本文說明如何安裝、設定及使用用戶端。

事前準備

  1. 確認您具備Identity and Access Management 角色,其中包含管理互動式工作階段和工作階段範本所需的權限。

  2. 如果您在 Google Cloud以外的位置執行用戶端,請提供驗證憑證。將 GOOGLE_APPLICATION_CREDENTIALS 環境變數設為服務帳戶金鑰檔案的路徑。

安裝或解除安裝用戶端

您可以使用 pip 安裝或解除安裝 dataproc-spark-connect 套件。

安裝

如要安裝最新版用戶端,請執行下列指令:

pip install -U dataproc-spark-connect

解除安裝

如要解除安裝用戶端,請執行下列指令:

pip uninstall dataproc-spark-connect

設定用戶端

為工作階段指定專案和區域。您可以使用環境變數設定這些值,也可以在程式碼中使用建構工具 API。

環境變數

設定 GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION 環境變數。

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

請使用 .projectId().location() 方法。

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

啟動 Spark 工作階段

如要啟動 Spark 工作階段,請將必要的匯入項目新增至 PySpark 應用程式或筆記本,然後呼叫 DataprocSparkSession.builder.getOrCreate() API。

  1. 匯入 DataprocSparkSession 類別。

  2. 呼叫 getOrCreate() 方法即可開始工作階段。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

設定 Spark 屬性

如要設定 Spark 屬性,請將一或多個 .config() 方法鏈結至建構工具。

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

使用進階設定

如需進階設定,請使用 Session 類別自訂設定,例如子網路或執行階段版本。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

重複使用已命名的工作階段

具名工作階段可讓您在多個筆記本之間共用單一 Spark 工作階段,同時避免重複啟動工作階段而導致延遲。

  1. 在第一個筆記本中,使用自訂 ID 建立工作階段。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 在另一個筆記本中,指定相同的工作階段 ID,即可重複使用該工作階段。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

工作階段 ID 的長度必須介於 4 至 63 個字元之間,只能使用小寫英文字母、數字和連字號,開頭須為小寫英文字母。ID 結尾不得為連字號。ID 處於 TERMINATED 狀態的工作階段無法重複使用。

使用 Spark SQL 神奇指令

這個套件支援 sparksql-magic 程式庫,可在 Jupyter 筆記本中執行 Spark SQL 查詢。魔法指令是選用功能。

  1. 安裝必要的依附元件。

    pip install IPython sparksql-magic
    
  2. 載入 Magic 擴充功能。

    %load_ext sparksql_magic
    
  3. 選用:設定預設設定。

    %config SparkSql.limit=20
    
  4. 執行 SQL 查詢。

    %%sparksql
    SELECT * FROM your_table
    

如要使用進階選項,請在 %%sparksql 指令中加入旗標。舉例來說,如要快取結果並建立檢視區塊,請執行下列指令:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

可用的選項如下:

  • --cache-c:將 DataFrame 儲存在快取中。
  • --eager-e:使用預先載入的快取。
  • --view VIEW-v VIEW:建立臨時檢視畫面。
  • --limit N-l N:覆寫預設的資料列顯示限制。
  • variable_name:將結果儲存在變數中。

後續步驟