Kafka から BigQuery へのパイプラインのパフォーマンス特性

このページでは、Apache Kafka から読み取って BigQuery に書き込む Dataflow ストリーミング ジョブのパフォーマンス特性について説明します。状態の追跡やストリーム間の要素のグループ化を行わずにメッセージごとの変換を行うマップのみのパイプラインのベンチマーク テスト結果が得られます。

ETL、フィールド検証、スキーマ マッピングなど、多くのデータ統合ワークロードはマップのみのカテゴリに分類されます。パイプラインがこのパターンに従っている場合は、これらのベンチマークを使用して、パフォーマンスの高いリファレンス構成に対して Dataflow ジョブを評価できます。

テスト方法

ベンチマークは次のリソースを使用して実施されました。

  • Managed Service for Apache Kafka クラスタ。メッセージは、ストリーミング データ ジェネレータ テンプレートを使用して生成されました。

    • メッセージ レート: 1 秒あたり約 1,000,000 件のメッセージ
    • 入力負荷: 1 GiB/秒
    • メッセージ形式: 固定スキーマのランダムに生成された JSON テキスト
    • メッセージ サイズ: メッセージ 1 件あたり約 1 KiB
    • Kafka パーティション: 1000
  • 標準の BigQuery テーブル。

  • Apache Kafka to BigQuery テンプレートを使用した Dataflow ストリーミング パイプライン。このパイプラインは、最小限必要な解析とスキーマ マッピングを実行します。カスタム ユーザー定義関数(UDF)は使用されていません。

水平スケーリングが安定し、パイプラインが定常状態に達した後、パイプラインは約 1 日間実行され、その後、結果が収集されて分析されました。

Dataflow パイプライン

このベンチマークでは、JSON メッセージの単純なマッピングと変換を行うマップ専用パイプラインを使用します。パイプラインは、1 回限りモード1 回以上モードの両方を使用してテストされました。1 回以上の処理では、スループットが向上します。ただし、重複レコードが許容される場合や、ダウンストリーム シンクで重複除去が処理される場合にのみ使用する必要があります。

ジョブ構成

次の表は、Dataflow ジョブの構成を示しています。

設定
ワーカーのマシンタイプ e2-standard-2
ワーカー マシンの vCPU 2
ワーカー マシンの RAM 8 GB
ワーカー マシンの Persistent Disk 標準 Persistent Disk(HDD)、30 GB
最大ワーカー数 120
Streaming Engine
水平自動スケーリング
課金モデル リソースベースの課金
Storage Write API が有効になっているか?
Storage Write API ストリーム 400
Storage Write API のトリガー頻度 5 秒
メッセージの形式 JSON
Kafka 認証モード

アプリケーションのデフォルト認証情報(ADC)。

詳細については、 Kafka ブローカーの認証タイプをご覧ください。

ストリーミング パイプラインには BigQuery Storage Write API をおすすめします。Storage Write API で 1 回限りモードを使用する場合は、次の設定を調整できます。

  • 書き込みストリームの数。書き込みステージで十分なキーの並列処理を確保するには、ストリームあたりのスループットの推奨事項に従いながら、Storage Write API ストリームの数をワーカー CPU の数より大きい値に設定します。

  • トリガーの頻度。1 桁の秒値は、高スループット パイプラインに適しています。

詳細については、Dataflow から BigQuery に書き込むをご覧ください。

Apache Kafka パーティションの数も考慮する必要があります。読み取りステージで十分なキーの並列処理を確保するには、パーティションの数がワーカー vCPU の合計数以上である必要があります。詳細については、Apache Kafka から Dataflow に読み込むをご覧ください。

ベンチマークの結果

このセクションでは、ベンチマーク テストの結果について説明します。

スループットとリソース使用量

次の表に、パイプラインのスループットとリソース使用率のテスト結果を示します。

結果 Exactly-once At-least-once
ワーカーあたりの入力スループット 平均: 15 MBps、n=3 平均: 18 MBps、n=3
すべてのワーカーの平均 CPU 使用率 平均: 70%、n=3 平均: 75%、n=3
ワーカーノードの数 平均: 63、n=3 平均: 53、n=3
1 時間あたりの Streaming Engine コンピューティング単位数 平均: 58、n=3 平均: 0、n=3

自動スケーリング アルゴリズムは、ターゲット CPU 使用率レベルに影響する可能性があります。目標 CPU 使用率を高くしたり低くしたりするには、自動スケーリング範囲またはワーカー使用率のヒントを設定します。使用率の目標値を高くすると、費用は削減できますが、特に負荷が変動する場合に、テールレイテンシが悪化する可能性があります。

