Best practice per la trasformazione RunInference

Quando utilizzi Dataflow per l'inferenza ML, ti consigliamo di utilizzare la trasformazione RunInference. L'utilizzo di questa trasformazione comporta una serie di vantaggi, tra cui:

  • Gestione intelligente della memoria del modello ottimizzata per un worker Dataflow durante l'esecuzione dell'inferenza locale.
  • Batch dinamico che utilizza le caratteristiche della pipeline e i vincoli definiti dall'utente per ottimizzare il rendimento.
  • Funzionalità di backend di Dataflow basate sull'ML che possono fornire un throughput e una latenza migliori.
  • Meccanismi di backoff intelligente e scalabilità automatica quando si raggiungono le quote di inferenza remota.
  • Metriche e funzionalità operative pronte per la produzione.

Quando utilizzi RunInference, ci sono diverse cose da considerare:

Gestione della memoria

Quando carichi un modello ML medio o grande, la tua macchina potrebbe esaurire la memoria. Dataflow fornisce strumenti per evitare errori di memoria insufficiente (OOM) durante il caricamento dei modelli di ML. Utilizza la tabella seguente per determinare l'approccio appropriato per il tuo scenario.

Scenario Soluzione
I modelli sono abbastanza piccoli da poter essere memorizzati. Utilizza la trasformazione RunInference senza configurazioni aggiuntive. La trasformazione RunInference condivide i modelli tra i thread. Se puoi inserire un modello per core CPU sulla tua macchina, la pipeline può utilizzare la configurazione predefinita.
Più modelli addestrati in modo diverso eseguono la stessa attività. Utilizza chiavi per modello. Per saperne di più, consulta Esegui l'inferenza di ML con più modelli addestrati in modo diverso.
Un modello viene caricato in memoria e tutti i processi lo condividono.

Utilizza il parametro large_model. Per saperne di più, consulta Esegui l'inferenza di ML con più modelli addestrati in modo diverso.

Se stai creando un gestore di modelli personalizzato, anziché utilizzare il parametro large_model, esegui l'override del parametro share_model_across_processes.

Devi configurare il numero esatto di modelli caricati sulla macchina.

Per controllare esattamente quanti modelli vengono caricati, utilizza il parametro model_copies.

Se stai creando un gestore di modelli personalizzato, esegui l'override del parametro model_copies.

Per ulteriori informazioni sulla gestione della memoria con Dataflow, consulta Risolvere gli errori di esaurimento della memoria di Dataflow.

Batching

Esistono molti modi per eseguire il batching in Beam, ma quando esegui l'inferenza ti consigliamo di lasciare che la trasformazione RunInference gestisca il batching. Se il tuo modello funziona meglio con una dimensione batch specifica, valuta la possibilità di vincolare i parametri della dimensione batch target di RunInference. La maggior parte dei gestori di modelli espone le dimensioni massime e minime dei batch come parametri. Ad esempio, per controllare la dimensione del batch inserito in una pipeline HuggingFace, puoi definire il seguente gestore del modello:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

La trasformazione RunInference rispetta sempre la dimensione massima del batch. La dimensione minima del batch è un target, ma non è garantito che venga rispettato in tutti i casi. Ad esempio, consulta Raggruppamento basato sui bundle nella sezione seguente.

Batching basato sui bundle

Dataflow passa i dati alle trasformazioni in bundle. Questi bundle possono variare di dimensioni a seconda dell'euristica definita da Dataflow. In genere, i bundle nelle pipeline batch sono piuttosto grandi (O(100) elementi), mentre per le pipeline di streaming possono essere piuttosto piccoli (inclusa la dimensione 1).

