使用 Lightning Engine 加速 Google Cloud Serverless for Apache Spark

本文档介绍了如何启用 Lightning Engine 来加速 Serverless for Apache Spark 批量工作负载和交互式会话。

概览

Lightning Engine 是一款高性能查询加速器,由多层优化引擎提供支持,该引擎可执行常规优化技术(例如查询和执行优化),以及文件系统层和数据访问连接器中的精选优化。

如下图所示,Lightning Engine 可在类似 TPC-H 的工作负载(10 TB 数据集大小)下提升 Spark 查询执行性能。

如需了解详情,请参阅全新推出 Lightning Engine,新一代 Apache Spark 性能

Lightning Engine 可用性

  • Lightning Engine 可与受支持的 Serverless for Apache Spark 运行时搭配使用(目前为正式版运行时 1.22.22.3;不适用于 Spark 运行时 3.0)。
  • Lightning Engine 仅适用于 Serverless for Apache Spark 高级价格层级
    • 批处理工作负载:对于高级层级的批处理工作负载,系统会自动启用 Lightning Engine。您无需采取任何行动。
    • 交互式会话:默认情况下,Lightning Engine 不会针对交互式会话启用。如需启用该功能,请参阅启用 Lightning 引擎
    • 会话模板:默认情况下,Lightning Engine 不会针对会话模板启用。如需启用该功能,请参阅启用 Lightning 引擎

启用 Lightning Engine

以下部分介绍了如何在 Serverless for Apache Spark 批处理工作负载、会话模板和交互式会话中启用 Lightning 引擎。

批处理工作负载

在批处理工作负载上启用 Lightning Engine

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 在批量工作负载上启用 Lightning Engine。

控制台

使用 Google Cloud 控制台在批量工作负载上启用 Lightning Engine。

  1. 在 Google Cloud 控制台中:

    1. 前往 Dataproc 批次
    2. 点击创建,打开创建批处理页面。
  2. 选择并填写以下字段:

    • 容器
    • 层级配置

      • 选择 Premium。这会自动启用并选中“启用 LIGHTNING ENGINE 以提升 Spark 性能”。

      选择高级层级后,驱动程序计算层级执行器计算层级会设置为 Premium。对于使用 3.0 之前版本的运行时的批次,这些自动设置的高级层级计算设置无法替换。

      您可以将驱动程序磁盘层级执行器磁盘层级配置为 Premium,也可以将其保留为默认的 Standard 层级值。如果您选择高级磁盘层级,则必须选择磁盘大小。如需了解详情,请参阅资源分配属性

    • 属性:可选:如果您想选择 Native Query Execution 运行时,请输入以下 Key(属性名称)和 Value 对:

      spark.dataproc.lightningEngine.runtime 本国的/原生的/土著

  3. 填写、选择或确认其他批量工作负载设置。请参阅提交 Spark 批处理工作负载

  4. 点击提交以运行 Spark 批处理工作负载。

gcloud

设置以下 gcloud CLI gcloud dataproc batches submit spark 命令标志,以在批处理工作负载上启用 Lightning Engine。

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=dataproc.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注意:

  • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
  • REGION:用于运行工作负载的可用 Compute Engine 区域
  • --properties=dataproc.tier=premium。 设置高级层级会自动为批处理工作负载设置以下属性:

    • spark.dataproc.engine=lightningEngine 为批处理工作负载选择 Lightning Engine。
    • spark.dataproc.driver.compute.tierspark.dataproc.executor.compute.tier 设置为 premium(请参阅资源分配属性)。对于使用 3.0 之前版本的运行时的批处理,这些自动设置的高级层级计算设置无法替换。
  • 其他属性

    • 原生查询引擎spark.dataproc.lightningEngine.runtime=native 如果您想选择原生查询执行运行时,请添加此属性。

    • 磁盘层级和大小:默认情况下,驱动程序和执行程序磁盘大小设置为 standard 层级和大小。您可以添加属性来选择 premium 磁盘层级和大小(以 375 GiB 为倍数)。
      如需了解详情,请参阅资源分配属性

  • OTHER_FLAGS_AS_NEEDED:请参阅提交 Spark 批处理工作负载

API

如需在批量工作负载上启用 Lightning Engine,请将“dataproc.tier”:“premium”添加到 RuntimeConfig.properties,作为 batches.create 请求的一部分。设置高级层级会自动为批处理工作负载设置以下属性:

  • spark.dataproc.engine=lightningEngine 为批处理工作负载选择 Lightning Engine。
  • spark.dataproc.driver.compute.tierspark.dataproc.executor.compute.tier 设置为 premium(请参阅资源分配属性)。对于使用 3.0 之前版本的运行时进行的批处理,这些自动设置的高级层级计算设置无法替换。

