Dataproc Spark Connect 客户端是 Apache Spark Connect 客户端的封装容器。它让应用可以使用 Spark Connect 协议与远程 Serverless for Apache Spark 会话通信。 本文档介绍了如何安装、配置和使用该客户端。
准备工作
确保您拥有包含管理 互动会话和会话模板所需的权限的 Identity and Access Management 角色。
如果您在 Google CloudGoogle Cloud 之外运行客户端,请提供 身份验证凭据。 将
GOOGLE_APPLICATION_CREDENTIALS环境变量 设置为服务账号密钥文件的路径。
安装或卸载客户端
您可以使用 pip 安装或卸载 dataproc-spark-connect 软件包。
安装
如需安装最新版本的客户端,请运行以下命令:
pip install -U dataproc-spark-connect
卸载
如需卸载客户端,请运行以下命令:
pip uninstall dataproc-spark-connect
配置客户端
为会话指定项目和区域。您可以使用环境变量或代码中的构建器 API 设置这些值。
环境变量
设置 GOOGLE_CLOUD_PROJECT 和 GOOGLE_CLOUD_REGION 环境变量。
构建器 API
使用 .projectId() 和 .location() 方法。
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
启动 Spark 会话
如需启动 Spark 会话,请将所需的导入添加到 PySpark 应用
或笔记本,然后调用 DataprocSparkSession.builder.getOrCreate() API。
导入
DataprocSparkSession类。调用
getOrCreate()方法以启动会话。from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
配置 Spark 属性
如需配置 Spark 属性,请将一个或多个 .config() 方法链接到
构建器。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
使用高级配置
如需进行高级配置,请使用 Session 类自定义设置,例如子网或运行时版本。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
重复使用已命名的会话
借助已命名的会话,您可以在多个笔记本之间共享单个 Spark 会话 同时避免重复的会话启动时间延迟。
在第一个笔记本中,使用自定义 ID 创建会话。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()在另一个笔记本中,通过指定相同的会话 ID 重复使用该会话。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
会话 ID 的长度必须介于 4 到 63 个字符之间,以小写字母开头,并且
只能包含小写字母、数字和连字符。ID 不能以
连字符结尾。ID 处于 TERMINATED 状态的会话无法
重复使用。
使用 Spark SQL magic 命令
该软件包支持
sparksql-magic 库,以便在 Jupyter 笔记本中执行
Spark SQL 查询。Magic 命令是一项可选功能。
安装所需的依赖项。
pip install IPython sparksql-magic加载 magic 扩展程序。
%load_ext sparksql_magic可选:配置默认设置。
%config SparkSql.limit=20执行 SQL 查询。
%%sparksql SELECT * FROM your_table
如需使用高级选项,请将标志添加到 %%sparksql 命令。例如,如需
缓存结果并创建视图,请运行以下命令:
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
提供的选项如下:
--cache或-c:缓存 DataFrame。--eager或-e:使用抢先式加载进行缓存。--view VIEW或-v VIEW:创建临时视图。--limit N或-l N:替换默认的行显示限制。variable_name:将结果存储在变量中。