本页介绍了从 Pub/Sub 读取数据并写入 BigQuery 的 Dataflow 流式作业的性能特征。它提供了两种类型的流式传输流水线的基准测试结果:
仅限映射(按消息转换):执行按消息转换的流水线,不跟踪状态或对流中的元素进行分组。例如 ETL、字段验证和架构映射。
窗口化聚合 (
GroupByKey):执行有状态操作并根据键和时间窗口对数据进行分组的流水线。例如,统计事件数、计算总和以及收集用户会话的记录。
大多数流式数据集成工作负载都属于这两类。如果您的流水线遵循类似的模式,则可以使用这些基准来评估您的 Dataflow 作业与性能良好的参考配置之间的差异。
测试方法
基准测试使用了以下资源:
预先配置的 Pub/Sub 主题,具有稳定的输入负载。 消息是使用流式数据生成器模板生成的。
- 消息速率:大约每秒 100 万条消息
- 输入负载:1 GiB/s
- 消息格式:具有固定架构的随机生成的 JSON 文本
- 消息大小:每条消息大约 1 KiB
标准 BigQuery 表。
基于 Pub/Sub to BigQuery 模板的 Dataflow 流处理流水线。 这些流水线会执行最低限度的必需解析和架构映射。未使用任何自定义用户定义的函数 (UDF)。
在横向伸缩稳定且流水线达到稳定状态后,允许流水线运行大约一天,然后收集并分析结果。
Dataflow 流水线
我们测试了两种流水线变体:
仅限映射的流水线。此流水线可对 JSON 消息执行简单的映射和转换。在此测试中,Pub/Sub to BigQuery 模板未经修改即被使用。
窗口化聚合流水线。此流水线按固定大小窗口中的特定键对消息进行分组,并将汇总的记录写入 BigQuery。在此测试中,我们使用了基于 Pub/Sub to BigQuery 模板的自定义 Apache Beam 流水线。
聚合逻辑:对于每个固定的不重叠的 1 分钟窗口,系统会收集具有相同键的消息,并将其作为单个聚合记录写入 BigQuery。这种类型的聚合通常用于日志处理,以将相关事件(例如用户活动)合并到单个记录中,以便进行下游分析。
键并行度:基准测试使用了 1,000,000 个均匀分布的键。
语义:流水线已使用“正好一次”模式进行测试。聚合需要“正好一次”语义才能确保正确性,并防止在组和窗口内重复计数。
作业配置
下表显示了 Dataflow 作业的配置方式。
| 设置 | 仅限映射,正好一次 | 仅映射,至少一次 | 窗口化聚合,正好一次 |
|---|---|---|---|
| 工作器机器类型 | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| 工作器机器 vCPU 数量 | 2 | 2 | 2 |
| 工作器机器 RAM | 7.5 GiB | 7.5 GiB | 7.5 GiB |
| 工作器机器 Persistent Disk | 标准永久性磁盘 (HDD),30 GB | 标准永久性磁盘 (HDD),30 GB | 标准永久性磁盘 (HDD),30 GB |
| 初始工作器 | 70 | 30 | 180 |
| 工作器数量上限 | 100 | 100 | 250 |
| Streaming Engine | 是 | 是 | 是 |
| 横向自动扩缩 | 是 | 是 | 是 |
| 结算模式 | 基于资源的结算方式 | 基于资源的结算方式 | 基于资源的结算方式 |
| Storage Write API 是否已启用? | 是 | 是 | 是 |
| Storage Write API 流 | 200 | 不适用 | 500 |
| Storage Write API 触发频率 | 5 秒 | 不适用 | 5 秒 |
建议为流处理流水线使用 BigQuery Storage Write API。将 Storage Write API 与“正好一次”模式搭配使用时,您可以调整以下设置:
写入流的数量。为确保写入阶段具有足够的键并行性,请将 Storage Write API 流的数量设置为大于工作器 CPU 数量的值,同时保持合理的 BigQuery 写入流吞吐量。
触发频次。一位数的秒值适合高吞吐量流水线。
如需了解详情,请参阅从 Dataflow 写入 BigQuery。
基准结果
本部分介绍了基准测试的结果。
吞吐量和资源用量
下表显示了流水线吞吐量和资源使用情况的测试结果。
| 结果 | 仅限映射,正好一次 | 仅映射,至少一次 | 窗口化聚合,正好一次 |
|---|---|---|---|
| 每个工作器的输入吞吐量 | 平均值:17 MBps,n=3 | 平均值:21 MBps,n=3 | 平均值:6 MBps,n=3 |
| 所有工作器的平均 CPU 利用率 | 平均值:65%,n=3 | 平均值:69%,n=3 | 平均值:80%,n=3 |
| 工作器节点数量 | 平均值:57,n=3 | 平均值:48,n=3 | 平均值:169,n=3 |
| 每小时 Streaming Engine 计算单元数 | 平均值:125,n=3 | 平均值:46,n=3 | 平均值:354,n=3 |
自动扩缩算法可能会影响目标 CPU 利用率水平。如需实现更高或更低的目标 CPU 利用率,您可以设置自动扩缩范围或工作器利用率提示。较高的利用率目标值可以降低费用,但也会导致尾部延迟时间变长,尤其是在负载变化的情况下。
对于窗口聚合流水线,聚合类型、窗口大小和键并行度会对资源使用情况产生很大影响。
延迟时间
下表显示了流水线延迟时间的基准测试结果。
| 总阶段端到端延迟时间 | 仅映射一次 | 仅映射,至少一次 | 窗口化聚合,正好一次 |
|---|---|---|---|
| P50 | 平均值:800 毫秒,n=3 | 平均值:160 毫秒,n=3 | 平均值:3,400 毫秒,n=3 |
| P95 | 平均值:2,000 毫秒,n=3 | 平均值:250 毫秒,n=3 | 平均值:13,000 毫秒,n=3 |
| P99 | 平均值:2,800 毫秒,n=3 | 平均值:410 毫秒,n=3 | 平均值:25,000 毫秒,n=3 |
该测试在三次长时间运行的测试执行中,测量了每个阶段的端到端延迟时间(即 job/streaming_engine/stage_end_to_end_latencies 指标)。此指标用于衡量 Streaming Engine 在每个流水线阶段花费的时间。它涵盖了流水线的所有内部步骤,例如:
- 对消息进行混排和排队以供处理
- 实际处理时间;例如,将消息转换为行对象
- 写入持久状态,以及排队等待写入持久状态所花费的时间
另一种延迟时间指标是数据新鲜度。不过,数据新鲜度会受到多种因素的影响,例如用户定义的窗口和来源中的上游延迟。系统延迟时间可为流水线在负载下的内部处理效率和健康状况提供更客观的基准。
每次运行的数据测量时间约为一天,并舍弃了初始启动阶段的数据,以反映稳定、稳态的性能。结果显示了两个会增加额外延迟时间的因素:
“正好一次”模式。为了实现“正好一次”语义,去重需要确定性混排和持久状态查找。“至少一次”模式会绕过这些步骤,因此速度明显更快。
窗口化聚合。消息必须在窗口关闭之前完全随机化、缓冲并写入到持久状态,从而增加端到端延迟时间。
此处显示的基准值仅供参考。延迟对流水线复杂程度非常敏感。自定义 UDF、其他转换和复杂的窗口化逻辑都会增加延迟时间。与状态繁重的操作(例如将元素收集到列表中)相比,简单的、高度缩减的聚合(例如求和和计数)往往会带来更低的延迟时间。
估算费用
您可以使用 Google Cloud Platform 价格计算器,按以下步骤估算您自己的类似流水线采用基于资源的结算时的基准费用:
- 打开价格计算器。
- 点击添加到估算。
- 选择 Dataflow。
- 在服务类型字段中,选择“Dataflow Classic”。
- 选择高级设置以显示完整的一组选项。
- 选择作业的运行位置。
- 对于作业类型,请选择“流式处理”。
- 选择启用 Streaming Engine。
- 输入作业运行小时数、工作器节点、工作器机器和 Persistent Disk 存储空间的相关信息。
- 输入 Streaming Engine 计算单元的估计数量。
资源使用量和费用大致随输入吞吐量线性扩展,不过对于只有少量工作器的小型作业,总费用主要由固定费用构成。首先,您可以根据基准测试结果推断工作器节点的数量和资源消耗量。
例如,假设您以“正好一次”模式运行仅包含 Map 的流水线,输入数据速率为 100 MiB/s。根据 1 GiB/s 流水线的基准测试结果,您可以按如下方式估算资源要求:
- 缩放比例:(100 MiB/s) / (1 GiB/s) = 0.1
- 预计工作器节点数:57 个工作器 × 0.1 = 5.7 个工作器
- 每小时的 Streaming Engine 计算单元预计数:125 × 0.1 = 每小时 12.5 个单元
此值仅应作为初始估算值使用。实际吞吐量和费用可能会因多种因素而有很大差异,例如机器类型、消息大小分布、用户代码、聚合类型、键并行性和窗口大小。如需了解详情,请参阅 Dataflow 费用优化的最佳实践。
运行测试流水线
此部分显示了用于运行仅包含 Map 的流水线的 gcloud dataflow flex-template run 命令。
“正好一次”模式
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
“至少一次”模式
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
替换以下内容:
JOB_ID:Dataflow 作业 IDPROJECT_ID:项目 IDSUBSCRIPTION_NAME:Pub/Sub 订阅的名称DATASET:BigQuery 数据集的名称TABLE_NAME:BigQuery 表的名称
生成测试数据
如需生成测试数据,请使用以下命令运行流式数据生成器模板:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
替换以下内容:
JOB_ID:Dataflow 作业 IDPROJECT_ID:项目 IDTOPIC_NAME:Pub/Sub 主题的名称SCHEMA_LOCATION:Cloud Storage 中架构文件的路径
流式数据生成器模板使用 JSON 数据生成器文件来定义消息架构。基准测试使用的消息架构类似于以下内容:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
后续步骤
- 使用 Dataflow 作业监控界面
- Dataflow 费用优化的最佳实践
- 排查流处理作业缓慢或卡住的问题
- 从 Pub/Sub 读取数据到 Dataflow
- 从 Dataflow 写入 BigQuery