并行处理

流水线可在机器集群中执行。它们通过拆分需要完成的工作,然后在集群中分布的多个执行程序上并行运行工作,从而实现高吞吐量。一般来说,拆分(也称为分区)的数量越大,流水线的运行速度就越快。流水线中的并行级别由流水线中的来源和重排阶段决定。

来源

在每次流水线运行开始时,流水线中的每个来源都会计算需要读取哪些数据,以及如何将这些数据划分为分块。例如,假设有一个基本流水线,该流水线从 Cloud Storage 读取数据,执行一些 Wrangler 转换,然后写回 Cloud Storage。

基本流水线,显示了 Cloud Storage 源、Wrangler 转换和 Cloud Storage 接收器

流水线启动后,Cloud Storage 来源会检查输入文件,并根据文件大小将其拆分为拆分。例如,一个 1 GB 的文件可以拆分为 100 个拆分,每个拆分的大小为 10 MB。每个执行程序都会读取该拆分的数据,运行 Wrangler 转换,然后将输出写入 part 文件。

将 Cloud Storage 中的分区数据并行转换为 Wrangler 转换,再转换为部分文件

如果流水线运行缓慢,首先要检查的是来源是否创建了足够的拆分,以便充分利用并行处理。 例如,某些类型的压缩会使纯文本文件无法拆分。如果您读取的是经过 gzip 压缩的文件,您可能会注意到,流水线的运行速度比读取未压缩的文件或使用 BZIP(可拆分)压缩的文件慢得多。同样,如果您使用的是数据库来源,并且已将其配置为仅使用一个拆分,则其运行速度比配置为使用更多拆分时慢得多。

重排

某些类型的插件会导致数据在集群中重排。当一个执行程序处理的记录需要发送给另一个执行程序以执行计算时,就会发生这种情况。重排是开销很大的操作,因为它们涉及大量 I/O。导致数据重排的插件都会显示在流水线工作室的分析 部分中。这些插件包括 Group By、Deduplicate、Distinct 和 Joiner 等。例如,假设在上述示例中,向流水线添加了 Group By 阶段。

还假设读取的数据表示在杂货店进行的购买。每条记录都包含一个 item 字段和一个 num_purchased 字段。在 Group By 阶段,我们将流水线配置为按 item 字段对记录进行分组,并计算 num_purchased 字段的总和。

当流水线运行时,输入文件会按前面所述的方式拆分。之后,每条记录都会在集群中重排,以便具有相同项的每条记录都属于同一执行程序。

如上例所示,苹果购买记录最初分布在多个执行程序中。为了执行聚合,所有这些记录都需要跨集群发送到同一执行程序。

大多数需要重排的插件都允许您指定在重排数据时要使用的分区数。这会控制用于处理重排数据的执行程序数量。

在上述示例中,如果分区数设置为 2,则每个执行程序都会计算两个项(而不是一个项)的聚合。

请注意,您可以在该阶段之后降低流水线的并行处理能力。例如,请看流水线的逻辑视图:

如果来源将数据划分为 500 个分区,但 Group By 使用 200 个分区进行重排,则 Group By 之后的并行处理能力上限将从 500 降至 200。您将只有 200 个不同的部分文件写入 Cloud Storage,而不是 500 个。

选择分区

如果分区数过低,您将无法充分利用集群的容量来并行处理尽可能多的工作。将分区数设置得过高会增加不必要的开销。一般来说,使用过多的分区比使用过少的分区要好。如果流水线需要几分钟才能运行完毕,并且您尝试缩短几分钟的运行时间,则需要担心额外的开销。如果流水线需要数小时才能运行完毕,则通常无需担心开销。

一种有用的但过于简单的方法是,将要使用的分区数设置为 max(cluster CPUs, input records / 500,000)。换句话说,取输入记录数并除以 500,000。如果该数字大于集群 CPU 的数量,则将其用作分区数。 否则,请使用集群 CPU 的数量。例如,如果您的集群有 100 个 CPU,并且重排阶段预计有 1 亿条输入记录,请使用 200 个分区。

更完整的答案是,当每个分区的中间重排数据可以完全容纳在执行程序的内存中,而无需溢出到磁盘时,重排的性能最佳。Spark 会为保存重排数据预留不到 30% 的执行程序内存。确切的数字是(总内存 - 300 MB)* 30%。如果我们假设每个执行程序都设置为使用 2 GB 内存,则表示每个分区应保存不超过(2 GB - 300 MB)* 30% = 大约 500 MB 的记录。如果我们假设每条记录压缩到 1 KB 大小,则表示(500 MB / 分区)/(1 KB/记录)= 每个分区 500,000 条记录。如果您的执行程序使用的内存更多,或者您的记录更小,您可以相应地调整此数字。

数据倾斜

请注意,在上述示例中,各种商品的购买量是均匀分布的。也就是说,苹果、香蕉、胡萝卜和鸡蛋的购买量均为 3。在均匀分布的键上进行重排是性能最高的重排类型,但许多数据集不具备此属性。继续上述示例中的杂货店购买,您会发现鸡蛋的购买量远高于结婚贺卡的购买量。当少数重排键比其他键常见得多时,您处理的是倾斜数据。倾斜数据的性能可能比非倾斜数据差得多,因为少量执行程序执行的工作量不成比例地大。这会导致一小部分分区比所有其他分区大得多。

在此示例中,鸡蛋的购买量是贺卡购买量的五倍,这意味着鸡蛋汇总的计算时间大约是贺卡汇总的五倍。当处理 10 条记录(而不是 2 条)时,这并不重要,但当处理 50 亿条记录(而不是 10 亿条)时,这会产生很大的影响。当您有数据倾斜时,重排中使用的分区数对流水线性能的影响不大。

您可以通过检查输出记录随时间变化的图表来识别数据倾斜。 如果阶段在流水线运行开始时以非常高的速度输出记录,然后突然减速,这可能意味着您有倾斜数据。

您还可以通过检查集群内存用量随时间变化的图表来识别数据倾斜。如果您的集群在一段时间内处于满负荷状态,但突然在一段时间内内存用量较低,这也是您处理数据倾斜的迹象。

当执行联接时,倾斜数据对性能的影响最为显著。有一些技巧可用于提高倾斜联接的性能。如需了解详情,请参阅 并行处理JOIN运算

执行的自适应调优

如需自适应调优执行,请指定要使用的分区范围,而不是确切的分区号。启用自适应执行后,系统会忽略确切的分区号(即使在流水线配置中设置了该分区号)。

如果您使用的是临时托管式 Apache Spark 集群,Cloud Data Fusion 会自动设置正确的配置,但对于静态托管式 Apache Spark 或 Hadoop 集群,可以设置以下两个配置参数:

  • spark.default.parallelism:将其设置为集群中可用的 vCore 总数。这可确保您的集群不会欠载,并定义分区数的下限。
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum:将其设置为集群中可用 vCore 数量的 32 倍。这定义了分区数的上限。
  • Spark.sql.adaptive.enabled:如需启用优化,请将此值设置为 true。托管式 Apache Spark 会自动设置此值,但如果您使用的是通用 Hadoop 集群,则必须确保已启用此值。

可以在特定 流水线的 引擎配置中或静态托管式 Apache Spark 集群的 集群属性中设置这些参数。

后续步骤