Kafka から BigQuery へのパイプラインのパフォーマンス特性

このページでは、Apache Kafka から読み取り、BigQuery に書き込む Dataflow ストリーミング ジョブのパフォーマンス特性について説明します。ストリーム全体で状態を追跡したり、要素をグループ化したりせずに、メッセージごとの変換を行うマップのみのパイプラインのベンチマーク テストの結果を示します。

ETL、フィールド検証、スキーマ マッピングなど、多くのデータ統合ワークロードはマップのみのカテゴリに分類されます。パイプラインがこのパターンに従っている場合は、これらのベンチマークを使用して、パフォーマンスの高い参照構成に対して Dataflow ジョブを評価できます。

テスト方法

ベンチマークは次のリソースを使用して実施されました。

  • Managed Service for Apache Kafka クラスタ。 メッセージは、 ストリーミング データ ジェネレータ テンプレートを使用して生成されました。

    • メッセージ レート: 1 秒あたり約 1,000,000 メッセージ
    • 入力負荷: 1 GiB/秒
    • メッセージ形式: 固定スキーマのランダムに生成された JSON テキスト
    • メッセージ サイズ: 1 メッセージあたり約 1 KiB
    • Kafka パーティション: 1,000
  • 標準の standard BigQuery テーブル。

  • Apache Kafka to BigQuery テンプレートを使用した Dataflow ストリーミング パイプライン。 このパイプラインは、最小限の解析とスキーマ マッピングを行います。カスタム ユーザー定義関数(UDF)は使用されませんでした。

水平スケーリングが安定し、パイプラインが定常状態に達した後、パイプラインを約 1 日間実行し、その結果を収集して分析しました。

Dataflow パイプライン

このベンチマークでは、JSON メッセージの簡単なマッピングと変換を行うマップのみのパイプラインを使用します。パイプラインは、両方を使用してテストされました 1 回限りモード1 回以上モード。1 回以上の処理では、スループットが向上します。ただし、重複レコードが許容される場合、またはダウンストリーム シンクが重複除去を処理する場合にのみ使用してください。

ジョブ構成

次の表に、Dataflow ジョブの構成方法を示します。

設定
ワーカーのマシンタイプ e2-standard-2
ワーカー マシンの vCPU 2
ワーカー マシンの RAM 8 GB
ワーカー マシンの Persistent Disk 標準永続ディスク(HDD)、30 GB
最大ワーカー数 120
Streaming Engine はい
水平自動スケーリング はい
課金モデル リソースベースの課金
Storage Write API は有効ですか? はい
Storage Write API ストリーム 400
Storage Write API のトリガー頻度 5 秒
メッセージの形式 JSON
Kafka 認証モード

アプリケーションのデフォルト認証情報(ADC)。

詳細については、 Kafka ブローカーの認証タイプをご覧ください。

ストリーミング パイプラインには、BigQuery Storage Write API を使用することをおすすめします。Storage Write API で 1 回限りモードを使用する場合は、次の設定を調整できます。

詳細については、 Dataflow から BigQuery に書き込むをご覧ください。

Apache Kafka パーティションの数も考慮する必要があります。読み取りステージで十分なキー並列処理を確保するには、パーティションの数をワーカー vCPU の合計数以上にする必要があります。詳細については、 Apache Kafka から Dataflow に読み込むをご覧ください。

ベンチマークの結果

このセクションでは、ベンチマーク テストの結果について説明します。

スループットとリソース使用量

次の表に、パイプラインのスループットとリソース使用量のテスト結果を示します。

結果 Exactly-once At-least-once
ワーカーあたりの入力スループット 平均: 15 MBps、n=3 平均: 18 MBps、n=3
すべてのワーカーの平均 CPU 使用率 平均: 70%、n=3 平均: 75%、n=3
ワーカーノードの数 平均: 63、n=3 平均: 53、n=3
Streaming Engine コンピューティング単位数(1 時間あたり) 平均: 58、n=3 平均: 0、n=3

自動スケーリング アルゴリズムは、ターゲット CPU 使用率レベルに影響する可能性があります。ターゲット CPU 使用率を高くまたは低くするには、自動スケーリング範囲またはワーカー使用率のヒントを設定します。使用率のターゲットが高いほど、費用は削減されますが、特に負荷が変動する場合、テールレイテンシが悪化する可能性があります。

レイテンシ

次の表に、入力ステージを除いた、1 回限りモードのパイプライン レイテンシのベンチマーク結果を示します。

