使用无服务器 Spark Connect 客户端

Dataproc Spark Connect 客户端是 Apache Spark Connect 客户端的封装容器。它允许应用使用 Spark Connect 协议与远程 Serverless for Apache Spark 会话进行通信。本文档介绍了如何安装、配置和使用客户端。

准备工作

  1. 确保您拥有包含管理互动会话和会话模板所需权限的 Identity and Access Management 角色

  2. 如果您在 Google Cloud之外运行客户端,请提供身份验证凭据。 将 GOOGLE_APPLICATION_CREDENTIALS 环境变量设置为服务账号密钥文件的路径。

安装或卸载客户端

您可以使用 pip 安装或卸载 dataproc-spark-connect 软件包。

安装

如需安装最新版本的客户端,请运行以下命令:

pip install -U dataproc-spark-connect

卸载

如需卸载客户端,请运行以下命令:

pip uninstall dataproc-spark-connect

配置客户端

为您的会话指定项目和区域。您可以使用环境变量或通过代码中的构建器 API 设置这些值。

环境变量

设置 GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION 环境变量。

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

使用 .projectId().location() 方法。

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

启动 Spark 会话

如需启动 Spark 会话,请将所需的导入项添加到 PySpark 应用或笔记本中,然后调用 DataprocSparkSession.builder.getOrCreate() API。

  1. 导入 DataprocSparkSession 类。

  2. 调用 getOrCreate() 方法以启动会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

配置 Spark 属性

如需配置 Spark 属性,请将一个或多个 .config() 方法链接到构建器。

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

使用高级配置

如需进行高级配置,请使用 Session 类自定义子网或运行时版本等设置。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

重复使用已命名的会话

借助命名会话,您可以在多个笔记本之间共享单个 Spark 会话,同时避免重复的会话启动时间延迟。

  1. 在第一个笔记本中,创建一个具有自定义 ID 的会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 在另一个笔记本中,通过指定相同的会话 ID 来重用该会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

会话 ID 的长度必须介于 4 到 63 个字符之间,以小写字母开头,并且只能包含小写字母、数字和连字符。ID 不得以连字符结尾。ID 处于 TERMINATED 状态的会话无法重复使用。

使用 Spark SQL magic 命令

该软件包支持 sparksql-magic,以便在 Jupyter 笔记本中执行 Spark SQL 查询。Magic 命令是一项可选功能。

  1. 安装所需的依赖项。

    pip install IPython sparksql-magic
    
  2. 加载魔法扩展程序。

    %load_ext sparksql_magic
    
  3. 可选:配置默认设置。

    %config SparkSql.limit=20
    
  4. 执行 SQL 查询。

    %%sparksql
    SELECT * FROM your_table
    

如需使用高级选项,请向 %%sparksql 命令添加标志。例如,如需缓存结果并创建视图,请运行以下命令:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

您可以使用以下选项:

  • --cache-c:缓存 DataFrame。
  • --eager-e:采用预先加载的缓存。
  • --view VIEW-v VIEW:创建临时视图。
  • --limit N-l N:替换默认的行显示限制。
  • variable_name:将结果存储在变量中。

后续步骤