RunInference 変換のベスト プラクティス

ML 推論に Dataflow を使用する場合は、RunInference 変換を使用することをおすすめします。この変換を使用すると、次のようなメリットがあります。

  • ローカル推論の実行時に Dataflow ワーカー用に最適化されたインテリジェントなモデルメモリ管理。
  • パイプラインの特性とユーザー定義の制約を使用してパフォーマンスを最適化する動的バッチ処理。
  • ML 対応の Dataflow バックエンド機能。スループットとレイテンシを改善できます。
  • リモート推論の割り当てに達した場合のインテリジェントなバックオフと自動スケーリングのメカニズム。
  • プロダクション レディな指標と運用機能。

RunInference を使用する際には、次の点に注意してください。

メモリ管理

中規模または大規模の ML モデルを読み込むと、マシンのメモリが不足する可能性があります。Dataflow には、ML モデルの読み込み時にメモリ不足(OOM)エラーを回避するためのツールが用意されています。次の表を使用して、シナリオに適したアプローチを決定してください。

シナリオ 解決策
モデルはメモリに収まる程度の小さなサイズである。 追加の構成なしで RunInference 変換を使用します。RunInference 変換は、スレッド間でモデルを共有します。マシンの CPU コアごとに 1 つのモデルを配置できる場合は、パイプラインでデフォルト構成を使用できます。
トレーニング方法が異なる複数のモデルが同じタスクを実行している。 モデルごとのキーを使用します。詳細については、トレーニングが異なる複数のモデルで ML 推論を実行するをご覧ください。
1 つのモデルがメモリに読み込まれ、すべてのプロセスがこのモデルを共有する。

large_model パラメータを使用します。詳細については、トレーニングが異なる複数のモデルで ML 推論を実行するをご覧ください。

カスタム モデルハンドラを構築する場合は、large_model パラメータを使用する代わりに、share_model_across_processes パラメータをオーバーライドします。

マシンに読み込むモデルの正確な数を構成する必要がある。

読み込むモデルの数を正確に制御するには、model_copies パラメータを使用します。

カスタム モデルハンドラを構築する場合は、model_copies パラメータをオーバーライドします。

Dataflow のメモリ管理の詳細については、Dataflow のメモリ不足エラーのトラブルシューティングをご覧ください。

バッチ処理

Beam でバッチ処理を行う方法は多数ありますが、推論を実行する場合は、RunInference 変換でバッチ処理を行うことをおすすめします。特定のバッチサイズでモデルのパフォーマンスが最適になる場合は、RunInference のターゲット バッチサイズ パラメータを制約することを検討してください。ほとんどのモデルハンドラは、最大バッチサイズと最小バッチサイズをパラメータとして公開します。たとえば、HuggingFace パイプラインに渡されるバッチサイズを制御するには、次のモデルハンドラを定義します。

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

RunInference 変換は、常に最大バッチサイズを尊重します。最小バッチサイズは目標ですが、すべてのケースで尊重されるとは限りません。たとえば、次のセクションのバンドルベースのバッチ処理をご覧ください。

バンドルベースのバッチ処理

Dataflow は、バンドルで変換にデータを渡します。これらのバンドルのサイズは、Dataflow で定義されたヒューリスティックによって異なります。通常、バッチ パイプラインのバンドルは非常に大規模(O(100s) 要素)ですが、ストリーミング パイプラインのバンドルは非常に小規模になることがあります(サイズ 1 を含む)。

デフォルトでは、RunInference は各バンドルからバッチを生成し、バンドル間でバッチ処理を行いません。つまり、最小バッチサイズが 8 で、バンドルに 3 つの要素しか残っていない場合、RunInference はバッチサイズ 3 を使用します。ほとんどのモデルハンドラは、この動作をオーバーライドできる max_batch_duration_secs パラメータを公開しています。max_batch_duration_secs が設定されている場合、RunInference はバンドル間でバッチ処理されます。変換で単一のバンドルでは目標バッチサイズを達成できない場合、バッチを生成する前に最大 max_batch_duration_secs 待機します。たとえば、HuggingFace パイプラインを使用するときにクロスバンドル バッチ処理を有効にするには、次のモデルハンドラを定義します。

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

この機能は、パイプラインでバッチサイズが非常に小規模である場合に役立ちます。そうでない場合は、バンドル間でバッチ処理を行う同期コストには通常、使用する価値がありません。費用のかかるシャッフルが発生する可能性があるためです。

障害対応

エラー処理は、あらゆる本番環境パイプラインの重要な部分です。バンドル内のいずれかの要素についてエラーが発生した場合、Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。追加のエラー処理を適用しない場合、バッチモードで実行すると、Dataflow は失敗した項目を含むバンドルを 4 回再試行します。1 つのバンドルが 4 回失敗すると、パイプラインが完全に失敗します。ストリーミング モードで実行している場合、Dataflow は失敗した項目を含むバンドルを無制限に再試行します。これにより、パイプラインが恒久的に滞るおそれがあります。

RunInference は、with_exception_handling 関数を使用して、組み込みのエラー処理メカニズムを提供します。この関数を適用すると、すべての障害がエラー メッセージとともに別の障害 PCollection に転送されます。これにより、再処理が可能になります。前処理オペレーションまたは後処理オペレーションをモデルハンドラに関連付けると、RunInference はそれらのオペレーションも失敗コレクションに転送します。たとえば、前処理オペレーションと後処理オペレーションを含むモデルハンドラからすべてのエラーを収集するには、次のロジックを使用します。

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

タイムアウト

RunInferencewith_exception_handling 機能を使用する場合は、各オペレーションのタイムアウトを設定することもできます。これはバッチごとにカウントされます。これにより、1 つの推論がスタックしてパイプライン全体が応答しなくなるのを防ぐことができます。タイムアウトが発生すると、タイムアウトしたレコードは失敗 PCollection に転送されて、すべてのモデルの状態がクリーンアップされて再作成され、通常の実行が続行されます。

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

Beam 2.68.0 以降では、--element_processing_timeout_minutes パイプライン オプションを使用してタイムアウトを指定することもできます。この場合、タイムアウトが発生すると、失敗した推論がデッドレター キューに転送されるのではなく、失敗した作業アイテムが成功するまで再試行されます。

アクセラレータの操作

アクセラレータを使用する場合、多くのモデルハンドラには、有効にできるアクセラレータに固有の構成があります。たとえば、GPU と Hugging Face パイプラインを使用する場合は、device パラメータを GPU に設定することをおすすめします。

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

また、単一の VM インスタンスから始めて、そこでパイプラインをローカルで実行することをおすすめします。これを行うには、GPU のトラブルシューティング ガイドに記載されている手順に沿って操作します。これにより、パイプラインの実行に必要な時間を大幅に短縮できます。このアプローチは、ジョブのパフォーマンスを把握するうえでも役立ちます。

Dataflow でアクセラレータを使用する方法については、Dataflow の GPUTPU に関するドキュメントをご覧ください。

依存関係の管理

ML パイプラインには、PyTorch や TensorFlow などの大規模で重要な依存関係が含まれていることがよくあります。これらの依存関係を管理するには、ジョブを本番環境にデプロイするときにカスタム コンテナを使用することをおすすめします。これにより、ジョブが安定した環境で複数の実行にまたがって実行され、デバッグが簡素化されます。

依存関係の管理の詳細については、Beam の Python 依存関係の管理ページをご覧ください。

次のステップ