Dataproc Spark Connect 客户端是 Apache Spark Connect 客户端的封装容器。它允许应用使用 Spark Connect 协议与远程 Serverless for Apache Spark 会话进行通信。本文档介绍了如何安装、配置和使用客户端。
准备工作
如果您在 Google Cloud之外运行客户端,请提供身份验证凭据。 将
GOOGLE_APPLICATION_CREDENTIALS环境变量设置为服务账号密钥文件的路径。
安装或卸载客户端
您可以使用 pip 安装或卸载 dataproc-spark-connect 软件包。
安装
如需安装最新版本的客户端,请运行以下命令:
pip install -U dataproc-spark-connect
卸载
如需卸载客户端,请运行以下命令:
pip uninstall dataproc-spark-connect
配置客户端
为您的会话指定项目和区域。您可以使用环境变量或通过代码中的构建器 API 设置这些值。
环境变量
设置 GOOGLE_CLOUD_PROJECT 和 GOOGLE_CLOUD_REGION 环境变量。
Builder API
使用 .projectId() 和 .location() 方法。
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
启动 Spark 会话
如需启动 Spark 会话,请将所需的导入项添加到 PySpark 应用或笔记本中,然后调用 DataprocSparkSession.builder.getOrCreate() API。
导入
DataprocSparkSession类。调用
getOrCreate()方法以启动会话。from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
配置 Spark 属性
如需配置 Spark 属性,请将一个或多个 .config() 方法链接到构建器。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
使用高级配置
如需进行高级配置,请使用 Session 类自定义子网或运行时版本等设置。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
重复使用已命名的会话
借助命名会话,您可以在多个笔记本之间共享单个 Spark 会话,同时避免重复的会话启动时间延迟。
在第一个笔记本中,创建一个具有自定义 ID 的会话。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()在另一个笔记本中,通过指定相同的会话 ID 来重用该会话。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
会话 ID 的长度必须介于 4 到 63 个字符之间,以小写字母开头,并且只能包含小写字母、数字和连字符。ID 不得以连字符结尾。ID 处于 TERMINATED 状态的会话无法重复使用。
使用 Spark SQL magic 命令
该软件包支持 sparksql-magic 库,以便在 Jupyter 笔记本中执行 Spark SQL 查询。Magic 命令是一项可选功能。
安装所需的依赖项。
pip install IPython sparksql-magic加载魔法扩展程序。
%load_ext sparksql_magic可选:配置默认设置。
%config SparkSql.limit=20执行 SQL 查询。
%%sparksql SELECT * FROM your_table
如需使用高级选项,请向 %%sparksql 命令添加标志。例如,如需缓存结果并创建视图,请运行以下命令:
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
您可以使用以下选项:
--cache或-c:缓存 DataFrame。--eager或-e:采用预先加载的缓存。--view VIEW或-v VIEW:创建临时视图。--limit N或-l N:替换默认的行显示限制。variable_name:将结果存储在变量中。