このページでは、Pub/Sub から読み取って BigQuery に書き込む Dataflow ストリーミング ジョブのパフォーマンス特性について説明します。次の 2 種類のストリーミング パイプラインのベンチマーク テスト結果が返されます。
マップのみ(メッセージごとの変換): ストリーム全体で状態を追跡したり、要素をグループ化したりせずに、メッセージごとの変換を実行するパイプライン。たとえば、ETL、フィールド検証、スキーマ マッピングなどがあります。
ウィンドウ化された集計(
GroupByKey): ステートフル オペレーションを実行し、キーと時間ウィンドウに基づいてデータをグループ化するパイプライン。たとえば、イベントのカウント、合計の計算、ユーザー セッションのレコードの収集などがあります。
ストリーミング データ統合のほとんどのワークロードは、この 2 つのカテゴリに分類されます。パイプラインが同様のパターンに従っている場合は、これらのベンチマークを使用して、パフォーマンスの高いリファレンス構成に対して Dataflow ジョブを評価できます。
テスト方法
ベンチマークは次のリソースを使用して実施されました。
入力負荷が一定の事前プロビジョニングされた Pub/Sub トピック。メッセージは、ストリーミング データ ジェネレータ テンプレートを使用して生成されました。
- メッセージ レート: 1 秒あたり約 1,000,000 件のメッセージ
- 入力負荷: 1 GiB/秒
- メッセージ形式: 固定スキーマのランダムに生成された JSON テキスト
- メッセージ サイズ: メッセージ 1 件あたり約 1 KiB
標準の BigQuery テーブル。
Pub/Sub to BigQuery テンプレートに基づく Dataflow ストリーミング パイプライン。これらのパイプラインは、最小限の解析とスキーマ マッピングを実行します。カスタム ユーザー定義関数(UDF)は使用されていません。
水平スケーリングが安定し、パイプラインが定常状態に達した後、パイプラインは約 1 日間実行され、その後、結果が収集されて分析されました。
Dataflow パイプライン
2 つのパイプライン バリアントがテストされました。
マップのみのパイプライン。このパイプラインは、JSON メッセージの単純なマッピングと変換を行います。このテストでは、Pub/Sub to BigQuery テンプレートを修正せずに使用しました。
- セマンティクス: パイプラインは、1 回限りモードと1 回以上モードの両方を使用してテストされました。1 回以上の処理では、スループットが向上します。ただし、重複レコードが許容される場合や、ダウンストリーム シンクが重複除去を処理する場合にのみ使用する必要があります。
ウィンドウ集計パイプライン。このパイプラインは、固定サイズのウィンドウで特定のキーごとにメッセージをグループ化し、集計されたレコードを BigQuery に書き込みます。このテストでは、Pub/Sub to BigQuery テンプレートに基づくカスタム Apache Beam パイプラインが使用されました。
集計ロジック: 固定された重複しない 1 分間のウィンドウごとに、同じキーを持つメッセージが収集され、単一の集計レコードとして BigQuery に書き込まれました。このタイプの集計は、ログ処理でよく使用されます。たとえば、ユーザーのアクティビティなどの関連するイベントを 1 つのレコードに結合して、ダウンストリーム分析に使用します。
キーの並列処理: ベンチマークでは、一様に分布した 1,000,000 個のキーを使用しました。
セマンティクス: パイプラインは exactly-once モードを使用してテストされました。集計では、正確性を確保し、グループとウィンドウ内での二重カウントを防ぐために、1 回限りのセマンティクスが必要です。
ジョブ構成
次の表は、Dataflow ジョブの構成を示しています。
| 設定 | マップのみ、正確に 1 回 | マッピングのみ、1 回以上 | ウィンドウ集計(1 回限り) |
|---|---|---|---|
| ワーカーのマシンタイプ | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| ワーカー マシンの vCPU | 2 | 2 | 2 |
| ワーカー マシンの RAM | 7.5 GiB | 7.5 GiB | 7.5 GiB |
| ワーカー マシンの Persistent Disk | 標準永続ディスク(HDD)、30 GB | 標準永続ディスク(HDD)、30 GB | 標準永続ディスク(HDD)、30 GB |
| 初期ワーカー | 70 | 30 | 180 |
| 最大ワーカー数 | 100 | 100 | 250 |
| Streaming Engine | はい | はい | はい |
| 水平自動スケーリング | はい | はい | はい |
| 課金モデル | リソースベースの課金 | リソースベースの課金 | リソースベースの課金 |
| Storage Write API が有効になっているか? | はい | はい | はい |
| Storage Write API ストリーム | 200 | 該当なし | 500 |
| Storage Write API のトリガー頻度 | 5 秒 | 該当なし | 5 秒 |
BigQuery Storage Write API は、ストリーミング パイプラインに推奨されます。Storage Write API で 1 回限りモードを使用する場合は、次の設定を調整できます。
書き込みストリームの数。書き込みステージで十分なキーの並列処理を確保するには、BigQuery の書き込みストリームのスループットを妥当なレベルに維持しながら、Storage Write API ストリームの数をワーカー CPU の数よりも大きい値に設定します。
トリガーの頻度。1 桁の秒値は、高スループット パイプラインに適しています。
詳細については、Dataflow から BigQuery に書き込むをご覧ください。
ベンチマークの結果
このセクションでは、ベンチマーク テストの結果について説明します。
スループットとリソース使用量
次の表に、パイプラインのスループットとリソース使用率のテスト結果を示します。
| 結果 | マップのみ、正確に 1 回 | マッピングのみ、1 回以上 | ウィンドウ集計(1 回限り) |
|---|---|---|---|
| ワーカーあたりの入力スループット | 平均: 17 MBps、n=3 | 平均: 21 MBps、n=3 | 平均: 6 Mbps、n=3 |
| すべてのワーカーの平均 CPU 使用率 | 平均: 65%、n=3 | 平均: 69%、n=3 | 平均: 80%、n=3 |
| ワーカーノードの数 | 平均: 57、n=3 | 平均: 48、n=3 | 平均: 169、n=3 |
| 1 時間あたりの Streaming Engine コンピューティング単位数 | 平均: 125、n=3 | 平均: 46、n=3 | 平均: 354、n=3 |
自動スケーリング アルゴリズムは、ターゲット CPU 使用率レベルに影響する可能性があります。目標 CPU 使用率を高くしたり低くしたりするには、自動スケーリング範囲またはワーカー使用率のヒントを設定します。使用率の目標値を高くすると、費用は削減できますが、特に負荷が変動する場合に、テール レイテンシが悪化する可能性があります。
ウィンドウ集約パイプラインの場合、集約のタイプ、ウィンドウ サイズ、キーの並列処理は、リソース使用量に大きな影響を与える可能性があります。
レイテンシ
次の表に、パイプライン レイテンシのベンチマーク結果を示します。
| ステージのエンドツーエンドの合計レイテンシ | マップのみ、正確に 1 回 | マッピングのみ、1 回以上 | ウィンドウ集計(1 回限り) |
|---|---|---|---|
| P50 | 平均: 800 ミリ秒、n=3 | 平均: 160 ミリ秒、n=3 | 平均: 3,400 ミリ秒、n=3 |
| P95 | 平均: 2,000 ミリ秒、n=3 | 平均: 250 ミリ秒、n=3 | 平均: 13,000 ミリ秒、n=3 |
| P99 | 平均: 2,800 ミリ秒、n=3 | 平均: 410 ミリ秒、n=3 | 平均: 25,000 ミリ秒、n=3 |
テストでは、3 つの長時間実行テストでステージごとのエンドツーエンド レイテンシ(job/streaming_engine/stage_end_to_end_latencies 指標)を測定しました。この指標は、Streaming Engine が各パイプライン ステージで費やした時間を測定します。これには、次のようなパイプラインのすべての内部ステップが含まれます。
- 処理するメッセージのシャッフルとキューイング
- 実際の処理時間(メッセージを行オブジェクトに変換するなど)
- 永続状態の書き込みと、永続状態の書き込みをキューに登録するのに費やされた時間
もう 1 つのレイテンシ指標は、データの更新頻度です。ただし、データの更新頻度は、ユーザー定義のウィンドウ処理やソースのアップストリームの遅延などの要因の影響を受けます。システム レイテンシは、負荷がかかった状態でのパイプラインの内部処理の効率と健全性に関するより客観的なベースラインを提供します。
データは 1 回の実行につき約 1 日にわたって測定され、安定した定常状態のパフォーマンスを反映するために、初期起動期間は破棄されました。結果には、レイテンシを追加する 2 つの要因が示されています。
1 回限りのモード。「正確に 1 回」のセマンティクスを実現するには、重複除去に決定論的シャッフルと永続状態のルックアップが必要です。少なくとも 1 回モードでは、これらの手順がバイパスされるため、パフォーマンスが大幅に向上します。
ウィンドウ集計。メッセージは、ウィンドウが閉じる前に完全にシャッフルされ、バッファリングされ、永続状態に書き込まれる必要があります。これにより、エンドツーエンドのレイテンシが増加します。
ここに示されているベンチマークはベースラインを表しています。レイテンシはパイプラインの複雑さに大きく影響されます。カスタム UDF、追加の変換、複雑なウィンドウ処理ロジックはすべて、レイテンシを増加させる可能性があります。合計やカウントなど、削減率の高い単純な集計は、要素をリストに収集するなど、状態を多く使用するオペレーションよりもレイテンシが低くなる傾向があります。
費用を見積もる
Google Cloud Platform の料金計算ツールを使用して、リソースベースの課金で同等のパイプラインのベースライン費用を見積もることができます。手順は次のとおりです。
- 料金計算ツールを開きます。
- [Add To Estimate] をクリックします。
- [Dataflow] を選択します。
- [サービスタイプ] で [Dataflow Classic] を選択します。
- [詳細設定] を選択して、すべてのオプションを表示します。
- ジョブが実行されるロケーションを選択します。
- [ジョブタイプ] で [ストリーミング] を選択します。
- [Streaming Engine を有効にする] を選択します。
- ジョブの実行時間、ワーカーノード、ワーカーマシン、Persistent Disk ストレージの情報を入力します。
- Streaming Engine コンピューティング単位数の推定値を入力します。
リソース使用量とコストは、入力スループットにほぼ比例して増加します。ただし、ワーカー数が少ない小規模なジョブでは、総コストは固定費が大部分を占めます。出発点として、ベンチマーク結果からワーカーノードの数とリソース消費量を外挿できます。
たとえば、入力データレートが 100 MiB/秒の map-only パイプラインを exactly-once モードで実行するとします。1 GiB/秒のパイプラインのベンチマーク結果に基づいて、次のようにリソース要件を見積もることができます。
- スケーリング係数: (100 MiB/秒) / (1 GiB/秒) = 0.1
- ワーカーノードの予測数: 57 ワーカー × 0.1 = 5.7 ワーカー
- 1 時間あたりの Streaming Engine コンピューティング単位数の予測値: 125 × 0.1 = 1 時間あたり 12.5 単位
この値は初期見積もりとしてのみ使用してください。実際のスループットと費用は、マシンタイプ、メッセージ サイズの分布、ユーザーコード、集計タイプ、キーの並列処理、ウィンドウ サイズなどの要因によって大きく異なる可能性があります。詳細については、Dataflow の費用の最適化のベスト プラクティスをご覧ください。
テスト パイプラインを実行する
このセクションでは、マップ専用パイプラインの実行に使用された gcloud dataflow flex-template run コマンドを示します。
1 回限りのモード
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
1 回以上モード
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
次のように置き換えます。
JOB_ID: Dataflow ジョブ IDPROJECT_ID: プロジェクト IDSUBSCRIPTION_NAME: Pub/Sub サブスクリプションの名前DATASET: BigQuery データセットの名前TABLE_NAME: BigQuery テーブルの名前
テストデータを生成する
テストデータを生成するには、次のコマンドを使用して Streaming Data Generator テンプレートを実行します。
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
次のように置き換えます。
JOB_ID: Dataflow ジョブ IDPROJECT_ID: プロジェクト IDTOPIC_NAME: Pub/Sub トピックの名前SCHEMA_LOCATION: Cloud Storage 内のスキーマ ファイルのパス
ストリーミング データ生成ツール テンプレートは、JSON データ生成ツール ファイルを使用してメッセージ スキーマを定義します。ベンチマーク テストでは、次のようなメッセージ スキーマが使用されました。
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
次のステップ
- Dataflow ジョブ モニタリング インターフェースを使用する
- Dataflow の費用の最適化のベスト プラクティス
- ストリーミング ジョブの処理速度が遅い場合や停止している場合のトラブルシューティング
- Pub/Sub から Dataflow に読み取る
- Dataflow から BigQuery に書き込む