为了提高数据流水线的性能,您可以将一些转换操作推送到 BigQuery,而不是 Apache Spark。 转换下推是指一项设置,可让 Cloud Data Fusion 数据流水线中的操作作为执行引擎推送到 BigQuery。因此,操作及其数据会转移到 BigQuery,并在其中执行操作。
转换推送可提高具有
多个复杂
JOIN操作
或其他受支持转换的流水线的性能。在 BigQuery 中执行某些转换可能比在 Spark 中执行更快。
不受支持的转换和所有预览转换都在 Spark 中执行。
支持的转换
Cloud Data Fusion 6.5.0 版及更高版本提供了转换下推功能,但以下某些转换仅在更高版本中受支持。
JOIN 操作
Cloud Data Fusion 6.5.0 版及更高版本中的
JOIN操作支持转换下推。支持基本(基于键)和高级
JOIN操作。联接必须正好有两个输入阶段,才能在 BigQuery 中执行。
配置为将一个或多个输入加载到内存中的联接将在 Spark 中执行,而不是在 BigQuery 中执行,但以下情况除外:
- 联接的任何输入已下推。
- 您已将联接配置为在 SQL 引擎中执行(请参阅 强制执行的阶段 选项)。
BigQuery 接收器
Cloud Data Fusion 6.7.0 版及更高版本中的 BigQuery 接收器支持转换下推。
当 BigQuery 接收器跟随在 BigQuery 中执行的阶段时,将记录写入 BigQuery 的操作将直接在 BigQuery 中执行。
如需使用此接收器提高性能,您需要:
- 服务帐号必须有权在 BigQuery 接收器使用的数据集中创建和更新表。
- 用于转换下推和 BigQuery 接收器的数据集必须存储在同一 位置。
- 操作必须是以下操作之一:
Insert(不支持Truncate Table选项)UpdateUpsert
GROUP BY 聚合
Cloud Data Fusion 6.7.0 版及更高版本中的 GROUP BY 聚合支持转换下推。
BigQuery 中的 GROUP BY 聚合可用于以下操作:
AvgCollect List(从输出数组中移除 null 值)Collect Set(从输出数组中移除 null 值)ConcatConcat DistinctCountCount DistinctCount NullsLogical AndLogical OrMaxMinStandard DeviationSumSum of SquaresCorrected Sum of SquaresVarianceShortest StringLongest String
在以下情况下,GROUP BY 聚合将在 BigQuery 中执行:
- 它跟随已下推的阶段。
- 您已将其配置为在 SQL 引擎中执行(请参阅 强制执行的阶段 选项)。
重复数据删除聚合
Cloud Data Fusion 6.7.0 版及更高版本中的去重汇总支持转换下推,适用于以下操作:
- 未指定过滤操作
ANY(所需字段的非 null 值)MIN(指定字段的最小值)MAX(指定字段的最大值)
不支持以下操作:
FIRSTLAST
在以下情况下,重复数据删除聚合将在 SQL 引擎中执行:
- 它跟随已下推的阶段。
- 您已将其配置为在 SQL 引擎中执行(请参阅 强制执行的阶段 选项)。
BigQuery 来源下推
Cloud Data Fusion 6.8.0 版及更高版本提供了 BigQuery 来源下推功能。
当 BigQuery 来源跟随与 BigQuery 下推兼容的阶段时,流水线可以在 BigQuery 中执行所有兼容的阶段。
Cloud Data Fusion 会复制在 BigQuery 中执行流水线所需的记录。
使用 BigQuery 来源下推时,系统会保留表分区和聚簇属性,以便您使用这些属性来优化进一步的操作,例如联接。
其他要求
如需使用 BigQuery 来源下推,必须满足以下要求:
为 BigQuery Transformation Pushdown 配置的服务帐号必须有权读取 BigQuery 来源的数据集中的表。
BigQuery 来源中使用的数据集和为转换推送配置的数据集必须存储在同一 位置。
窗口聚合
Cloud Data Fusion 6.9 版及更高版本中的窗口聚合支持转换下推。BigQuery 中的窗口聚合支持以下操作:
RankDense RankPercent RankN tileRow NumberMedianContinuous PercentileLeadLagFirstLastCumulative distributionAccumulate
在以下情况下,窗口聚合将在 BigQuery 中执行:
- 它跟随已下推的阶段。
- 您已将其配置为在 SQL 引擎中执行(请参阅 强制下推的阶段 选项)。
Wrangler 过滤条件推送
Cloud Data Fusion 6.9 版及更高版本提供了 Wrangler 过滤条件推送功能。
使用 Wrangler 插件时,您可以推送过滤条件(称为 Precondition 操作),以便在 BigQuery 中执行,而不是在 Spark 中执行。
过滤条件推送仅在 Preconditions的 SQL 模式下受支持,该模式也在 6.9 版中发布。在此模式下,该插件接受 ANSI 标准 SQL 中的前提条件表达式。
如果 SQL 模式用于前提条件,则 Wrangler 插件的指令 和用户定义的指令 将被停用,因为它们在 SQL 模式下不支持前提条件。
启用转换下推后,具有多个输入的 Wrangler 插件不支持前提条件的 SQL 模式。如果与多个输入一起使用,则此具有 SQL 过滤条件的 Wrangler 阶段将在 Spark 中执行。
在以下情况下,过滤条件将在 BigQuery 中执行:
- 它跟随已下推的阶段。
- 您已将其配置为在 SQL 引擎中执行(请参阅 强制下推的阶段 选项)。
指标
如需详细了解 Cloud Data Fusion 为 在 BigQuery 中执行的流水线部分提供的指标,请参阅 BigQuery 下推流水线指标。
何时使用转换下推
在 BigQuery 中执行转换涉及以下操作:
- 将记录写入流水线中受支持的阶段的 BigQuery。
- 在 BigQuery 中执行受支持的阶段。
- 在执行受支持的 转换后从 BigQuery 读取记录,除非它们后跟 BigQuery 接收器。
根据数据集的大小,可能会产生大量的网络开销,这可能会对启用转换下推的总体流水线执行时间产生负面影响。
由于网络开销,我们建议在以下情况下使用转换推送:
- 按顺序执行多个受支持的操作(阶段之间没有步骤)。
- 相对于 Spark,BigQuery 执行转换带来的性能提升超过了数据移入和可能移出 BigQuery 的延迟。
工作原理
运行使用转换推送的流水线时,Cloud Data Fusion 会在 BigQuery 中执行受支持的转换阶段。流水线中的所有其他阶段都在 Spark 中执行。
执行转换时:
Cloud Data Fusion 会将输入数据集加载到 BigQuery(通过将记录写入 Cloud Storage,然后执行 BigQuery 加载作业)。
然后,使用 SQL 语句将
JOIN操作和受支持的转换作为 BigQuery 作业执行。如果作业执行后需要进一步处理,可以将记录从 BigQuery 导出到 Spark。但是,如果启用了尝试直接复制到 BigQuery 接收器 选项,并且 BigQuery 接收器跟随在 BigQuery 中执行的阶段,则记录将直接写入目标 BigQuery 接收器表。
下图显示了转换下推如何在 BigQuery 中执行受支持的转换,而不是在 Spark 中执行。

