使用无服务器 Spark Connect 客户端

Dataproc Spark Connect 客户端是 Apache Spark Connect 客户端的封装容器。它让应用可以使用 Spark Connect 协议与远程 Serverless for Apache Spark 会话通信。 本文档介绍了如何安装、配置和使用该客户端。

准备工作

  1. 确保您拥有包含管理 互动会话和会话模板所需的权限的 Identity and Access Management 角色

  2. 如果您在 Google CloudGoogle Cloud 之外运行客户端,请提供 身份验证凭据。 将 GOOGLE_APPLICATION_CREDENTIALS 环境变量 设置为服务账号密钥文件的路径。

安装或卸载客户端

您可以使用 pip 安装或卸载 dataproc-spark-connect 软件包。

安装

如需安装最新版本的客户端,请运行以下命令:

pip install -U dataproc-spark-connect

卸载

如需卸载客户端,请运行以下命令:

pip uninstall dataproc-spark-connect

配置客户端

为会话指定项目和区域。您可以使用环境变量或代码中的构建器 API 设置这些值。

环境变量

设置 GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION 环境变量。

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


构建器 API

使用 .projectId().location() 方法。

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

启动 Spark 会话

如需启动 Spark 会话,请将所需的导入添加到 PySpark 应用 或笔记本,然后调用 DataprocSparkSession.builder.getOrCreate() API。

  1. 导入 DataprocSparkSession 类。

  2. 调用 getOrCreate() 方法以启动会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

配置 Spark 属性

如需配置 Spark 属性,请将一个或多个 .config() 方法链接到 构建器。

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

使用高级配置

如需进行高级配置,请使用 Session 类自定义设置,例如子网或运行时版本。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

重复使用已命名的会话

借助已命名的会话,您可以在多个笔记本之间共享单个 Spark 会话 同时避免重复的会话启动时间延迟。

  1. 在第一个笔记本中,使用自定义 ID 创建会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 在另一个笔记本中,通过指定相同的会话 ID 重复使用该会话。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

会话 ID 的长度必须介于 4 到 63 个字符之间,以小写字母开头,并且 只能包含小写字母、数字和连字符。ID 不能以 连字符结尾。ID 处于 TERMINATED 状态的会话无法 重复使用。

使用 Spark SQL magic 命令

该软件包支持 sparksql-magic,以便在 Jupyter 笔记本中执行 Spark SQL 查询。Magic 命令是一项可选功能。

  1. 安装所需的依赖项。

    pip install IPython sparksql-magic
    
  2. 加载 magic 扩展程序。

    %load_ext sparksql_magic
    
  3. 可选:配置默认设置。

    %config SparkSql.limit=20
    
  4. 执行 SQL 查询。

    %%sparksql
    SELECT * FROM your_table
    

如需使用高级选项,请将标志添加到 %%sparksql 命令。例如,如需 缓存结果并创建视图,请运行以下命令:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

提供的选项如下:

  • --cache-c:缓存 DataFrame。
  • --eager-e:使用抢先式加载进行缓存。
  • --view VIEW-v VIEW:创建临时视图。
  • --limit N-l N:替换默认的行显示限制。
  • variable_name:将结果存储在变量中。

后续步骤