Lo scaling dinamico dei thread fa parte della suite di funzionalità di scalabilità verticale di Dataflow. Integra la funzionalità di scalabilità automatica orizzontale di Dataflow regolando il numero di attività parallele, note anche come bundle, che ogni worker Dataflow esegue. L'obiettivo è aumentare l'efficienza complessiva della pipeline Dataflow.
Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) Compute Engine, note anche come worker. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Dataflow avvia diversi thread su ogni worker.
Con lo scaling dinamico dei thread abilitato, il servizio Dataflow sceglie automaticamente il numero appropriato di thread da eseguire su ogni worker Dataflow. Poiché ogni thread esegue un'attività, l'aumento del numero di thread consente di eseguire più attività in parallelo su un worker. Quando utilizzi questa funzionalità con la funzionalità di scalabilità automatica orizzontale, il numero totale di thread utilizzati dalla pipeline rimane lo stesso, ma vengono utilizzati meno worker.
Lo scaling dinamico dei thread utilizza un algoritmo per determinare il numero di thread necessari a ogni worker in base ai segnali di utilizzo delle risorse generati durante l'esecuzione della pipeline. Per saperne di più, consulta la sezione Come funziona in questa pagina.
Vantaggi
Lo scaling dinamico dei thread offre i seguenti potenziali vantaggi.
- Consente ai worker Dataflow di elaborare i dati in modo più efficiente migliorando l'utilizzo della CPU e della memoria per worker.
- Migliora l'elaborazione parallela regolando il numero di thread worker disponibili per eseguire le attività in parallelo durante l'esecuzione della pipeline.
- Riduce il numero di worker necessari per elaborare set di dati di grandi dimensioni, il che potrebbe ridurre i costi.
Supporto e limitazioni
- Lo scaling dinamico dei thread è disponibile per le pipeline che utilizzano gli SDK Java, Python e Go.
- Il job Dataflow deve utilizzare Runner v2.
- Sono supportate solo le pipeline batch.
- Le pipeline che utilizzano intensamente la CPU o la memoria potrebbero non trarre vantaggio dallo scaling dinamico dei thread.
- Lo scaling dinamico dei thread non riduce il tempo necessario per completare un job Dataflow.
- Lo scaling dinamico dei thread è destinato principalmente ai problemi di memoria relativi ai dati. Se la memoria è insufficiente a causa delle dimensioni di un modello ML, consulta Gestione della memoria.
- Per i casi d'uso con memoria elevata, potrebbe essere comunque necessario ottimizzare manualmente
num_worker_harness_threadso passare a un tipo di macchina con memoria elevata.
Come funziona
Lo scaling dinamico dei thread utilizza i principi di ottimizzazione automatica per scalare dinamicamente il numero di thread su o giù su ogni worker nel pool di worker Dataflow. Il numero di thread viene scalato in modo indipendente su ogni worker. Ogni thread esegue un'attività. L'aumento del numero di thread consente di eseguire più attività in parallelo su un worker. Man mano che le attività vengono completate e i thread non sono più necessari, il numero di thread viene ridotto. Un algoritmo determina il numero di thread necessari a ogni worker.
Il numero di thread su un worker viene aumentato fino a un massimo di due thread per vCPU quando vengono soddisfatte entrambe le seguenti condizioni:
- L'utilizzo della memoria sul worker è inferiore al 50%.
- L'utilizzo della CPU sul worker è inferiore al 65%.
Il numero di thread su un worker viene ridotto a un minimo di un thread per vCPU quando viene soddisfatta la seguente condizione:
- L'utilizzo della memoria sul worker è superiore al 70%.
Per visualizzare l'utilizzo della memoria e della CPU per il job, utilizza la scheda **Metriche job** dell'interfaccia web di Dataflow.
Per garantire la validità dei suggerimenti, Dataflow attende che l'utilizzo delle risorse si stabilizzi prima di inviare i suggerimenti ai worker. Ad esempio, l'utilizzo della memoria e della CPU potrebbe rientrare nell'intervallo per lo scaling, ma poiché l'utilizzo delle risorse è ancora in crescita, Dataflow non invia un suggerimento. Una volta stabilizzato l'utilizzo delle risorse, Dataflow invia un suggerimento.
Se si verifica un errore di memoria insufficiente, lo scaling dei thread viene disattivato automaticamente e la pipeline viene eseguita con un thread per vCPU.
Attivare lo scaling dinamico dei thread
Per attivare lo scaling dinamico dei thread, utilizza la seguente opzione del servizio Dataflow.
Java
--dataflowServiceOptions=enable_dynamic_thread_scaling
Python
--dataflow_service_options=enable_dynamic_thread_scaling
Vai
--dataflow_service_options=enable_dynamic_thread_scaling
Quando lo scaling dinamico dei thread è attivato, puoi anche impostare il numero iniziale e massimo di worker disponibili per la pipeline durante l'esecuzione. Per saperne di più, consulta Opzioni pipeline.
Verificare che lo scaling dinamico dei thread sia attivato
Quando lo scaling dinamico dei thread è attivato, nei file di log dei worker viene visualizzato il seguente messaggio:
Enabling thread vertical scaling feature in worker.
Per visualizzare i file di log dei worker, in Esplora log, utilizza il riquadro Query per filtrare i log in base al Nome log. Utilizza il seguente nome log nel filtro:
projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness
Puoi visualizzare il numero di thread consigliato nei file di log dei worker. Il seguente messaggio include il numero di thread consigliato:
worker_thread_scaling_report_response { recommended_thread_count: NUMBER }
Se l'utilizzo delle risorse non rientra nell'intervallo per lo scaling, il valore visualizzato è uguale al numero di vCPU sul worker.
Puoi anche utilizzare la Google Cloud consoleper verificare se lo scaling dinamico dei thread è
attivato. Quando è attivato, nel riquadro Informazioni job di Dataflow, nella riga dataflowServiceOptions della sezione Opzioni pipeline, viene visualizzato enable_dynamic_thread_scaling.
Risoluzione dei problemi
Questa sezione fornisce istruzioni per la risoluzione dei problemi comuni relativi allo scaling dinamico dei thread.
Il rendimento peggiora con lo scaling dinamico dei thread attivato
L'aumento del numero di thread potrebbe causare problemi di prestazioni nei seguenti casi:
- Quando più processi tentano di utilizzare la stessa risorsa, un processo è in grado di utilizzare la risorsa mentre gli altri devono attendere. Questa situazione è nota come contesa delle risorse. Quando si verifica una contesa delle risorse, il rendimento della pipeline potrebbe diminuire.
- Quando si verificano errori di memoria insufficiente, lo scaling dinamico dei thread viene disattivato. In alcuni casi, gli errori di memoria insufficiente potrebbero causare il fallimento della pipeline.
Verifica se il numero di thread è aumentato. Per informazioni su come verificare il numero di thread consigliato, consulta Verifica che lo scaling dei thread sia attivato in questa pagina.
Se lo scaling dei thread è attivato, per risolvere il problema, quando esegui la pipeline, non includere l'opzione del servizio di scaling dinamico dei thread.
Worker unificato… sia attivato che disattivato
Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.
Questo errore si verifica quando Runner v2 è disattivato in modo esplicito.
Per risolvere il problema, attiva Runner v2. Per saperne di più, consulta la sezione Attivare Dataflow Runner v2 nella pagina "Utilizzare Dataflow Runner V2".
Eseguire l'upgrade dell'SDK
Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
Java
Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.
Python
Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.
Questo errore si verifica quando non è possibile attivare Runner v2 perché la versione dell'SDK non lo supporta.
Per risolvere il problema, utilizza una versione dell'SDK che supporta Runner v2.
Impossibile attivare la funzionalità di scaling verticale dei thread
Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.
Questo errore si verifica quando la pipeline imposta in modo esplicito il numero di thread per
worker utilizzando l'opzione della pipeline numberOfWorkerHarnessThreads o
number_of_worker_harness_threads
.
Per risolvere il problema, rimuovi l'opzione della pipeline numberOfWorkerHarnessThreads o number_of_worker_harness_threads dalla pipeline.