最佳做法
调整集群和执行程序大小
如需优化流水线中的资源管理,请执行以下操作:
为工作负载使用正确数量的集群工作器(节点)。换句话说,通过充分利用实例的可用 CPU 和内存,充分利用预配的 Managed Service for Apache Spark 集群,同时受益于 BigQuery 执行大型作业的速度。
使用 自动扩缩集群来提高流水线的并行性。
在流水线执行期间从 BigQuery 推送或拉取记录的调整资源配置。
建议:尝试增加 执行程序资源的 CPU 核心数(不超过工作器节点使用的 CPU 核心数)。 执行程序会在数据进出 BigQuery 的序列化和反序列化步骤中优化 CPU 使用情况。如需了解详情, 请参阅 调整集群大小。
在 BigQuery 中执行转换的好处在于,您的流水线可以在较小的 Managed Service for Apache Spark 集群上运行。如果联接是流水线中资源最多的操作,则您可以尝试使用较小的集群大小,因为繁重的 JOIN 操作现在会在 BigQuery 中执行,从而降低总体计算费用。
使用 BigQuery Storage Read API 更快地检索数据
BigQuery 执行转换后,您的流水线可能需要在 Spark 中执行其他阶段。在 Cloud Data Fusion 6.7.0 版及更高版本中,转换下推支持 BigQuery Storage Read API,该 API 可缩短延迟时间,并加快将数据读取到 Spark 中的速度。它可以缩短流水线的总体执行时间。
该 API 会并行读取记录,因此我们建议您相应地调整执行程序大小。如果在 BigQuery 中执行资源密集型操作,请减少执行程序的内存分配,以 在流水线运行时提高并行性(请参阅 调整集群和执行程序大小)。
BigQuery Storage Read API 默认处于停用状态。您可以在安装了 Scala 2.12 的执行环境中启用它(包括 Managed Service for Apache Spark 2.0 和 Managed Service for Apache Spark 1.5)。
考虑数据集大小
考虑 JOIN 操作中数据集的大小。对于生成大量输出记录的 JOIN 操作(例如,类似于交叉 JOIN 操作的操作),生成的数据集大小可能大于输入数据集。此外,请考虑在整体流水线性能发生这些记录进行额外的 Spark 处理(例如转换或接收器)时,将这些记录拉取回 Spark 的开销。
缓解数据倾斜
对于严重倾斜的数据,JOIN 操作可能会导致
BigQuery 作业超出
资源利用率限制,
从而导致 JOIN 操作失败。为避免这种情况,请转到 Joiner 插件设置,并在倾斜的输入阶段 字段中标识倾斜的输入。这样,Cloud Data Fusion 就可以以减少 BigQuery 语句超出限制的风险的方式排列输入。

后续步骤
- 了解如何在 Cloud Data Fusion 中启用转换下推。