Kafka 到 BigQuery 流水线的性能特征

本页面介绍了从 Apache Kafka 读取数据并写入 BigQuery 的 Dataflow 流式作业的性能特征。它提供了仅限映射流水线的基准测试结果,这些流水线执行按消息转换,而不跟踪状态或对流中的元素进行分组。

许多数据集成工作负载(包括 ETL、字段验证和架构映射)都属于仅限映射的类别。如果您的流水线遵循这种模式,则可以使用这些基准来评估您的 Dataflow 作业与性能良好的参考配置之间的差异。

测试方法

基准测试使用了以下资源:

  • Managed Service for Apache Kafka 集群。 消息是使用流式数据生成器模板生成的。

    • 消息速率:大约每秒 100 万条消息
    • 输入负载:1 GiB/秒
    • 消息格式:具有固定架构的随机生成的 JSON 文本
    • 消息大小:每条消息大约 1 KiB
    • Kafka 分区数:1000
  • 标准 BigQuery 表。

  • 使用 Apache Kafka to BigQuery 模板的 Dataflow 流处理流水线。此流水线会执行最低限度的必需解析和架构映射。未使用任何自定义用户定义的函数 (UDF)。

在横向扩缩稳定且流水线达到稳定状态后,允许流水线运行大约一天,然后收集并分析结果。

Dataflow 流水线

此基准测试使用仅限映射的流水线,可对 JSON 消息执行简单的映射和转换。该流水线已使用“正好一次”模式“至少一次”模式进行测试。至少一次处理可提供更高的吞吐量。不过,只有在可以接受重复记录或下游接收器可以处理去重时,才应使用此模式。

作业配置

下表显示了 Dataflow 作业的配置方式。

设置
工作器机器类型 e2-standard-2
工作器机器 vCPU 数量 2
工作器机器 RAM 8 GB
工作器机器永久性磁盘 标准永久性磁盘 (HDD),30 GB
工作器数量上限 120
Streaming Engine
横向自动扩缩
结算模式 基于资源的结算
Storage Write API 是否已启用?
Storage Write API 数据流 400
Storage Write API 触发频率 5 秒
消息格式 JSON
Kafka 身份验证模式

应用默认凭据 (ADC)。

如需了解详情,请参阅 Kafka 代理的身份验证类型

建议为流处理流水线使用 BigQuery Storage Write API。将“正好一次”模式与 Storage Write API 搭配使用时,您可以调整以下设置:

  • 写入数据流的数量。为确保写入阶段具有足够的键并行度,请将 Storage Write API 数据流的数量设置为大于工作器 CPU 数量的值,同时遵循每个数据流的吞吐量建议

  • 触发频率。一位数的秒值适合高吞吐量流水线。

如需了解详情,请参阅从 Dataflow 写入 BigQuery

还应特别考虑 Apache Kafka 分区的数量。为确保读取阶段具有足够的键并行度,分区数应至少等于工作器 vCPU 的总数。如需了解详情,请参阅从 Apache Kafka 读取到 Dataflow

基准结果

本部分介绍了基准测试的结果。

吞吐量和资源用量

下表显示了流水线吞吐量和资源用量的测试结果。

结果 正好一次 至少一次
每个工作器的输入吞吐量 平均值:15 MBps,n=3 平均值:18 MBps,n=3
所有工作器的平均 CPU 利用率 平均值:70%,n=3 平均值:75%,n=3
工作器节点数量 平均值:63,n=3 平均值:53,n=3
每小时 Streaming Engine 计算单元数 平均值:58,n=3 平均值:0,n=3

自动扩缩算法可能会影响目标 CPU 利用率水平。如需实现更高或更低的目标 CPU 利用率,您可以设置自动扩缩范围工作器利用率提示。较高的利用率目标值可以降低费用,但也会导致尾部延迟时间变长,尤其是在负载变化的情况下。

延迟时间

下表显示了在“恰好一次”模式下(不包括输入阶段)流水线延迟的基准测试结果。

总阶段端到端延迟时间(不包括输入阶段) 正好一次
P50 平均值:1,200 毫秒,n=3
P95 平均值:3,000 毫秒,n=3
P99 平均值:5,400 毫秒,n=3

该测试在三次长时间运行的测试执行中,测量了每个阶段的端到端延迟时间(即 job/streaming_engine/stage_end_to_end_latencies 指标)。此指标用于衡量 Streaming Engine 在每个流水线阶段花费的时间。它涵盖了流水线的所有内部步骤,例如:

  • 对消息进行 shuffle 操作和排队以供处理
  • 实际处理时间;例如,将消息转换为行对象
  • 写入持久状态,以及排队等待写入持久状态所花费的时间

由于该指标存在限制,因此不会报告输入阶段延迟时间。因此,它不会计入总数。

此处显示的基准代表了基线。延迟对流水线复杂性非常敏感。自定义 UDF、其他转换和复杂的窗口化逻辑都会增加延迟时间。

估算费用

您可以使用 Google Cloud Platform 价格计算器,估算您自己的、类似流水线的基于资源的结算的基准成本,如下所示:

  1. 打开价格计算器
  2. 点击添加到估算
  3. 选择 Dataflow。
  4. 对于服务类型,选择“Dataflow Classic”。
  5. 选择高级设置以显示完整选项集。
  6. 选择作业的运行位置。
  7. 对于作业类型,请选择“流式处理”。
  8. 选择启用 Streaming Engine
  9. 输入作业运行小时数、工作器节点、工作器机器和永久性磁盘存储空间的相关信息。
  10. 输入 Streaming Engine 计算单元的估算数量。

资源用量和费用大致随输入吞吐量线性扩展,不过对于只有少量工作器的小型作业,总费用主要由固定费用构成。首先,您可以根据基准测试结果推断工作器节点的数量和资源消耗量。

例如,假设您以“正好一次”模式运行仅限映射的流水线,输入数据速率为 100 MiB/秒。根据 1 GiB/秒流水线的基准测试结果,您可以按如下方式估算资源要求:

  • 缩放比例:(100 MiB/s) / (1 GiB/s) = 0.1
  • 预计工作器节点数:63 个工作器 × 0.1 = 6.3 个工作器
  • 每小时的 Streaming Engine 计算单元预计数:58 × 0.1 = 每小时 5.8 个单元

此值仅应作为初始估算值使用。实际吞吐量和费用可能会因多种因素而有很大差异,例如机器类型、消息大小分布、用户代码、聚合类型、键并行度和窗口大小。如需了解详情,请参阅 Dataflow 费用优化的最佳实践

运行测试流水线

此部分显示了用于运行仅限映射的流水线的 gcloud dataflow flex-template run 命令。

“正好一次”模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

“至少一次”模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

替换以下内容:

  • JOB_NAME:Dataflow 作业名称
  • PROJECT_ID:项目 ID
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 集群的引导地址
  • KAFKA_TOPIC:Kafka 主题的名称
  • BQ_DATASET:BigQuery 数据集的名称
  • BQ_TABLE_NAME:BigQuery 表的名称

生成测试数据

如需生成测试数据,请使用以下命令运行流式数据生成器模板

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

替换以下内容:

  • JOB_NAME:Dataflow 作业名称
  • PROJECT_ID:项目 ID
  • SCHEMA_LOCATION:Cloud Storage 中架构文件的路径
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 集群的引导地址
  • KAFKA_TOPIC:Kafka 主题的名称

流式数据生成器模板使用 JSON 数据生成器文件来定义消息架构。基准测试使用的消息架构类似于以下内容:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

后续步骤