レイテンシ

次の表に、入力ステージを除いた 1 回限りのモードのパイプライン レイテンシのベンチマーク結果を示します。

入力ステージを除くステージのエンドツーエンドの合計レイテンシ Exactly-once
P50 平均: 1,200 ミリ秒、n=3
P95 平均: 3,000 ミリ秒、n=3
P99 平均: 5,400 ミリ秒、n=3

テストでは、3 つの長時間実行テストでステージごとのエンドツーエンド レイテンシ(job/streaming_engine/stage_end_to_end_latencies 指標)を測定しました。この指標は、Streaming Engine が各パイプライン ステージで費やした時間を測定します。これには、次のようなパイプラインのすべての内部ステップが含まれます。

  • 処理用のメッセージのシャッフルとキューイング
  • 実際の処理時間(メッセージを行オブジェクトに変換するなど)
  • 永続状態の書き込みと、永続状態の書き込みをキューに登録するのに費やされた時間

指標の制限により、入力ステージのレイテンシは報告されません。そのため、合計には含まれません。

ここに示されているベンチマークはベースラインを表しています。レイテンシはパイプラインの複雑さに大きく影響されます。カスタム UDF、追加の変換、複雑なウィンドウ処理ロジックはすべてレイテンシを増加させる可能性があります。

費用を見積もる

Google Cloud 料金計算ツールを使用して、次のように、リソースベースの課金で同等の独自のパイプラインのベースライン費用を見積もることができます。

  1. 料金計算ツールを開きます。
  2. [Add To Estimate] をクリックします。
  3. [Dataflow] を選択します。
  4. [サービスタイプ] で [Dataflow Classic] を選択します。
  5. [詳細設定] を選択して、すべてのオプションを表示します。
  6. ジョブが実行されるロケーションを選択します。
  7. [ジョブタイプ] で [ストリーミング] を選択します。
  8. [Streaming Engine を有効にする] を選択します。
  9. ジョブの実行時間、ワーカーノード、ワーカーマシン、Persistent Disk ストレージの情報を入力します。
  10. Streaming Engine コンピューティング単位数の推定値を入力します。

リソース使用量とコストは、入力スループットにほぼ比例してスケーリングされます。ただし、ワーカー数が少ない小規模なジョブでは、総コストは固定費が大部分を占めます。出発点として、ベンチマーク結果からワーカーノードの数とリソース消費量を外挿できます。

たとえば、入力データレートが 100 MiB/秒の map-only パイプラインを exactly-once モードで実行するとします。1 GiB/秒のパイプラインのベンチマーク結果に基づいて、次のようにリソース要件を見積もることができます。

  • スケーリング係数: (100 MiB/秒) / (1 GiB/秒) = 0.1
  • ワーカーノードの予測数: 63 ワーカー × 0.1 = 6.3 ワーカー
  • 1 時間あたりの Streaming Engine コンピューティング単位数の予測値: 58 × 0.1 = 1 時間あたり 5.8 単位

この値は初期見積もりとしてのみ使用してください。実際のスループットと費用は、マシンタイプ、メッセージ サイズの分布、ユーザーコード、集計タイプ、キーの並列処理、ウィンドウ サイズなどの要因によって大きく異なる可能性があります。詳細については、Dataflow の費用の最適化のベスト プラクティスをご覧ください。

テスト パイプラインを実行する

このセクションでは、マップ専用パイプラインの実行に使用された gcloud dataflow flex-template run コマンドを示します。

1 回限りのモード

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

1 回以上モード

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

次のように置き換えます。

  • JOB_NAME: Dataflow ジョブ名
  • PROJECT_ID: プロジェクト ID
  • KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka クラスタのブートストラップ アドレス
  • KAFKA_TOPIC: Kafka トピックの名前
  • BQ_DATASET: BigQuery データセットの名前
  • BQ_TABLE_NAME: BigQuery テーブルの名前

テストデータを生成する

テストデータを生成するには、次のコマンドを使用して Streaming Data Generator テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

次のように置き換えます。

  • JOB_NAME: Dataflow ジョブ名
  • PROJECT_ID: プロジェクト ID
  • SCHEMA_LOCATION: Cloud Storage 内のスキーマ ファイルのパス
  • KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka クラスタのブートストラップ アドレス
  • KAFKA_TOPIC: Kafka トピックの名前

ストリーミング データ生成ツール テンプレートは、JSON データ生成ツール ファイルを使用してメッセージ スキーマを定義します。ベンチマーク テストでは、次のようなメッセージ スキーマが使用されました。

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

次のステップ