本页面介绍了从 Apache Kafka 读取数据并写入 BigQuery 的 Dataflow 流式作业的性能特征。它提供了仅限映射流水线的基准测试结果,这些流水线执行按消息转换,而不跟踪状态或对流中的元素进行分组。
许多数据集成工作负载(包括 ETL、字段验证和架构映射)都属于仅限映射的类别。如果您的流水线遵循这种模式,则可以使用这些基准来评估您的 Dataflow 作业与性能良好的参考配置之间的差异。
测试方法
基准测试使用了以下资源:
Managed Service for Apache Kafka 集群。 消息是使用流式数据生成器模板生成的。
- 消息速率:大约每秒 100 万条消息
- 输入负载:1 GiB/秒
- 消息格式:具有固定架构的随机生成的 JSON 文本
- 消息大小:每条消息大约 1 KiB
- Kafka 分区数:1000
标准 BigQuery 表。
使用 Apache Kafka to BigQuery 模板的 Dataflow 流处理流水线。此流水线会执行最低限度的必需解析和架构映射。未使用任何自定义用户定义的函数 (UDF)。
在横向扩缩稳定且流水线达到稳定状态后,允许流水线运行大约一天,然后收集并分析结果。
Dataflow 流水线
此基准测试使用仅限映射的流水线,可对 JSON 消息执行简单的映射和转换。该流水线已使用“正好一次”模式和“至少一次”模式进行测试。至少一次处理可提供更高的吞吐量。不过,只有在可以接受重复记录或下游接收器可以处理去重时,才应使用此模式。
作业配置
下表显示了 Dataflow 作业的配置方式。
| 设置 | 值 |
|---|---|
| 工作器机器类型 | e2-standard-2 |
| 工作器机器 vCPU 数量 | 2 |
| 工作器机器 RAM | 8 GB |
| 工作器机器永久性磁盘 | 标准永久性磁盘 (HDD),30 GB |
| 工作器数量上限 | 120 |
| Streaming Engine | 是 |
| 横向自动扩缩 | 是 |
| 结算模式 | 基于资源的结算 |
| Storage Write API 是否已启用? | 是 |
| Storage Write API 数据流 | 400 |
| Storage Write API 触发频率 | 5 秒 |
| 消息格式 | JSON |
| Kafka 身份验证模式 |
应用默认凭据 (ADC)。 如需了解详情,请参阅 Kafka 代理的身份验证类型。 |
建议为流处理流水线使用 BigQuery Storage Write API。将“正好一次”模式与 Storage Write API 搭配使用时,您可以调整以下设置:
写入数据流的数量。为确保写入阶段具有足够的键并行度,请将 Storage Write API 数据流的数量设置为大于工作器 CPU 数量的值,同时遵循每个数据流的吞吐量建议。
触发频率。一位数的秒值适合高吞吐量流水线。
如需了解详情,请参阅从 Dataflow 写入 BigQuery。
还应特别考虑 Apache Kafka 分区的数量。为确保读取阶段具有足够的键并行度,分区数应至少等于工作器 vCPU 的总数。如需了解详情,请参阅从 Apache Kafka 读取到 Dataflow。
基准结果
本部分介绍了基准测试的结果。
吞吐量和资源用量
下表显示了流水线吞吐量和资源用量的测试结果。
| 结果 | 正好一次 | 至少一次 |
|---|---|---|
| 每个工作器的输入吞吐量 | 平均值:15 MBps,n=3 | 平均值:18 MBps,n=3 |
| 所有工作器的平均 CPU 利用率 | 平均值:70%,n=3 | 平均值:75%,n=3 |
| 工作器节点数量 | 平均值:63,n=3 | 平均值:53,n=3 |
| 每小时 Streaming Engine 计算单元数 | 平均值:58,n=3 | 平均值:0,n=3 |
自动扩缩算法可能会影响目标 CPU 利用率水平。如需实现更高或更低的目标 CPU 利用率,您可以设置自动扩缩范围或工作器利用率提示。较高的利用率目标值可以降低费用,但也会导致尾部延迟时间变长,尤其是在负载变化的情况下。
延迟时间
下表显示了在“恰好一次”模式下(不包括输入阶段)流水线延迟的基准测试结果。
| 总阶段端到端延迟时间(不包括输入阶段) | 正好一次 |
|---|---|
| P50 | 平均值:1,200 毫秒,n=3 |
| P95 | 平均值:3,000 毫秒,n=3 |
| P99 | 平均值:5,400 毫秒,n=3 |
该测试在三次长时间运行的测试执行中,测量了每个阶段的端到端延迟时间(即 job/streaming_engine/stage_end_to_end_latencies 指标)。此指标用于衡量 Streaming Engine 在每个流水线阶段花费的时间。它涵盖了流水线的所有内部步骤,例如:
- 对消息进行 shuffle 操作和排队以供处理
- 实际处理时间;例如,将消息转换为行对象
- 写入持久状态,以及排队等待写入持久状态所花费的时间
由于该指标存在限制,因此不会报告输入阶段延迟时间。因此,它不会计入总数。
此处显示的基准代表了基线。延迟对流水线复杂性非常敏感。自定义 UDF、其他转换和复杂的窗口化逻辑都会增加延迟时间。
估算费用
您可以使用 Google Cloud Platform 价格计算器,估算您自己的、类似流水线的基于资源的结算的基准成本,如下所示:
- 打开价格计算器。
- 点击添加到估算。
- 选择 Dataflow。
- 对于服务类型,选择“Dataflow Classic”。
- 选择高级设置以显示完整选项集。
- 选择作业的运行位置。
- 对于作业类型,请选择“流式处理”。
- 选择启用 Streaming Engine。
- 输入作业运行小时数、工作器节点、工作器机器和永久性磁盘存储空间的相关信息。
- 输入 Streaming Engine 计算单元的估算数量。
资源用量和费用大致随输入吞吐量线性扩展,不过对于只有少量工作器的小型作业,总费用主要由固定费用构成。首先,您可以根据基准测试结果推断工作器节点的数量和资源消耗量。
例如,假设您以“正好一次”模式运行仅限映射的流水线,输入数据速率为 100 MiB/秒。根据 1 GiB/秒流水线的基准测试结果,您可以按如下方式估算资源要求:
- 缩放比例:(100 MiB/s) / (1 GiB/s) = 0.1
- 预计工作器节点数:63 个工作器 × 0.1 = 6.3 个工作器
- 每小时的 Streaming Engine 计算单元预计数:58 × 0.1 = 每小时 5.8 个单元
此值仅应作为初始估算值使用。实际吞吐量和费用可能会因多种因素而有很大差异,例如机器类型、消息大小分布、用户代码、聚合类型、键并行度和窗口大小。如需了解详情,请参阅 Dataflow 费用优化的最佳实践。
运行测试流水线
此部分显示了用于运行仅限映射的流水线的 gcloud dataflow flex-template run 命令。
“正好一次”模式
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
“至少一次”模式
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
替换以下内容:
JOB_NAME:Dataflow 作业名称PROJECT_ID:项目 IDKAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 集群的引导地址KAFKA_TOPIC:Kafka 主题的名称BQ_DATASET:BigQuery 数据集的名称BQ_TABLE_NAME:BigQuery 表的名称
生成测试数据
如需生成测试数据,请使用以下命令运行流式数据生成器模板:
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
替换以下内容:
JOB_NAME:Dataflow 作业名称PROJECT_ID:项目 IDSCHEMA_LOCATION:Cloud Storage 中架构文件的路径KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 集群的引导地址KAFKA_TOPIC:Kafka 主题的名称
流式数据生成器模板使用 JSON 数据生成器文件来定义消息架构。基准测试使用的消息架构类似于以下内容:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
后续步骤
- 使用 Dataflow 作业监控界面
- Dataflow 费用优化的最佳实践
- 排查流处理作业缓慢或卡住的问题
- 从 Apache Kafka 读取到 Dataflow
- 从 Dataflow 写入 BigQuery