入力ステージを除いたステージのエンドツーエンドの合計レイテンシ Exactly-once
P50 平均: 1,200 ミリ秒、n=3
P95 平均: 3,000 ミリ秒、n=3
P99 平均: 5,400 ミリ秒、n=3

テストでは、3 回の長時間実行テストで、ステージごとのエンドツーエンド レイテンシ( job/streaming_engine/stage_end_to_end_latencies 指標)を測定しました。この指標は、Streaming Engine が各パイプライン ステージで費やす時間を測定します。これには、次のようなパイプラインの内部ステップがすべて含まれます。

  • 処理のためのメッセージのシャッフルとキューイング
  • 実際の処理時間(メッセージを Row オブジェクトに変換するなど)
  • 永続状態の書き込みと、永続状態の書き込みのキューイングに費やされた時間

指標の制限により、入力ステージのレイテンシは報告されません。 そのため、合計には含まれません。

ここに示されているベンチマークはベースラインを表しています。レイテンシはパイプラインの複雑さに大きく影響されます。カスタム UDF、追加の変換、複雑なウィンドウ処理ロジックはすべてレイテンシを増加させる可能性があります。

費用を見積もる

料金計算ツールを使用して、 リソースベースの課金で、同等のパイプラインのベースライン費用を見積もることができます。Google Cloud

  1. 料金計算ツールを開きます。 料金計算ツール
  2. [Add To Estimate] をクリックします。
  3. Dataflow を選択します。
  4. [サービスタイプ] で [Dataflow Classic] を選択します。
  5. [詳細設定] を選択して、オプションの完全なセットを表示します。
  6. ジョブを実行する場所を選択します。
  7. [サービスの種類] で [Streaming] を選択します。
  8. [Streaming Engine を有効にする] を選択します。
  9. ジョブの実行時間、ワーカーノード、ワーカーマシン、 Persistent Disk ストレージの情報を入力します。
  10. Streaming Engine コンピューティング単位数の推定数を入力します。

リソースの使用量と費用のスケールは、入力スループットにほぼ比例しますが、ワーカー数が少ない小規模なジョブの場合、総費用は固定費が大部分を占めます。まず、ベンチマークの結果からワーカーノードの数とリソース消費量を推定できます。

たとえば、入力データレートが 100 MiB/秒の 1 回限りモードでマップのみのパイプラインを実行するとします。1 GiB/秒のパイプラインのベンチマーク結果に基づいて、リソース要件を次のように見積もることができます。

  • スケーリング係数:(100 MiB/秒)/(1 GiB/秒)= 0.1
  • 推定ワーカーノード: 63 ワーカー × 0.1 = 6.3 ワーカー
  • 1 時間あたりの Streaming Engine コンピューティング単位数の推定数: 58 × 0.1 = 1 時間あたり 5.8 ユニット

この値は、初期見積もりとしてのみ使用してください。実際のスループットと費用は、マシンタイプ、メッセージ サイズの分布、ユーザーコード、集計タイプ、キー並列処理、ウィンドウ サイズなどの要因によって大きく異なる場合があります。詳細については、 Dataflow の費用の最適化のベスト プラクティスをご覧ください。

テスト パイプラインを実行する

このセクションでは、マップのみのパイプラインの実行に使用された gcloud dataflow flex-template run コマンドを示します。

1 回限りのモード

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

1 回以上モード

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

次のように置き換えます。

  • JOB_NAME: Dataflow ジョブ名
  • PROJECT_ID: プロジェクト ID
  • KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka クラスタのブートストラップ アドレス
  • KAFKA_TOPIC: Kafka トピックの名前
  • BQ_DATASET: BigQuery データセットの名前
  • BQ_TABLE_NAME: BigQuery テーブルの名前

テストデータを生成する

テストデータを生成するには、次のコマンドを使用して ストリーミング データ ジェネレータ テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

次のように置き換えます。

  • JOB_NAME: Dataflow ジョブ名
  • PROJECT_ID: プロジェクト ID
  • SCHEMA_LOCATION: Cloud Storage 内のスキーマ ファイルへのパス
  • KAFKA_BOOTSTRAP_ADDRESS: Apache Kafka クラスタのブートストラップ アドレス
  • KAFKA_TOPIC: Kafka トピックの名前

ストリーミング データ ジェネレータ テンプレートは、JSON データ ジェネレータ ファイルを使用してメッセージ スキーマを定義します。ベンチマーク テストでは、次のようなメッセージ スキーマを使用しました。

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

次のステップ