RunInference 轉換最佳做法

使用 Dataflow 進行 ML 推論時,建議您使用 RunInference 轉換。使用這項轉換作業有許多優點,包括:

  • 智慧型模型記憶體管理功能,可在執行本機推論時,針對 Dataflow 工作人員進行最佳化。
  • 動態批次處理:使用管線特徵和使用者定義的限制條件,將效能最佳化。
  • 支援機器學習的 Dataflow 後端功能,可提供更佳的輸送量和延遲時間。
  • 遇到遠端推論配額時,智慧型退避和自動調度資源機制。
  • 可直接用於實際工作環境的指標和作業功能。

使用 RunInference 時,請注意下列幾點:

記憶體管理

載入中型或大型 ML 模型時,機器可能會記憶體不足。 Dataflow 提供相關工具,協助您在載入 ML 模型時避免記憶體不足 (OOM) 錯誤。請參閱下表,判斷適合您情況的方法。

情境 解決方案
模型夠小,可放入記憶體。 使用 RunInference 轉換,無需額外設定。RunInference 轉換會在執行緒之間共用模型。如果機器上的每個 CPU 核心都能容納一個模型,管道就可以使用預設設定。
多個訓練方式不同的模型執行相同工作。 使用模型專屬金鑰。詳情請參閱「使用多個訓練方式不同的模型執行 ML 推論」。
一個模型會載入記憶體,所有程序都會共用這個模型。

使用 large_model 參數。詳情請參閱「使用多個訓練方式不同的模型執行 ML 推論」。

如果您要建構自訂模型處理常式,請覆寫 share_model_across_processes 參數,而非使用 large_model 參數。

您必須設定載入至電腦的模型確切數量。

如要精確控管載入的模型數量,請使用 model_copies 參數。

如果您要建構自訂模型處理常式,請覆寫 model_copies 參數。

如要進一步瞭解如何使用 Dataflow 管理記憶體,請參閱排解 Dataflow 記憶體不足錯誤

批次處理

在 Beam 中,批次處理的方式有很多種,但執行推論時,建議您讓 RunInference 轉換處理批次處理作業。如果模型在特定批次大小下表現最佳,請考慮限制 RunInference 的目標批次大小參數。大多數模型處理常式都會將批次大小上限和下限做為參數公開。舉例來說,如要控管饋送至 HuggingFace 管道的批次大小,可以定義下列模型處理常式:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

RunInference 轉換作業一律會遵守批次大小上限。最低批次大小是目標,但不保證在所有情況下都能遵守。舉例來說,請參閱下一節的「以套件為基礎的批次處理」。

以套裝組合為準的批次處理

Dataflow 會將資料傳遞至套件中的轉換。這些組合的大小會因 Dataflow 定義的啟發式方法而異。一般來說,批次管線中的套件相當大 (O(100s) 個元素),而串流管線中的套件則相當小 (包括大小 1)。

根據預設,RunInference 會從每個套件產生批次,且不會跨套件批次處理。也就是說,如果最小批次大小為 8,但套件中只剩下 3 個元素,RunInference 會使用批次大小 3。大多數模型處理常式都會公開 max_batch_duration_secs 參數,讓您覆寫這項行為。如果已設定 max_batch_duration_secs,則 RunInference 會批次處理套件中的所有檔案。如果轉換無法透過單一套件達到目標批次大小,系統最多會等待 max_batch_duration_secs,然後產生批次。舉例來說,如要在使用 HuggingFace 管道時啟用跨套件批次處理,可以定義下列模型處理常式:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

如果管道中的批次大小非常小,這項功能會有幫助。 否則,跨套件批次處理的同步成本通常不值得使用,因為這可能會導致昂貴的隨機排序

處理失敗

處理錯誤是任何正式版管道的重要環節。 Dataflow 會以任意組合來處理元素,只要組合中有任何元素發生錯誤,就會重新嘗試處理整個組合。如果您未套用額外的錯誤處理機制,以批次模式執行時,Dataflow 會重試內含失敗項目的組合四次。如果單一組合失敗達四次,管道程序就會完全失敗。以串流模式執行時,Dataflow 會無限期重試內含失敗項目的組合,這可能會導致管道永久停滯。

RunInferencewith_exception_handling 函式提供內建錯誤處理機制。套用這項函式後,所有失敗都會連同錯誤訊息一併傳送至個別的失敗 PCollection。以便重新處理。如果將前處理或後處理作業與模型處理常式建立關聯,RunInference 也會將這些作業路徑傳送至失敗集合。舉例來說,如要收集模型處理常式中所有失敗的預先處理和後續處理作業,請使用下列邏輯:

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

逾時

使用 RunInferencewith_exception_handling 功能時,您也可以為每項作業設定逾時時間,並以批次為單位計算。這樣一來,您就能避免單一卡住的推論導致整個管道沒有回應。如果發生逾時情形,逾時的記錄會傳送至失敗 PCollection,所有模型狀態都會清除並重新建立,然後繼續正常執行。

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

從 Beam 2.68.0 開始,您也可以使用 --element_processing_timeout_minutes 管道選項指定逾時。在這種情況下,如果工作項目逾時,系統會重試,直到成功為止,而不是將失敗的推論作業轉送至死信佇列

使用加速器

使用加速器時,許多模型處理常式都有可啟用的加速器專屬設定。舉例來說,使用 GPU 和 Hugging Face 管道時,建議將 device 參數設為 GPU:

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

此外,我們也建議您先從單一 VM 執行個體著手,並在該處執行管道。如要這麼做,請按照 GPU 疑難排解指南中的步驟操作。這可大幅縮短管道的執行時間。這種做法也有助於您進一步瞭解作業的成效。

如要進一步瞭解如何在 Dataflow 中使用加速器,請參閱 Dataflow 說明文件中的 GPUTPU

依附元件管理

機器學習管道通常包含大型且重要的依附元件,例如 PyTorch 或 TensorFlow。如要管理這些依附元件,建議您在將工作部署至正式環境時,使用自訂容器。這可確保工作在多次執行時都能在穩定環境中執行,並簡化偵錯程序。

如要進一步瞭解依附元件管理,請參閱 Beam 的 Python 依附元件管理頁面

後續步驟