Questa pagina fornisce indicazioni e consigli per l'upgrade delle pipeline di streaming. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam o aggiornare il codice della pipeline. Vengono fornite diverse opzioni per adattarsi a scenari diversi.
Mentre le pipeline batch si arrestano al termine del job, le pipeline di streaming spesso vengono eseguite ininterrottamente per fornire un'elaborazione continua. Pertanto, quando esegui l'upgrade delle pipeline di streaming, devi tenere conto dei seguenti aspetti:
- Potresti dover ridurre al minimo o evitare interruzioni della pipeline. In alcuni casi, potresti tollerare un'interruzione temporanea dell'elaborazione durante l'implementazione di una nuova versione di una pipeline. In altri casi, la tua applicazione potrebbe non essere in grado di tollerare alcuna interruzione.
- I processi di aggiornamento della pipeline devono gestire le modifiche dello schema in modo da ridurre al minimo le interruzioni dell'elaborazione dei messaggi e di altri sistemi collegati. Ad esempio, se lo schema dei messaggi in una pipeline di elaborazione degli eventi cambia, potrebbero essere necessarie modifiche dello schema anche nei sink di dati downstream.
Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline di streaming, a seconda dei requisiti di aggiornamento e della pipeline:
Per ulteriori informazioni sui problemi che potresti riscontrare durante un aggiornamento e su come prevenirli, consulta Convalida di un job di sostituzione e Controllo di compatibilità del job.
Best practice
- Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da qualsiasi modifica del codice della pipeline.
- Testa la pipeline dopo ogni modifica prima di apportare ulteriori aggiornamenti.
- Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla pipeline.
- Utilizza metodi automatizzati ove possibile, ad esempio aggiornamenti in volo o aggiornamenti automatici della pipeline parallela.
- Utilizza Managed I/O, se possibile, per usufruire dei vantaggi degli upgrade automatici delle versioni del connettore.
Eseguire aggiornamenti in volo
Puoi aggiornare alcune pipeline di streaming in corso senza interrompere il job. Questo scenario è chiamato aggiornamento del job in corso. Gli aggiornamenti dei job in volo sono disponibili solo in circostanze limitate:
- Il job deve utilizzare Streaming Engine.
- Il job deve essere in esecuzione.
- Stai modificando solo il numero di worker utilizzati dal job.
Per saperne di più, consulta Imposta l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.
Per istruzioni su come eseguire un aggiornamento del job in volo, consulta Aggiornare una pipeline esistente.
Creazione o aggiornamento automatico (upsert) dei modelli
Quando avvii pipeline utilizzando un modello (modelli classici, modelli flessibili, Terraform o Config Connector), puoi utilizzare l'esperimento create_or_update_job per utilizzare la funzionalità di creazione o aggiornamento (upsert).
Quando specifichi create_or_update_job nel parametro additional_experiments
o nel flag additional-experiments:
- Se esiste già un job in esecuzione o in fase di svuotamento con il nome specificato, il servizio modelli avvia automaticamente il nuovo job come aggiornamento di quello esistente.
- Se non esiste alcun job attivo con quel nome, il servizio di modelli avvia il nuovo job come nuova creazione di job.
Questo esperimento elimina la necessità di determinare in modo programmatico se utilizzare l'azione API di creazione o aggiornamento durante l'avvio di un modello.
Per esempi di codice Terraform e Config Connector che utilizzano questo esperimento, consulta le seguenti sezioni:
- Inviare una richiesta di aggiornamento automatico di interruzione e sostituzione
- Inviare una richiesta di aggiornamento della pipeline parallela automatizzata
Avviare un job di sostituzione
Se il job aggiornato è compatibile con quello esistente, puoi aggiornare la pipeline utilizzando l'opzione update. Quando sostituisci un job esistente, un nuovo job esegue il codice della pipeline aggiornato.
Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività
mentre il job esistente si arresta, viene eseguito il controllo di compatibilità e viene avviato il nuovo job. Per maggiori dettagli, vedi
Effetti della sostituzione di un job.
Dataflow esegue un controllo di compatibilità per assicurarsi che il codice della pipeline aggiornato possa essere implementato in sicurezza nella pipeline in esecuzione. Alcune modifiche al codice causano l'esito negativo del controllo di compatibilità, ad esempio quando gli input laterali vengono aggiunti o rimossi da un passaggio esistente. Se il controllo di compatibilità non va a buon fine, non puoi eseguire un aggiornamento in loco del job.
Per istruzioni su come avviare un job di sostituzione, consulta Avviare un job di sostituzione.
Se l'aggiornamento della pipeline non è compatibile con il job corrente, devi interrompere e sostituire la pipeline. Se la pipeline non può tollerare tempi di inattività, esegui pipeline parallele.
Interruzione e sostituzione manuali
Per eseguire un arresto e una sostituzione manuali, annulla o svuota la pipeline, quindi sostituiscila con la pipeline aggiornata. L'annullamento di una pipeline fa sì che Dataflow interrompa immediatamente l'elaborazione e chiuda le risorse il più rapidamente possibile, il che può causare la perdita di alcuni dati in fase di elaborazione, noti come dati in transito. Per evitare la perdita di dati, nella maggior parte dei casi lo svuotamento è l'azione preferita. Puoi anche utilizzare gli snapshot di Dataflow per salvare lo stato di una pipeline di streaming, il che ti consente di avviare una nuova versione del job Dataflow senza perdere lo stato. Per ulteriori informazioni, consulta Utilizzo degli snapshot di Dataflow.
Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in corso e attiva tutti i trigger. Sebbene i dati in volo non vengano persi, lo svuotamento potrebbe causare la presenza di dati incompleti nelle finestre. In questo caso, le finestre in corso emettono risultati parziali o incompleti. Per saperne di più, consulta Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job in modalità flusso che contenga il codice della pipeline aggiornato, che ti consente di riprendere l'elaborazione.
Con questo metodo, si verifica un tempo di inattività tra l'interruzione del job di streaming esistente e il momento in cui la pipeline di sostituzione è pronta a riprendere l'elaborazione dei dati. Tuttavia, l'annullamento o lo svuotamento di una pipeline esistente e l'avvio di un nuovo job con la pipeline aggiornata sono meno complicati dell'esecuzione di pipeline parallele.
Per ulteriori informazioni, consulta Arrestare un job Dataflow. Dopo aver arrestato il job corrente, avviane uno nuovo con lo stesso nome.
Interruzione e sostituzione automatica
Dataflow fornisce il supporto API per l'avvio di un aggiornamento stop-and-replace automatizzato. Questo flusso di lavoro in stile dichiarativo elimina i passaggi procedurali manuali. Dichiari il job da sostituire e il nuovo job viene avviato e coordina automaticamente la transizione.
Quando utilizzi questo flusso di lavoro, viene eseguito il provisioning delle nuove risorse del job mentre il vecchio job è ancora in esecuzione. Il vecchio job riceve quindi automaticamente un segnale di svuotamento. Al termine del drenaggio del vecchio job o al raggiungimento di un timeout specificato dall'utente, il nuovo job inizia immediatamente a elaborare i dati. Utilizza questo flusso di lavoro per le pipeline che non possono tollerare dati duplicati o aggregazioni parziali, ma possono accettare una breve pausa di elaborazione mentre il vecchio job viene svuotato.
Inviare una richiesta di aggiornamento automatico di interruzione e sostituzione
Per utilizzare questo flusso di lavoro:
- Devi impostare l'opzione
parallel_replace_job_max_stop_duration. - Non devi impostare l'opzione
parallel_replace_job_min_parallel_pipelines_duration. L'impostazione di una durata parallela attiva il flusso di lavoro aggiornamenti automatici della pipeline parallela.
Avvia una richiesta di aggiornamento automatico con interruzione e sostituzione utilizzando le seguenti opzioni di servizio:
Java
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza
il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento in loco, utilizza
update_strategy_in_place_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Disattivare l'annullamento automatico
L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione
parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico,
imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
Python
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza
il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento in loco, utilizza
update_strategy_in_place_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Disattivare l'annullamento automatico
L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione
parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico,
imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
Go
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza
il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento in loco, utilizza
update_strategy_in_place_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Disattivare l'annullamento automatico
L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione
parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico,
imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
gcloud
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza
il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento in loco, utilizza
update_strategy_in_place_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Disattivare l'annullamento automatico
L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione
parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico,
imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
(Facoltativo) Upsert (crea o aggiorna il job)
Per attivare il comportamento di upsert (crea o aggiorna job):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_max_stop_duration=DURATION",
"parallel_replace_job_cancel_on_drain_timeout=true",
"update_strategy_parallel_job_update",
"parallel_replace_job_preallocate_compute_resources=true",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_max_stop_duration=DURATION"
- "parallel_replace_job_cancel_on_drain_timeout=true"
- "update_strategy_parallel_job_update"
- "parallel_replace_job_preallocate_compute_resources=true"
- "create_or_update_job"
Sostituisci le seguenti variabili:
- Devi fornire
parallel_replace_job_nameoparallel_replace_job_idper identificare il job da sostituire:OLD_JOB_NAME: il nome del job da sostituire.OLD_JOB_ID: l'ID del job da sostituire.
- Devi fornire il valore
parallel_replace_job_max_stop_durationper attivare l'interruzione e la sostituzione automatiche:DURATION: il periodo di tempo massimo durante il quale il nuovo job attende il completamento del drenaggio del vecchio job. La durata deve essere formattata come una stringa che termina cons,moh(ad esempio,30m,1h).
- Non impostare l'opzione
parallel_replace_job_min_parallel_pipelines_durationquando utilizzi questo flusso di lavoro. L'impostazione di questa opzione attiva il flusso di lavoro aggiornamenti automatici della pipeline parallela. - (Facoltativo) Configura l'opzione
parallel_replace_job_cancel_on_drain_timeout. Poiché l'annullamento automatico è abilitato (il valore predefinito ètrue) per impostazione predefinita quando è impostata l'opzioneparallel_replace_job_max_stop_duration, non è necessario configurare esplicitamente questa opzione per attivarla.- Per mantenere il comportamento predefinito, ometti questa opzione o impostala su
true. - Per disattivare l'annullamento automatico, imposta questa opzione su
false. Se imposti questa opzione sufalsee il vecchio job si blocca nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
- Per mantenere il comportamento predefinito, ometti questa opzione o impostala su
- (Facoltativo) Definisci la configurazione
parallel_replace_job_preallocate_compute_resources:- Specifica se i worker vengono sottoposti al provisioning in anticipo per il nuovo job
mentre il vecchio job viene svuotato. Valori:
true(impostazione predefinita) ofalse. Per Terraform e Config Connector, è consigliabile impostare questa opzione sutrueper evitare timeout del provisioning delle risorse. Quandoparallel_replace_job_preallocate_compute_resourcesè impostato sufalse, il nuovo job rimane in stato di attesa finché il vecchio job non viene svuotato.
- Specifica se i worker vengono sottoposti al provisioning in anticipo per il nuovo job
mentre il vecchio job viene svuotato. Valori:
Rielaborazione dei messaggi con Pub/Sub Snapshot e Seek
In alcune situazioni, dopo aver sostituito o annullato una pipeline svuotata, potresti dover rielaborare i messaggi Pub/Sub inviati in precedenza. Ad esempio, potresti dover utilizzare una logica di business aggiornata per rielaborare i dati. Pub/Sub Seek è una funzionalità che ti consente di riprodurre i messaggi da uno snapshot Pub/Sub. Puoi utilizzare Pub/Sub Seek con Dataflow per rielaborare i messaggi dal momento in cui viene creato lo snapshot della sottoscrizione.
Durante lo sviluppo e il test, puoi anche utilizzare Pub/Sub Seek per riprodurre ripetutamente i messaggi noti per verificare l'output della pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione quando la sottoscrizione viene utilizzata da una pipeline. In questo caso, la ricerca può invalidare la logica del watermark di Dataflow e potrebbe influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.
Un flusso di lavoro gcloud CLI consigliato per l'utilizzo di Pub/Sub Seek con le pipeline Dataflow in una finestra del terminale è il seguente:
Per creare uno snapshot dell'abbonamento, utilizza il comando
gcloud pubsub snapshots create:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Per svuotare o annullare la pipeline, utilizza il comando
gcloud dataflow jobs draino il comandogcloud dataflow jobs cancel:gcloud dataflow jobs drain JOB_ID
o
gcloud dataflow jobs cancel JOB_ID
Per passare allo snapshot, utilizza il comando
gcloud pubsub subscriptions seek:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Esegui il deployment di una nuova pipeline che utilizza l'abbonamento.
Esegui pipeline parallele
Se devi evitare interruzioni alla pipeline di streaming durante un aggiornamento, puoi eseguire pipeline parallele. Questo approccio ti consente di avviare un nuovo job di streaming con il codice della pipeline aggiornato ed eseguirlo in parallelo con il job esistente. Puoi utilizzare il flusso di lavoro di deployment dell'aggiornamento parallelo automatizzato della pipeline di Dataflow oppure puoi eseguire i passaggi manualmente.
Panoramica delle pipeline parallele
Quando crei la nuova pipeline, utilizza la stessa strategia di finestre che hai utilizzato per la pipeline esistente. Per il flusso di lavoro manuale, lascia che la pipeline esistente continui a essere eseguita finché la filigrana non supera il timestamp della finestra completa meno recente elaborata dalla pipeline aggiornata. Poi, svuota o annulla la pipeline esistente. Se utilizzi il flusso di lavoro automatizzato, questo lavoro viene svolto per te. La pipeline aggiornata continua a essere eseguita al suo posto e assume autonomamente l'elaborazione.
Il seguente diagramma illustra questo processo.
Nel diagramma, Pipeline B è il job aggiornato che sostituisce Pipeline A. Il valore t è il timestamp della prima finestra completa elaborata dalla pipeline B. Il valore w è la filigrana per Pipeline A. Per semplicità, si presuppone una filigrana perfetta senza dati in ritardo. L'elaborazione e il tempo totale di esecuzione sono rappresentati sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse (in sequenza) di cinque minuti. I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.
Poiché l'output simultaneo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configura le due pipeline in modo che scrivano i risultati in destinazioni diverse. I sistemi downstream possono quindi utilizzare un'astrazione sulle due destinazioni sink, ad esempio una visualizzazione del database, per eseguire query sui risultati combinati. Questi sistemi possono anche utilizzare l'astrazione per deduplicare i risultati del periodo sovrapposto. Per ulteriori informazioni, consulta Gestire l'output duplicato.
Limitazioni
L'utilizzo di aggiornamenti della pipeline parallela automatici o manuali presenta le seguenti limitazioni:
- Solo aggiornamenti automatici: il nuovo job parallelo deve essere un job Streaming Engine.
- I job simultanei con lo stesso nome non sono consentiti. Tuttavia, quando esegui un aggiornamento della pipeline parallelo o di arresto e sostituzione automatizzato utilizzando lo stesso nome del job, puoi riutilizzare il nome del job. In questo caso, il nuovo job deve iniziare almeno due minuti dopo l'inizio del job precedente. Questa limitazione impedisce più aggiornamenti paralleli da ripetuti tentativi della libreria client o chiamate di procedure remote non aggiornate.
- L'esecuzione di due pipeline in parallelo sullo stesso input può comportare la duplicazione dei dati, aggregazioni parziali e potenziali problemi di ordinamento quando i dati vengono inseriti nel sink. Il sistema downstream deve essere progettato per prevedere e gestire questi risultati.
- Quando leggi da una sorgente Pub/Sub, l'utilizzo della stessa sottoscrizione per più pipeline non è consigliato e può causare problemi di correttezza. Tuttavia, in alcuni casi d'uso, come le pipeline di estrazione, trasformazione e caricamento (ETL), l'utilizzo della stessa sottoscrizione in due pipeline potrebbe ridurre la duplicazione. I problemi di scalabilità automatica sono probabili ogni volta che fornisci un valore diverso da zero per la durata di sovrapposizione. Questo problema può essere mitigato utilizzando la funzionalità di aggiornamento dei job in volo. Per saperne di più, consulta Ottimizzare la scalabilità automatica per le pipeline di streaming Pub/Sub.
- Per Apache Kafka, puoi ridurre al minimo i duplicati attivando il commit dell'offset in Kafka. Per attivare il commit dell'offset in Kafka, consulta Commit in Kafka.
Aggiornamenti automatici delle pipeline parallele
Dataflow fornisce il supporto API per l'avvio di un job di sostituzione parallela. Questa API in stile dichiarativo astrae il lavoro manuale di esecuzione dei passaggi procedurali. Dichiari il job che vuoi aggiornare e un nuovo job viene eseguito in parallelo al vecchio job. Dopo l'esecuzione del nuovo job per la durata specificata, il vecchio job viene svuotato. Questa funzionalità elimina le pause di elaborazione durante gli aggiornamenti. Inoltre, riduce lo sforzo operativo necessario per aggiornare le pipeline incompatibili.
Questo metodo di aggiornamento è ideale per le pipeline che possono tollerare alcuni duplicati o aggregazioni parziali e non richiedono un ordine rigoroso durante l'inserimento dei dati. È
adatto alle pipeline ETL, nonché alle pipeline che utilizzano la modalità di streaming
almeno una volta e la
trasformazione Redistribute con l'opzione Consenti duplicati impostata su true.
Opzioni di servizio per pipeline parallele automatizzate
Utilizza le seguenti opzioni di servizio per gli aggiornamenti automatici della pipeline parallela:
| Opzione del servizio | Obbligatorio o facoltativo | Descrizione | Dipendenze o esclusioni |
|---|---|---|---|
update_strategy_parallel_job_update |
Obbligatorio (opzione 1: aggiornamento utilizzando lo stesso nome del lavoro) | Comando per eseguire un aggiornamento parallelo, che esegue entrambe le pipeline contemporaneamente per ridurre al minimo il tempo di inattività, durante l'aggiornamento con lo stesso nome del job. | Deve essere impostato insieme al flag --update e
parallel_replace_job_min_parallel_pipelines_duration.
|
update_strategy_in_place_update |
Optional | Alternativa all'aggiornamento parallelo. Esegue un aggiornamento standard in loco del job. | Deve essere impostato insieme al flag --update.
Si esclude a vicenda con
Quando questa opzione è impostata, le altre opzioni relative ai job paralleli vengono ignorate. |
parallel_replace_job_min_parallel_pipelines_duration |
Obbligatorio | Specifica la durata minima per cui le due pipeline vengono eseguite contemporaneamente.
Trascorso questo periodo, al vecchio job viene inviato un segnale di svuotamento.
I valori accettabili vanno da 0s (consigliato per la sovrapposizione
zero) a 744h (31 giorni).
|
Deve essere abbinato a un modo per scegliere come target il vecchio lavoro. Una delle seguenti opzioni:
|
parallel_replace_job_name o
parallel_replace_job_id (scegli una delle due opzioni) |
Obbligatorio (opzione 2: aggiorna utilizzando un nome del job diverso) | Identifica il vecchio job per nome o ID da sostituire durante un aggiornamento con un nome diverso. | Richiede l'impostazione di parallel_replace_job_min_parallel_pipelines_duration.
Non utilizzare il flag |
parallel_replace_job_max_stop_duration |
Optional | La durata massima consentita per l'esaurimento del vecchio job prima che
venga attivato l'annullamento automatico. Ad esempio, 30m o
1h. |
Richiede l'impostazione di un flusso di lavoro di aggiornamento parallelo (opzione 1 o opzione 2). |
parallel_replace_job_cancel_on_drain_timeout |
Optional Il valore predefinito è |
Opzione booleana che specifica se annullare il vecchio job se la durata
del suo svuotamento supera parallel_replace_job_max_stop_duration. |
Utilizzato insieme a
parallel_replace_job_max_stop_duration.
Imposta su |
Inviare una richiesta di aggiornamento della pipeline parallela automatizzata
Per utilizzare il workflow automatizzato, avvia un nuovo job di streaming. Puoi aggiornare un job utilizzando lo stesso nome o un nome diverso.
Java
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai
job paralleli, utilizza
update_strategy_in_place_updateanzichéupdate_strategy_parallel_job_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico
Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
Python
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai
job paralleli, utilizza
update_strategy_in_place_updateanzichéupdate_strategy_parallel_job_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico
Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
Go
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai
job paralleli, utilizza
update_strategy_in_place_updateanzichéupdate_strategy_parallel_job_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico
Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
gcloud
Opzione 1: aggiornamento utilizzando lo stesso nome del job
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag
--updatee l'opzioneupdate_strategy_parallel_job_update. - Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai
job paralleli, utilizza
update_strategy_in_place_updateanzichéupdate_strategy_parallel_job_update.
Opzione 2: aggiorna utilizzando un nome del lavoro diverso
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Per specificare il vecchio job per ID anziché per nome, utilizza
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Se specifichi un nuovo nome del job e utilizzi il flag
--update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.
(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico
Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.
--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"
Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
(Facoltativo) Upsert (crea o aggiorna il job)
Per attivare il comportamento di upsert (crea o aggiorna job):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_min_parallel_pipelines_duration=DURATION",
"parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
"update_strategy_parallel_job_update",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
- "update_strategy_parallel_job_update"
- "create_or_update_job"
Sostituisci le seguenti variabili:
- Se esegui l'aggiornamento utilizzando un nome job diverso (opzione 2), devi
fornire
parallel_replace_job_nameoparallel_replace_job_idper identificare il job da sostituire. L'aggiornamento utilizzando un nome job diverso non è supportato per Terraform o Config Connector.OLD_JOB_NAME: il nome del job da sostituire.OLD_JOB_ID: l'ID del job da sostituire.
DURATION: il periodo di tempo minimo in cui le due pipeline vengono eseguite in parallelo come numero intero o in rappresentazione in virgola mobile. Per evitare sovrapposizioni, è consigliabile una durata di0s. Trascorso questo periodo, al vecchio job viene inviato un segnale di svuotamento.La durata deve essere compresa tra 0 secondi (
0s) e 31 giorni (744h). Utilizzas,mehper specificare secondi, minuti e ore. Ad esempio,10mè 10 minuti.DRAIN_TIMEOUT_DURATION: (Facoltativo) La quantità massima di tempo in cui il vecchio job deve essere svuotato prima che venga attivato l'annullamento automatico. La durata deve essere formattata come una stringa che termina cons,moh(ad esempio,30m,1h).parallel_replace_job_cancel_on_drain_timeout: (Facoltativo) Indica se annullare il job precedente se non termina lo scaricamento prima della durata massima di interruzione. Se viene fornita una durata del timeout di svuotamento, il valore predefinito ètrue. Per disattivare l'annullamento automatico, imposta questa opzione sufalse. Se imposti questa opzione sufalsee il vecchio job si blocca nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
Quando avvii il nuovo job, Dataflow attende il provisioning di tutti i worker prima di iniziare a elaborare i dati. Per monitorare lo stato del deployment, controlla i log dei job Dataflow.
Eseguire manualmente pipeline parallele
Per scenari più complessi o quando hai bisogno di un maggiore controllo sul processo di aggiornamento, puoi eseguire manualmente pipeline parallele. Consenti alla pipeline esistente di continuare a essere eseguita finché il watermark non supera il timestamp della finestra completa più recente elaborata dalla pipeline aggiornata. Quindi, svuota o annulla la pipeline esistente.
Gestire l'output duplicato
L'esempio seguente descrive un approccio per la gestione dell'output duplicato. Le due pipeline scrivono l'output in destinazioni diverse, utilizzano sistemi downstream per eseguire query sui risultati e deduplicare i risultati del periodo sovrapposto. Questo esempio utilizza una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.
Nello stato iniziale, la pipeline di streaming esistente (pipeline A) è in esecuzione e legge i messaggi da un argomento Pub/Sub (argomento) utilizzando una sottoscrizione (sottoscrizione A). I risultati vengono scritti in una tabella BigQuery (Tabella A). I risultati vengono utilizzati tramite una vista BigQuery, che funge da facciata per mascherare le modifiche alla tabella sottostante. Questo processo è un'applicazione di un metodo di progettazione chiamato pattern facciata. Il seguente diagramma mostra lo stato iniziale.
Crea un nuovo abbonamento (Abbonamento B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (pipeline B), che legge dall'argomento Pub/Sub (argomento) utilizzando sottoscrizione B e scrive in una tabella BigQuery separata (tabella B). Il seguente diagramma illustra questo flusso.
A questo punto, Pipeline A e Pipeline B vengono eseguite in parallelo e scrivono i risultati in tabelle separate. Registri l'ora t come timestamp della prima finestra completa elaborata dalla pipeline B.
Quando il watermark della pipeline A supera l'ora t, svuota la pipeline A. Quando svuoti la pipeline, tutte le finestre aperte si chiudono e l'elaborazione dei dati in transito viene completata. Se la pipeline contiene finestre e le finestre complete sono importanti (supponendo che non ci siano dati in ritardo), prima di svuotare la pipeline A, lascia che entrambe le pipeline vengano eseguite finché non hai finestre sovrapposte complete. Interrompi il job di streaming per la pipeline A dopo che tutti i dati in transito sono stati elaborati e scritti nella tabella A. Il seguente diagramma mostra questa fase.
A questo punto, è in esecuzione solo Pipeline B. Puoi eseguire query da una vista BigQuery (vista facciata), che funge da facciata per la tabella A e la tabella B. Per le righe che hanno lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo che restituisca le righe della tabella B oppure, se le righe non esistono nella tabella B, torna alla tabella A. Il seguente diagramma mostra la vista (vista facciata) che legge sia dalla Tabella A sia dalla Tabella B.
A questo punto, puoi eliminare l'abbonamento A.
Quando vengono rilevati problemi con il deployment di una nuova pipeline, la presenza di pipeline parallele può semplificare il rollback. In questo esempio, potresti voler mantenere in esecuzione Pipeline A mentre monitori Pipeline B per verificare il corretto funzionamento. Se si verificano problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.
Gestire le mutazioni dello schema
I sistemi di gestione dei dati spesso devono adattarsi alle mutazioni dello schema nel tempo, a volte a causa di modifiche ai requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni ai sistemi informativi aziendali.
Considera una pipeline che legge i messaggi contenenti payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza TableRow e poi scrive le righe in una tabella BigQuery. Lo schema
della tabella di output è simile ai messaggi elaborati dalla pipeline.
Nel seguente diagramma, lo schema è indicato come Schema A.
Nel tempo, lo schema del messaggio potrebbe subire modifiche non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema viene indicato come Schema B. In questo caso, è necessario aggiornare la pipeline A e lo schema della tabella di output deve supportare lo schema B.
Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza tempi di inattività.
Ad esempio, puoi aggiungere nuovi campi o rilassare
le modalità delle colonne,
ad esempio modificando REQUIRED in NULLABLE, senza tempi di inattività.
Queste mutazioni di solito non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono i campi dello schema esistenti interrompono le query o causano altre interruzioni. Il seguente approccio consente di apportare modifiche senza
richiedere tempi di inattività.
Separa i dati scritti dalla pipeline in una tabella principale e in una o più tabelle di staging. La tabella principale archivia i dati storici scritti dalla pipeline. Le tabelle di staging archiviano l'output più recente della pipeline. Puoi definire una vista facciata BigQuery sulle tabelle principale e di staging, che consente ai consumatori di eseguire query sui dati storici e aggiornati.
Il seguente diagramma rivede il flusso della pipeline precedente per includere una tabella di staging (Staging Table A), una tabella principale e una visualizzazione façade.
Nel flusso rivisto, Pipeline A elabora i messaggi che utilizzano Schema A e scrive l'output in Staging Table A, che ha uno schema compatibile. La tabella principale contiene i dati storici scritti dalle versioni precedenti della pipeline, nonché i risultati uniti periodicamente dalla tabella di staging. I consumatori possono eseguire query sui dati aggiornati, inclusi quelli storici e in tempo reale, utilizzando la visualizzazione facciata.
Quando lo schema del messaggio cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Eseguendo pipeline parallele, puoi assicurarti che l'elaborazione dei dati in streaming continui senza interruzioni. L'interruzione e la sostituzione delle pipeline comportano un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di tempo.
La pipeline aggiornata scrive in una tabella di gestione temporanea aggiuntiva (Staging Table B) che utilizza Schema B. Puoi utilizzare un flusso di lavoro orchestrato per creare la nuova tabella di gestione temporanea prima di aggiornare la pipeline. Aggiorna la visualizzazione facciata in modo da includere i risultati della nuova tabella di gestione temporanea, utilizzando potenzialmente un passaggio del flusso di lavoro correlato.
Il seguente diagramma mostra il flusso aggiornato che mostra la tabella di staging B con lo schema B e come la visualizzazione facciata viene aggiornata per includere i contenuti della tabella principale e di entrambe le tabelle di staging.
Come processo separato dall'aggiornamento della pipeline, puoi unire le tabelle di staging nella tabella principale, periodicamente o in base alle necessità. Il seguente diagramma mostra come la tabella di staging A viene unita alla tabella principale.
Passaggi successivi
- Trova la procedura dettagliata per aggiornare una pipeline esistente.