このドキュメントでは、大規模なバッチ パイプラインでジョブの失敗の影響を最小限に抑える方法について説明します。大規模なワークロードの失敗は、障害からの復旧と修正に時間や費用が必要なため、特に影響が大きくなります。これらのパイプラインが失敗したときに最初から再試行すると、時間と費用の両方が多くかかります。
費用のかかるバッチ パイプラインの失敗を減らすには、このページのガイドラインに沿って対応してください。失敗した要素やパイプラインの障害を常に完全に回避できるわけではないため、ここで紹介する手法は、復元力の向上、障害コストの削減、そして障害発生時のデバッグと理解を容易にすることに重点を置いています。
一般的なパイプラインのベスト プラクティスについては、Dataflow パイプラインのベスト プラクティスをご覧ください。
大規模なジョブに対して小規模なテストを実施する
大規模なバッチジョブを実行する前に、データセットのサブセットで、1 つ以上の小規模なジョブを実行します。この手法は、費用の見積もりを提供するとともに、潜在的な障害点の特定にも役立ちます。
費用予測
テストを実行すると、ジョブの実行にかかる総費用の最小値を予測できます。通常、ジョブ費用の計算は cost of test
job*size(full dataset)/size(test dataset) です。パイプラインによっては、費用が超線形にスケールする場合もあれば、頻度は低いものの、線形以下になる場合もあります。それでも多くの場合、このステップによりジョブ費用の概算を把握できます。入力サイズを変更して、費用の増減をより正確に予測することも可能です。この情報を参考にして、既存のパイプラインを続行するか、費用削減のためにパイプラインを再設計するかを決定します。
障害点を見つける
テストを実行すると、バグ、潜在的な障害点、構成や効率に関する潜在的な問題が明らかになります。また、以下の指標など他のパイプラインの指標を確認することもできます。
- 使用可能なメモリのほぼすべてを使用しているパイプラインでは、負荷が高い場合やレコードが非常に大きい場合に、メモリ不足(OOM)例外が発生する可能性があります。これらの OOM エラーを回避するには、最後のジョブにより多くのメモリをプロビジョニングする必要があります。
- パイプラインのスループットが低下している場合は、パイプラインのログを調べて原因を特定します。停止した要素や、特にパフォーマンスが低いデータセットの一部が見つかることがあります。これらのデータポイントは個別に処理することも、要素の処理時にタイムアウトを適用することもできます。詳細については、このドキュメントの費用がかかるレコードをタイムアウトするをご覧ください。
- Dataflow のタスクで、パイプラインのパフォーマンスがローカルのパフォーマンスよりも大幅に低下する場合は、パイプライン ロジックを確認して原因を特定します。たとえば、Dataflow で 8 コアを使用したときと、ローカルで 1 コアを使用したときのスループットが同じである場合、ジョブがリソースの競合でボトルネックになっている可能性があります。パフォーマンスが想定よりも低い場合は、次のいずれかのオプションを検討してください。
- 別のマシン構成またはソフトウェア構成でさらにテストする。
- 複数のコアを使用してローカルで同時にテストする。
- コードを調べて、大規模なデプロイ時にボトルネックになる可能性のある部分を特定する。
パイプラインに Dataflow の推奨事項がある場合は、それらに基づいてパフォーマンスを改善します。
デッドレター キューを使用して予期しない不正なデータを処理する
多くの場合、パイプラインはほとんどの入力要素で成功しますが、入力のごく一部のサブセットでは失敗することもあります。小規模なテストでは、入力のサブセットのみをテストするため、この問題が検出されないことがあります。Dataflow では、デフォルトでこれらの失敗したタスクをバッチモードで 4 回、ストリーミング モードで無制限に再試行します。バッチモードでは、再試行の上限に達すると、ジョブ全体が失敗します。ストリーミング モードでは、無期限に停止する可能性があります。
多くのジョブでは、デッドレター キュー(未処理のメッセージ キュー)を使用して、これらの失敗した要素をパイプラインから除外し、残りのジョブを完了させることができます。デッドレター キューは、失敗したレコードを別の出力 PCollection に渡します。この出力は、メイン出力とは別に管理できます。この構成により、これらのレコードのポリシーを設計できます。たとえば、レコードを手動で Pub/Sub に書き込み、検査してクリーンアップしてから、レコードを再処理できます。
多くの Apache Beam 変換には、デッドレター キューのサポートが組み込まれています。Java では、ErrorHandler オブジェクトを使用してアクセスできます。Python では、with_exception_handling メソッドを使用してアクセスできます。一部の変換では、デッドレター キューをカスタムの方法で定義できます。変換のドキュメントで詳細をご確認ください。詳細については、エラー処理にデッドレター キューを使用するをご覧ください。
ジョブがデッドレター キューの条件を満たしているかどうかを確認するには、このドキュメントの制限事項セクションをご覧ください。
デッドレター キューの制限事項
次のシナリオでは、デッドレター キューが役に立たない場合があります。
- ワーカー全体または
DoFnのライフサイクルの失敗。ワーカーまたはバンドル全体の処理が失敗した場合、デッドレター キューは失敗を把握できません。たとえば、パイプラインでメモリ不足(OOM)例外が発生した場合、VM 上のアクティブなタスクはすべて失敗して再試行されますが、デッドレター キューには何も送信されません。 - 結合やその他の集計。すべての入力要素が存在し、結果の一部として処理される必要がある計算をパイプラインが実行する場合、このステップの前にデッドレター キューを使用する際は注意が必要です。デッドレター キューを使用すると、入力データの一部が結果から除外されます。デッドレター キューの追加により、正確性とフォールト トラレンスのトレードオフが発生する可能性があります。
- デッドレター キューパスでの障害。デッドレター キューシンクへの送信中に要素が失敗すると、パイプライン全体が失敗する可能性があります。この障害を回避するには、デッドレター キューのロジックを可能な限りシンプルにします。デッドレター キュー要素の書き込み前にメインの入力が確実に完了するように、待機ステップ(
wait classを参照)を追加できます。この構成は、パフォーマンスが低下し、パイプラインからのエラーシグナルが遅延する可能性があります。 - 部分的に変換された要素。デッドレター キューをパイプラインの途中に挿入すると、デッドレター キューは部分的に変換された要素を出力し、元の要素にアクセスできない可能性があります。そのため、要素をクリーンアップして、その要素に対してパイプラインを再実行することはできません。代わりに、デッドレター キューの出力を元の要素と関連付けるために別のロジックを適用する必要が生じる場合や、部分的に変換された要素を解釈して処理する必要が生じる場合があります。また、結果の不整合が生じる可能性もあります。たとえば、要素がパイプラインの 2 つのブランチに送信され、各ブランチが例外を引き起こす要素をデッドレター キューに送信する場合、単一の入力要素が片方のブランチ、もう片方のブランチ、または両方のブランチに送信される可能性と、いずれのブランチにも送信されない可能性があります。
費用がかかるレコードをタイムアウトする
パイプラインは、より費用のかかる要素、またはデッドロックなどの応答不能を引き起こす制限に達した要素の小さなサブセットを処理している間、応答を停止することがあります。この問題を軽減するために、一部の変換ではタイムアウトを設定し、問題が発生したユーザーコード DoFn 内のタイムアウトした要素を失敗させることができます。たとえば、Python の with_exception_handling メソッドを使用できます。デッドレター キューでタイムアウトを使用すると、パイプラインは正常な要素の処理を続行して進行状況を進め、費用のかかる要素は個別に再処理できます。この構成は、パフォーマンスが低下する可能性があります。
タイムアウトが必要になる可能性が高い DoFn オペレーションを特定するには、パイプライン全体を起動する前に小規模のテストを実行します。
垂直自動スケーリングを有効にする
ジョブに必要なメモリ量が不明な場合や、ジョブでメモリ不足が発生する可能性がある場合は、垂直自動スケーリングを有効にします。この機能は、パイプラインが大規模に実行される場合や、非常に大きな要素に遭遇した場合に OOM エラーを回避するのに役立ちます。
垂直自動スケーリングは、ジョブ費用を増加させる可能性があり、すべてのメモリ不足エラーを防止できるわけではありませんが、それでも過剰なメモリ消費の問題に対処する必要があります。垂直自動スケーリングには Dataflow Prime も必要です。Dataflow Prime には追加の制限と、異なる課金モデルがあります。
投機的実行を使用して遅延タスクを回避する
バッチ パイプラインでは、実行速度の遅いタスクや停止したタスクの影響を軽減する機能である投機的実行を有効にできます。これらの遅いタスクや停止したタスクは、遅延タスクとも呼ばれます。この機能は、時間がかかりすぎているタスクの冗長な実行(バックアップ実行)を開始します。最初に完了したタスクが使用され、他のタスクはキャンセルされます。これにより、パイプラインの全体的な完了時間を短縮できます。
投機的実行は、ワーカーマシンの速度低下や、非決定論的バグ、リソース スロットリング、接続性の問題などの一時的な問題によって遅延が発生している作業項目の代替実行パスを提供することで、パイプラインの完了を高速化できます。
制限事項と考慮事項
投機的実行を有効にする前に、次の点を考慮してください。
- ストリーミング パイプライン: ストリーミング パイプラインでは投機的実行はサポートされていません。
- 費用の変化の可能性: この機能の費用への影響は、遅延タスクやバックアップ タスクのプロビジョニングを予測することが難しいため、推定が困難です。たとえば、バックアップ ワークアイテムは追加のリソースを消費し、コストが増加する可能性がありますが、早期に完了することで、リソースの節約とコスト削減につながる可能性があります。いずれの場合も、全体的な影響は最小限にとどまる見込みです。
- 一貫して実行時間が長いワークアイテム: ホットキーなど、一貫して実行時間が長いワークアイテムの場合、遅延の原因となる根本的な問題が解決されないため、投機的実行はあまり役に立たない可能性があります。
バッチジョブの遅延タスクの詳細については、バッチジョブの遅延タスクのトラブルシューティングをご覧ください。
投機的実行を有効にする
投機的実行を有効にするには、map_task_backup_mode Dataflow サービス オプションを使用します。次の 2 つのモードがあります。
Java
--dataflowServiceOptions=map_task_backup_mode=ON--dataflowServiceOptions=map_task_backup_mode=CAUTIOUS
Python / Go
--dataflow_service_options=map_task_backup_mode=ON--dataflow_service_options=map_task_backup_mode=CAUTIOUS
ON モードでは、元のタスクの想定実行時間が新しいタスクの想定実行時間よりも約 20% 長い場合に、バックアップ タスクがスケジュールされます。
CAUTIOUS モードでは、元のタスクの想定実行時間が新しいタスクの想定実行時間よりも約 70% 長い場合、バックアップ タスクがスケジュールされます。
投機的実行が有効になっていることを確認するには、ログメッセージを確認します。バックアップ タスクが開始されたことを示すエントリを探します。これにより、投機的実行がトリガーされたことを確認できます。これらのログを表示するには、パイプラインの [ジョブログ] パネルに移動します([ジョブ] > [ジョブを選択] > [ログ] セクション > [ジョブログ])。ログメッセージは次のように表示されます。
Backup issued in step STEP_NAME. ADDITIONAL_INFORMATION.
障害が発生しやすいパイプラインの回避策
パイプラインの中には、特にエラーが発生しやすいものがあります。これらのエラーの原因に対処するのが最善ですが、障害の費用削減のために、次のオプションを検討してください。
中間結果を実体化する
パイプラインには、パイプラインの実行時間の大部分を占める、特に費用のかかる変換が 1 つ以上含まれている場合があります。この変換後のパイプラインの障害は、すでに完了した作業がすべて失われるため特に有害です。このシナリオを回避するには、費用のかかるステップで生成された中間 PCollections を Cloud Storage などのシンクに書き込むことを検討してください。この構成により、障害が発生した場合の費用を削減できます。この利点と、追加の書き込みを実行するコストを比較する必要があります。この実体化された結果は、次のいずれかの方法で使用できます。
- 元のパイプラインを、中間結果を書き込むパイプラインと中間結果を読み取るパイプラインの 2 つのパイプラインに分割します。
- パイプラインの障害が発生した場合にのみ、元のソースと実体化された中間コレクションの両方から結果を読み取り、フラット化します。
これらの実体化が次の処理の前に確実に書き込まれるようにするため、後続の処理ステップの前に待機ステップ(wait
class を参照)を追加します。