Per impostazione predefinita, RunInference genera batch da ogni bundle e non raggruppa i batch tra i bundle. Ciò significa che se hai una dimensione batch minima di 8, ma nel bundle sono rimasti solo 3 elementi, RunInference utilizza una dimensione batch di 3. La maggior parte dei gestori di modelli espone un parametro max_batch_duration_secs che consente di ignorare questo comportamento. Se max_batch_duration_secs è impostato, RunInference raggruppa i batch tra i bundle. Se la trasformazione non riesce a raggiungere la dimensione batch target con un singolo bundle, attende al massimo max_batch_duration_secs prima di generare un batch. Ad esempio, per attivare il batch cross-bundle quando utilizzi una pipeline HuggingFace, puoi definire il seguente gestore del modello:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

Questa funzionalità è utile se riscontri dimensioni dei batch molto ridotte nella pipeline. In caso contrario, il costo della sincronizzazione in batch tra i bundle di solito non vale la pena di essere utilizzato, perché può causare un costoso shuffle.

Gestione degli errori

La gestione degli errori è una parte importante di qualsiasi pipeline di produzione. Dataflow elabora gli elementi in bundle arbitrari e riprova l'intero bundle se si verifica un errore per qualsiasi elemento del bundle. Se non applichi una gestione degli errori aggiuntiva, Dataflow ritenta i bundle che includono un elemento non riuscito quattro volte quando viene eseguito in modalità batch. La pipeline non viene eseguita completamente quando un singolo pacchetto non viene eseguito quattro volte. Quando viene eseguito in modalità streaming, Dataflow riprova a elaborare un bundle che include un elemento non riuscito all'infinito, il che potrebbe causare l'arresto permanente della pipeline.

RunInference fornisce un meccanismo di gestione degli errori integrato con la relativa funzione with_exception_handling. Quando applichi questa funzione, tutti gli errori vengono indirizzati a un errore separato PCollection insieme ai relativi messaggi di errore. In questo modo potrai rielaborarli. Se associ operazioni di preelaborazione o post-elaborazione al gestore del modello, RunInference le indirizza anche alla raccolta degli errori. Ad esempio, per raccogliere tutti gli errori di un gestore di modelli con operazioni di pre-elaborazione e post-elaborazione, utilizza la seguente logica:

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

Timeout

Quando utilizzi la funzionalità with_exception_handling di RunInference, puoi anche impostare un timeout per ogni operazione, che viene conteggiato per batch. In questo modo puoi evitare che una singola inferenza bloccata renda l'intera pipeline non reattiva. Se si verifica un timeout, il record in timeout viene indirizzato all'errore PCollection, tutto lo stato del modello viene pulito e ricreato e l'esecuzione normale continua.

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

A partire da Beam 2.68.0, puoi specificare un timeout anche utilizzando l'opzione della pipeline --element_processing_timeout_minutes. In questo caso, un timeout fa sì che un elemento di lavoro non riuscito venga ritentato finché non va a buon fine, anziché indirizzare l'inferenza non riuscita a una coda dead letter.

Utilizzo degli acceleratori

Quando utilizzi gli acceleratori, molti gestori di modelli hanno configurazioni specifiche per gli acceleratori che puoi attivare. Ad esempio, quando utilizzi una GPU e pipeline Hugging Face, ti consigliamo di impostare il parametro device su GPU:

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

Ti consigliamo inoltre di iniziare con una singola istanza VM ed eseguire la pipeline localmente. Per farlo, segui la procedura descritta nella guida alla risoluzione dei problemi relativi alla GPU. Ciò può ridurre notevolmente il tempo necessario per eseguire una pipeline. Questo approccio può anche aiutarti a comprendere meglio il rendimento del tuo job.

Per ulteriori informazioni sull'utilizzo degli acceleratori in Dataflow, consulta la documentazione di Dataflow su GPU e TPU.

Gestione delle dipendenze

Le pipeline ML spesso includono dipendenze grandi e importanti, come PyTorch o TensorFlow. Per gestire queste dipendenze, ti consigliamo di utilizzare container personalizzati quando esegui il deployment del job in produzione. In questo modo, il job viene eseguito in un ambiente stabile in più esecuzioni e il debug viene semplificato.

Per saperne di più sulla gestione delle dipendenze, consulta la pagina sulla gestione delle dipendenze Python di Beam.

Passaggi successivi