其他 RuntimeConfig.properties

  • 原生查询引擎spark.dataproc.lightningEngine.runtime:native。 如果您想选择原生查询执行运行时,请添加此属性。

  • 磁盘层级和大小:默认情况下,驱动程序和执行程序磁盘大小设置为 standard 层级和大小。您可以添加属性来选择 premium 层级和大小(以 375 GiB 为倍数)。
    如需了解详情,请参阅资源分配属性

如需设置其他批量工作负载 API 字段,请参阅提交 Spark 批量工作负载

会话模板

在会议模板上启用 Lightning Engine

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 在 Jupyter 或 Spark Connect 会话的会话模板上启用 Lightning Engine。

控制台

使用 Google Cloud 控制台在批量工作负载上启用 Lightning Engine。

  1. 在 Google Cloud 控制台中:

    1. 前往“Dataproc 会话模板”
    2. 点击创建,打开创建会话模板页面。
  2. 选择并填写以下字段:

    • 会话模板信息
      • 选择“启用 Lightning Engine 以提升 Spark 性能”。
    • 执行配置
    • 属性: 输入以下 Key(属性名称)和 Value 对,以选择 Premium 级:

      dataproc.tier 高级
      spark.dataproc.engine lightningEngine

      可选:输入以下 Key(属性名称)和 Value 对,以选择 Native Query Execution 运行时:

      spark.dataproc.lightningEngine.runtime native

  3. 填写、选择或确认其他会话模板设置。请参阅创建会话模板

  4. 点击提交以创建会话模板。

gcloud

您无法使用 gcloud CLI 直接创建 Serverless for Apache Spark 会话模板。您可以改为使用 gcloud beta dataproc session-templates import 命令导入现有会话模板,修改导入的模板以启用 Lightning Engine 和(可选)原生查询运行时,然后使用 gcloud beta dataproc session-templates export 命令导出修改后的模板。

API

如需在会话模板上启用 Lightning Engine,请将“dataproc.tier”:“premium”和“spark.dataproc.engine”:“lightningEngine”添加到 RuntimeConfig.properties,作为 sessionTemplates.create 请求的一部分。

其他 RuntimeConfig.properties

  • 原生查询引擎spark.dataproc.lightningEngine.runtime:native:将此属性添加到 RuntimeConfig.properties 以选择原生查询执行运行时。

如需设置其他会话模板 API 字段,请参阅创建会话模板

交互式会话

在交互式会话中启用 Lightning Engine

您可以使用 Google Cloud CLI 或 Dataproc API 在 Serverless for Apache Spark 交互式会话中启用 Lightning Engine。 您还可以在 BigQuery Studio 笔记本的交互式会话中启用 Lightning Engine。

gcloud

设置以下 gcloud CLI gcloud beta dataproc sessions create spark 命令标志,以在交互式会话中启用 Lightning Engine。

gcloud beta dataproc sessions create spark \
    --project=PROJECT_ID \
    --location=REGION \
    --properties=dataproc.tier=premium,spark.dataproc.engine=lightningEngine \
    OTHER_FLAGS_AS_NEEDED

注意:

  • PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
  • REGION:用于运行工作负载的可用 Compute Engine 区域
  • --properties=dataproc.tier=premium,spark.dataproc.engine=lightningEngine。 这些属性可在会话中启用 Lightning Engine。

  • 其他房源:

    • 原生查询引擎spark.dataproc.lightningEngine.runtime=native: 添加此属性可选择原生查询执行运行时。
  • OTHER_FLAGS_AS_NEEDED:请参阅创建交互式会话

API

如需在会话中启用 Lightning Engine,请将“dataproc.tier”:“premium”和“spark.dataproc.engine”:“lightningEngine”添加到 RuntimeConfig.properties,作为 sessions.create 请求的一部分。

其他 RuntimeConfig.properties

* 原生查询引擎spark.dataproc.lightningEngine.runtime:native: 如果您想选择原生查询执行运行时,请将此属性添加到 RuntimeConfig.properties

如需设置其他会话模板 API 字段,请参阅创建交互式会话

BigQuery 笔记本

您可以在 BigQuery Studio PySpark 笔记本中创建会话时启用 Lightning Engine。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()

