Nelle pipeline di streaming con un volume elevato di dati di input, in genere esiste un compromesso tra costo e latenza. Per mantenere una bassa latenza, Dataflow deve aggiungere worker man mano che il volume di traffico aumenta. Un altro fattore è la velocità con cui la pipeline deve eseguire lo scale up o lo scale down in risposta alle modifiche della velocità dei dati di input.
Il gestore della scalabilità automatica di Dataflow ha impostazioni predefinite adatte a molti carichi di lavoro. Tuttavia, potresti voler ottimizzare questo comportamento per il tuo scenario specifico. Ad esempio, una latenza media più elevata potrebbe essere accettabile per ridurre i costi oppure potresti voler che Dataflow esegua lo scale up più rapidamente in risposta ai picchi di traffico.
Per ottimizzare la scalabilità automatica orizzontale, puoi modificare i seguenti parametri:
- Intervallo di scalabilità automatica: il numero minimo e massimo di worker da allocare.
- Suggerimento per l'utilizzo dei worker: l'utilizzo CPU target per i worker.
- Suggerimento per il parallelismo dei worker: il numero target di parallelismo per i worker.
Impostare l'intervallo di scalabilità automatica
Quando crei un nuovo job di streaming, puoi impostare il numero iniziale di worker e il numero massimo di worker. Per farlo, specifica le seguenti opzioni della pipeline:
Java
--numWorkers: il numero iniziale di worker disponibili all'avvio della pipeline--maxNumWorkers: il numero massimo di worker disponibili per la pipeline
Python
--num_workers: il numero iniziale di worker disponibili all'avvio della pipeline--max_num_workers: il numero massimo di worker disponibili per la pipeline
Vai
--num_workers: il numero iniziale di worker disponibili all'avvio della pipeline--max_num_workers: il numero massimo di worker disponibili per la pipeline
Per i job di streaming che utilizzano Streaming Engine, il flag --maxNumWorkers è facoltativo. Il valore predefinito è 100. Per i job di streaming che non utilizzano Streaming Engine, --maxNumWorkers è obbligatorio quando la scalabilità automatica orizzontale è abilitata.
Il valore iniziale di --maxNumWorkers determina anche il numero di
Persistent Disk allocati per il job.
Le pipeline vengono eseguite il deployment con un pool fisso di Persistent Disk, pari al numero di --maxNumWorkers. Durante lo streaming, i Persistent Disk vengono ridistribuiti in modo che ogni worker riceva un numero uguale di dischi collegati.
Se imposti --maxNumWorkers, assicurati che il valore fornisca dischi sufficienti per la pipeline. Quando imposti il valore iniziale, tieni conto della crescita futura. Per informazioni
sulle prestazioni di Persistent Disk, vedi
Configurare Persistent Disk e le VM.
Dataflow fattura l'utilizzo di Persistent Disk e ha
quote di Compute Engine, incluse le quote di Persistent Disk.
Per impostazione predefinita, il numero minimo di worker è 1 per i job di streaming che utilizzano Streaming Engine e (maxNumWorkers/15), arrotondato per eccesso, per i job che non utilizzano Streaming Engine.
Aggiornare l'intervallo di scalabilità automatica
Per i job che utilizzano Streaming Engine, puoi modificare il numero minimo e massimo di worker senza arrestare o sostituire il job. Per modificare questi valori, utilizza un aggiornamento del job in corso. Aggiorna le seguenti opzioni del job:
--min-num-workers: il numero minimo di worker.--max-num-workers: il numero massimo di worker.
gcloud
Utilizza il comando gcloud dataflow jobs update-options:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID regione dell'endpoint regionale del job
- MINIMUM_WORKERS: il numero minimo di istanze di Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze di Compute Engine
- JOB_ID: l'ID del job da aggiornare
Puoi anche aggiornare --min-num-workers e --max-num-workers singolarmente.
REST
Utilizza il
projects.locations.jobs.update
metodo:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto del job Dataflow
- REGION: l'ID regione dell'endpoint regionale del job
- JOB_ID: l'ID del job da aggiornare
- MINIMUM_WORKERS: il numero minimo di istanze di Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze di Compute Engine
Puoi anche aggiornare min_num_workers e max_num_workers singolarmente.
Specifica i parametri da aggiornare nel parametro di query updateMask e includi i valori aggiornati nel campo runtimeUpdatableParams del corpo della richiesta. L'esempio seguente aggiorna min_num_workers:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Per i job che non utilizzano Streaming Engine, puoi
sostituire il job esistente
con un valore aggiornato di maxNumWorkers.
Se aggiorni un job di streaming che non utilizza Streaming Engine, per impostazione predefinita la scalabilità automatica orizzontale è disabilitata per il job aggiornato. Per mantenere la scalabilità automatica abilitata, specifica --autoscalingAlgorithm e --maxNumWorkers per il job aggiornato.
Impostare il suggerimento per l'utilizzo dei worker
Dataflow utilizza l'utilizzo medio della CPU come indicatore per l'applicazione della scalabilità automatica orizzontale. Per impostazione predefinita, Dataflow imposta un utilizzo CPU target di 0,8. Quando l'utilizzo non rientra in questo intervallo, Dataflow potrebbe aggiungere o rimuovere worker.
Per un maggiore controllo sul comportamento della scalabilità automatica, puoi impostare l'utilizzo CPU target su un valore compreso nell'intervallo [0,1, 0,9].
Imposta un valore di utilizzo CPU inferiore se vuoi ottenere latenze di picco inferiori. Un valore inferiore consente a Dataflow di eseguire lo scale out in modo più aggressivo in risposta all'aumento dell'utilizzo dei worker e di eseguire lo scale down in modo più conservativo per migliorare la stabilità. Un valore inferiore fornisce anche più margine quando la pipeline è in stato stazionario, con conseguente latenza di coda generalmente inferiore. (La latenza di coda misura i tempi di attesa più lunghi prima che venga elaborato un nuovo record.)
Imposta un valore più alto se vuoi risparmiare risorse e ridurre i costi durante i picchi di traffico. Un valore più alto impedisce un eccessivo scale up, a scapito di una latenza più elevata.
Per configurare il suggerimento per l'utilizzo quando esegui un job non basato su modello, imposta l'
worker_utilization_hint
opzione di servizio. Per un job basato su modello,
aggiorna invece il suggerimento per l'utilizzo, poiché le opzioni di servizio non sono
supportate.
L'esempio seguente mostra come utilizzare worker_utilization_hint:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].
Vai
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].
Per le nuove pipeline, ti consigliamo di eseguire test con carichi realistici utilizzando l'impostazione predefinita. Valuta quindi il comportamento della scalabilità automatica in base alla tua pipeline e apporta le modifiche necessarie.
Il suggerimento per l'utilizzo è solo uno dei fattori utilizzati da Dataflow per decidere se scalare i worker. Altri fattori, come il backlog e le chiavi disponibili, possono sostituire il valore del suggerimento. Inoltre, il suggerimento non è un target rigoroso. Il gestore della scalabilità automatica tenta di mantenere l'utilizzo della CPU nell'intervallo del valore del suggerimento, ma la metrica di utilizzo aggregata potrebbe essere superiore o inferiore. Per saperne di più, consulta Euristiche di scalabilità automatica dello streaming.
Aggiornare il suggerimento per l'utilizzo
Per aggiornare il suggerimento per l'utilizzo durante l'esecuzione di un job, esegui un aggiornamento in corso come segue:
gcloud
Utilizza il
gcloud dataflow jobs update-options
comando:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID regione dell'endpoint regionale del job
- JOB_ID: l'ID del job da aggiornare
- TARGET_UTILIZATION: un valore compreso nell'intervallo [0,1, 0,9]
Per reimpostare il suggerimento per l'utilizzo sul valore predefinito, utilizza il seguente comando gcloud:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
Utilizza il
projects.locations.jobs.update
metodo:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto del job Dataflow.
- REGION: l'ID regione dell'endpoint regionale del job.
- JOB_ID: l'ID del job da aggiornare.
- TARGET_UTILIZATION: un valore compreso nell'intervallo [0,1, 0,9]
Impostare il suggerimento per il parallelismo dei worker
Per gestire la scalabilità automatica con operazioni lunghe meno dipendenti dalle CPU, come i carichi di lavoro a elevata intensità di ML, puoi impostare il suggerimento per il parallelismo dei worker utilizzando i suggerimenti per le risorse di Apache Beam. Questi suggerimenti cambiano la scalabilità automatica in una modalità diversa, ottimizzata per i carichi di lavoro a elevata intensità di GPU o per le trasformazioni con tempi di elaborazione lunghi.
L'esempio seguente mostra come collegare il suggerimento per il parallelismo a una trasformazione:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
Sostituisci TARGET_PARALLELISM_PER_WORKER con un valore appropriato per il tuo caso d'uso. Per indicazioni generali, consulta la sezione su come scegliere un buon valore iniziale.
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker="TARGET_PARALLELISM_PER_WORKER")
Sostituisci TARGET_PARALLELISM_PER_WORKER con un valore appropriato per il tuo caso d'uso. Per indicazioni generali, consulta la sezione su come scegliere un buon valore iniziale.
Scegliere il valore del suggerimento per il parallelismo dei worker
Per i casi d'uso di ML, un buon valore iniziale è equivalente al numero di modelli in esecuzione in parallelo all'interno di ogni worker. Questo valore è limitato dalla capacità degli acceleratori sul worker e dalle dimensioni del modello.
Per gli altri casi d'uso, la pipeline è vincolata alla memoria o alla CPU. Per le pipeline vincolate alla memoria, utilizza il limite di memoria per calcolare l'elaborazione parallela massima. Per le pipeline vincolate alla CPU, ti consigliamo di mantenere il criterio di scalabilità automatica predefinito anziché fornire un suggerimento per il parallelismo.
È possibile ottimizzare il valore in base alle esigenze di elaborazione di altre fasi, ad esempio la scrittura in un sink. L'aumento del valore di 1 o 2 quando il parallelismo del modello è 2 consente di riconoscere il tempo di elaborazione più rapido della scrittura nel sink, fornendo più margine per tenere conto dell'elaborazione eseguita in altre fasi. Se la pipeline non prevede la ridistribuzione e le trasformazioni vengono unite in un'unica fase, non è necessario modificare il valore per altre trasformazioni.
Questo valore può anche essere modificato per simulare gli effetti di ritardi di backlog accettabili. Ad esempio, se accetti un ritardo massimo di 10 minuti e il tempo di elaborazione medio del modello è di 1 minuto, puoi scegliere di aumentare il valore di 1, supponendo che il numero massimo di worker sia impostato su 10.
Euristiche di scalabilità automatica a elevata intensità di GPU
Nell'impostazione a elevata intensità di GPU indicata tramite l'impostazione del suggerimento per il parallelismo, Dataflow tiene conto di diversi fattori durante la scalabilità automatica. Questi fattori includono:
- Chiavi disponibili. Le chiavi sono l'unità fondamentale di parallelismo in Dataflow.
- Numero massimo di bundle attivi per worker. Questo suggerisce il numero ideale massimo di parallelismo di elaborazione all'interno del worker.
L'idea generale alla base delle decisioni di scalabilità è calcolare i worker necessari per gestire il carico attuale, come indicato da chiavi disponibili. Ad esempio, se sono disponibili 100 chiavi da elaborare e il parallelismo massimo per worker è 10, dovresti avere un totale di 10 worker.
Se la pipeline è complessa e presenta sia un carico di lavoro a elevata intensità di GPU sia numerose trasformazioni a elevata intensità di CPU, ti consigliamo di abilitare l'adattamento alle risorse. In questo modo, il servizio può distinguere chiaramente tra il lavoro a elevata intensità di CPU e quello a elevata intensità di GPU e scalare di conseguenza ogni worker pool.
Euristiche di scalabilità automatica dello streaming
Per le pipeline di streaming, l'obiettivo della scalabilità automatica orizzontale è ridurre al minimo il backlog, massimizzando al contempo l'utilizzo e la velocità effettiva dei worker e reagendo rapidamente ai picchi di carico.
Dataflow tiene conto di diversi fattori durante la scalabilità automatica, tra cui:
Backlog. Il tempo di backlog stimato viene calcolato in base al throughput e ai byte di backlog ancora da elaborare dalla sorgente di ingresso. Una pipeline viene considerata in backlog quando il tempo di backlog stimato rimane superiore a 15 secondi.
Utilizzo CPU target. Il target predefinito per l'utilizzo medio della CPU è 0,8. Puoi sostituire questo valore.
Chiavi disponibili. Le chiavi sono l'unità fondamentale di parallelismo in Dataflow.
In alcuni casi, Dataflow utilizza i seguenti fattori nelle decisioni di scalabilità automatica. Se questi fattori vengono utilizzati per il tuo job, puoi visualizzare le informazioni nella scheda delle metriche **Scalabilità automatica**.
La limitazione basata sulle chiavi utilizza il numero di chiavi di elaborazione ricevute dal job per calcolare il limite per i worker dell'utente, perché ogni chiave può essere elaborata da un solo worker alla volta.
Smorzamento dello scale down. Se Dataflow rileva decisioni di scalabilità automatica instabili, rallenta la velocità di scale down per migliorare la stabilità.
Lo scale up basato sulla CPU utilizza l'utilizzo elevato della CPU come criterio di scale up.
Per i job di streaming che non utilizzano Streaming Engine, la scalabilità potrebbe essere limitata dal numero di Persistent Disk. Per saperne di più, consulta Impostare l'intervallo di scalabilità automatica.
Scalabilità automatica a elevata intensità di GPU, se abilitata impostando il suggerimento per il parallelismo dei worker. Per saperne di più, consulta Euristiche di scalabilità automatica a elevata intensità di GPU.
Scale up. Se una pipeline di streaming rimane in backlog con un parallelismo sufficiente sui worker per diversi minuti, Dataflow esegue lo scale up. Dataflow tenta di cancellare il backlog entro circa 150 secondi dallo scale up, in base alla velocità effettiva corrente per worker. Se è presente un backlog, ma il worker non ha un parallelismo sufficiente per worker aggiuntivi, la pipeline non esegue lo scale up. (La scalabilità del numero di worker oltre il numero di chiavi disponibili per l'elaborazione parallela non consente di elaborare il backlog più rapidamente.)
Scale down Quando il gestore della scalabilità automatica prende una decisione di scale down, il backlog è il fattore di massima priorità. Il gestore della scalabilità automatica punta a un backlog non superiore a 15 secondi. Se il backlog scende sotto i 10 secondi e l'utilizzo medio dei worker è inferiore al target di utilizzo della CPU, Dataflow esegue lo scale down. Finché il backlog è accettabile, il gestore della scalabilità automatica tenta di mantenere l'utilizzo della CPU vicino al target di utilizzo della CPU. Tuttavia, se l'utilizzo è già sufficientemente vicino al target, il gestore della scalabilità automatica potrebbe mantenere invariato il numero di worker, perché ogni passaggio di scale down ha un costo.
Streaming Engine utilizza anche una tecnica di scalabilità automatica predittiva basata sul backlog dei timer. I dati illimitati in una pipeline di streaming vengono suddivisi in finestre raggruppate in base ai timestamp. Alla fine di una finestra, i timer vengono attivati per ogni chiave elaborata in quella finestra. L'attivazione di un timer indica che la finestra è scaduta per una determinata chiave. Streaming Engine può misurare il backlog dei timer e prevedere quanti timer verranno attivati alla fine di una finestra. Utilizzando il backlog dei timer come segnale, Dataflow può stimare la quantità di elaborazione che deve avvenire quando i timer futuri si attivano. In base al carico futuro stimato, Dataflow esegue la scalabilità automatica in anticipo per soddisfare la domanda prevista.
Metriche
Per trovare i limiti di scalabilità automatica correnti per un job, esegui una query sulle seguenti metriche:
job/max_worker_instances_limit: numero massimo di worker.job/min_worker_instances_limit: numero minimo di worker.
Per ottenere informazioni sull'utilizzo dei worker, esegui una query sulle seguenti metriche:
job/aggregated_worker_utilization: l'utilizzo aggregato dei worker.job/worker_utilization_hint: il suggerimento per l'utilizzo dei worker corrente.
Per ottenere informazioni sul comportamento del gestore della scalabilità automatica, esegui una query sulla seguente metrica:
job.worker_utilization_hint_is_actively_used: indica se il gestore della scalabilità automatica utilizza attivamente il suggerimento per l'utilizzo dei worker. Se altri fattori sostituiscono il suggerimento quando viene eseguito il campionamento di questa metrica, il valore èfalse.job/horizontal_worker_scaling: descrive le decisioni prese dal gestore della scalabilità automatica. Questa metrica contiene le seguenti etichette:direction: specifica se il gestore della scalabilità automatica ha eseguito lo scale up, lo scale down o non ha intrapreso alcuna azione.rationale: specifica la motivazione della decisione del gestore della scalabilità automatica.
Per saperne di più, consulta le metriche di Cloud Monitoring. Queste metriche vengono visualizzate anche nei grafici di monitoraggio della scalabilità automatica.
Passaggi successivi
- Monitorare la scalabilità automatica di Dataflow
- Risolvere i problemi relativi alla scalabilità automatica di Dataflow