使用 Dataflow 進行 ML 推論時,建議您使用 RunInference 轉換。使用這項轉換作業有許多優點,包括:
- 智慧型模型記憶體管理功能,可在執行本機推論時,針對 Dataflow 工作人員進行最佳化。
- 動態批次處理:使用管線特徵和使用者定義的限制條件,將效能最佳化。
- 支援機器學習的 Dataflow 後端功能,可提供更佳的輸送量和延遲時間。
- 遇到遠端推論配額時,智慧型退避和自動調度資源機制。
- 可直接用於實際工作環境的指標和作業功能。
使用 RunInference 時,請注意下列幾點:
記憶體管理
載入中型或大型 ML 模型時,機器可能會記憶體不足。 Dataflow 提供相關工具,協助您在載入 ML 模型時避免記憶體不足 (OOM) 錯誤。請參閱下表,判斷適合您情況的方法。
| 情境 | 解決方案 |
|---|---|
| 模型夠小,可放入記憶體。 |
使用 RunInference 轉換,無需額外設定。RunInference 轉換會在執行緒之間共用模型。如果機器上的每個 CPU 核心都能容納一個模型,管道就可以使用預設設定。
|
| 多個訓練方式不同的模型執行相同工作。 | 使用模型專屬金鑰。詳情請參閱「使用多個訓練方式不同的模型執行 ML 推論」。 |
| 一個模型會載入記憶體,所有程序都會共用這個模型。 |
使用 如果您要建構自訂模型處理常式,請覆寫 |
| 您必須設定載入至電腦的模型確切數量。 |
如要精確控管載入的模型數量,請使用 如果您要建構自訂模型處理常式,請覆寫 |
如要進一步瞭解如何使用 Dataflow 管理記憶體,請參閱排解 Dataflow 記憶體不足錯誤。
批次處理
在 Beam 中,批次處理的方式有很多種,但執行推論時,建議您讓 RunInference 轉換處理批次處理作業。如果模型在特定批次大小下表現最佳,請考慮限制 RunInference 的目標批次大小參數。大多數模型處理常式都會將批次大小上限和下限做為參數公開。舉例來說,如要控管饋送至 HuggingFace 管道的批次大小,可以定義下列模型處理常式:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)
RunInference 轉換作業一律會遵守批次大小上限。最低批次大小是目標,但不保證在所有情況下都能遵守。舉例來說,請參閱下一節的「以套件為基礎的批次處理」。
以套裝組合為準的批次處理
Dataflow 會將資料傳遞至套件中的轉換。這些組合的大小會因 Dataflow 定義的啟發式方法而異。一般來說,批次管線中的套件相當大 (O(100s) 個元素),而串流管線中的套件則相當小 (包括大小 1)。
根據預設,RunInference 會從每個套件產生批次,且不會跨套件批次處理。也就是說,如果最小批次大小為 8,但套件中只剩下 3 個元素,RunInference 會使用批次大小 3。大多數模型處理常式都會公開 max_batch_duration_secs 參數,讓您覆寫這項行為。如果已設定 max_batch_duration_secs,則 RunInference 會批次處理套件中的所有檔案。如果轉換無法透過單一套件達到目標批次大小,系統最多會等待 max_batch_duration_secs,然後產生批次。舉例來說,如要在使用 HuggingFace 管道時啟用跨套件批次處理,可以定義下列模型處理常式:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)
如果管道中的批次大小非常小,這項功能會有幫助。 否則,跨套件批次處理的同步成本通常不值得使用,因為這可能會導致昂貴的隨機排序。
處理失敗
處理錯誤是任何正式版管道的重要環節。 Dataflow 會以任意組合來處理元素,只要組合中有任何元素發生錯誤,就會重新嘗試處理整個組合。如果您未套用額外的錯誤處理機制,以批次模式執行時,Dataflow 會重試內含失敗項目的組合四次。如果單一組合失敗達四次,管道程序就會完全失敗。以串流模式執行時,Dataflow 會無限期重試內含失敗項目的組合,這可能會導致管道永久停滯。
RunInference 的 with_exception_handling 函式提供內建錯誤處理機制。套用這項函式後,所有失敗都會連同錯誤訊息一併傳送至個別的失敗 PCollection。以便重新處理。如果將前處理或後處理作業與模型處理常式建立關聯,RunInference 也會將這些作業路徑傳送至失敗集合。舉例來說,如要收集模型處理常式中所有失敗的預先處理和後續處理作業,請使用下列邏輯:
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)
# handles failed inferences
other.failed_inferences | beam.Map(logging.info)
# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)
逾時
使用 RunInference 的 with_exception_handling 功能時,您也可以為每項作業設定逾時時間,並以批次為單位計算。這樣一來,您就能避免單一卡住的推論導致整個管道沒有回應。如果發生逾時情形,逾時的記錄會傳送至失敗 PCollection,所有模型狀態都會清除並重新建立,然後繼續正常執行。
# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)
從 Beam 2.68.0 開始,您也可以使用 --element_processing_timeout_minutes 管道選項指定逾時。在這種情況下,如果工作項目逾時,系統會重試,直到成功為止,而不是將失敗的推論作業轉送至死信佇列。
使用加速器
使用加速器時,許多模型處理常式都有可啟用的加速器專屬設定。舉例來說,使用 GPU 和 Hugging Face 管道時,建議將 device 參數設為 GPU:
mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')
此外,我們也建議您先從單一 VM 執行個體著手,並在該處執行管道。如要這麼做,請按照 GPU 疑難排解指南中的步驟操作。這可大幅縮短管道的執行時間。這種做法也有助於您進一步瞭解作業的成效。
如要進一步瞭解如何在 Dataflow 中使用加速器,請參閱 Dataflow 說明文件中的 GPU 和 TPU。
依附元件管理
機器學習管道通常包含大型且重要的依附元件,例如 PyTorch 或 TensorFlow。如要管理這些依附元件,建議您在將工作部署至正式環境時,使用自訂容器。這可確保工作在多次執行時都能在穩定環境中執行,並簡化偵錯程序。
如要進一步瞭解依附元件管理,請參閱 Beam 的 Python 依附元件管理頁面。
後續步驟
- 如需實務範例,請參閱 Dataflow ML 筆記本。
- 如要深入瞭解如何搭配使用機器學習與 Apache Beam,請參閱 AI/機器學習管道說明文件。
- 進一步瞭解
RunInferenceAPI。 - 瞭解可用於監控
RunInference轉換的指標。 - 返回「About Dataflow ML」(關於 Dataflow ML) 頁面,瞭解 Dataflow 的機器學習功能。