# Enable Lightning Engine.
session.runtime_config.properties["dataproc.tier"] = "premium"
session.runtime_config.properties["spark.dataproc.engine"] = "lightningEngine"

# Enable THE Native Query Execution runtime.
session.runtime_config.properties["spark.dataproc.lightningEngine.runtime"] = "native"

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate())

# Add Spark application code here:

验证 Lightning Engine 设置

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 来验证批处理工作负载、会话模板或交互式会话中的 Lightning Engine 设置。

批处理工作负载

  • 如需验证批处理是否已设置为 premium,以及引擎是否已设置为 Lightning Engine,请执行以下操作:

    • Google Cloud 控制台:在批次页面上,查看批次的层级引擎列。您还可以点击批次 ID,在批次详情页面上查看这些设置。
    • gcloud CLI:运行 gcloud dataproc batches describe 命令。
    • API:发出 batches.get 请求。

会话模板

  • 如需验证会话模板的 engine 是否已设置为 Lightning Engine,请执行以下操作:

    • Google Cloud 控制台:在会话模板页面上,查看模板的引擎列。您还可以点击会话模板名称,在会话模板详情页面上查看此设置。
    • gcloud CLI:运行 gcloud beta dataproc session-templates describe 命令。
    • API:发出 sessionTemplates.get 请求。

交互式会话

  • 对于交互式会话,引擎设置为 Lightning Engine

    • Google Cloud 控制台:在交互式会话页面上,查看模板的引擎列。您还可以点击交互式会话 ID,在会话模板详情页面上查看此设置。
    • gcloud CLI:运行 gcloud beta dataproc sessions describe 命令。
    • API:发出 sessions.get 请求。

原生查询执行

原生查询执行 (NQE) 是一项可选的 Lightning Engine 功能,它通过基于 Apache GlutenVelox 的原生实现来提升性能,该实现专为 Google 硬件而设计。

原生查询执行运行时包含统一的内存管理,可在堆外内存和堆内内存之间动态切换,而无需更改现有的 Spark 配置。NQE 扩展了对运算符、函数和 Spark 数据类型的支持,并具备智能功能,可自动识别使用原生引擎实现最佳下推操作的机会。

识别原生查询执行工作负载

在以下场景中使用原生查询执行:

  • 从 Parquet 和 ORC 文件读取数据的 Spark Dataframe API、Spark Dataset API 和 Spark SQL 查询。输出文件格式不会影响原生查询执行性能。

  • 原生查询执行资格认证工具推荐的工作负载。

不建议对输入数据具有以下数据类型的工作负载使用原生查询执行:

  • 字节:ORC 和 Parquet
  • 时间戳:ORC
  • 结构体、数组、映射:Parquet

原生查询执行限制

在以下场景中启用原生查询执行可能会导致异常、Spark 不兼容或工作负载回退到默认的 Spark 引擎。

后备广告

在以下执行中执行原生查询可能会导致工作负载回退到 Spark 执行引擎,从而导致回归或失败。

  • ANSI:如果启用了 ANSI 模式,执行会回退到 Spark。

  • 区分大小写模式:原生查询执行仅支持 Spark 默认的不区分大小写模式。如果启用了区分大小写模式,可能会出现不正确的结果。

  • 分区表扫描:仅当路径包含分区信息时,原生查询执行才支持分区表扫描,否则工作负载会回退到 Spark 执行引擎。

不兼容的行为

在以下情况下使用原生查询执行时,可能会导致不兼容的行为或错误的结果:

  • JSON 函数:原生查询执行支持用英文双引号而非英文单引号括起来的字符串。使用单引号时出现错误结果。在包含 get_json_object 函数的路径中使用“*”会返回 NULL

  • Parquet 读取配置

    • 即使将 spark.files.ignoreCorruptFiles 设置为 true,原生查询执行也会将其视为设置为默认值 false
    • 原生查询执行会忽略 spark.sql.parquet.datetimeRebaseModeInRead,并仅返回 Parquet 文件内容。不考虑旧版混合(儒略格里高利)日历与 Proleptic 格里高利日历之间的差异。Spark 结果可能会有所不同。
  • NaN:不支持。例如,在数值比较中使用 NaN 时,可能会出现意外结果。

  • Spark 列式读取:由于 Spark 列式向量与原生查询执行不兼容,可能会发生严重错误。

  • 溢出:当 shuffle 分区设置为较大数量时,溢出到磁盘功能可能会触发 OutOfMemoryException。如果发生这种情况,减少分区数量可以消除此异常。