Eseguire l'upgrade di una pipeline di inserimento flussi

Questa pagina fornisce indicazioni e consigli per l'upgrade delle pipeline di streaming. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam o aggiornare il codice della pipeline. Vengono fornite diverse opzioni per adattarsi a scenari diversi.

Mentre le pipeline batch si arrestano al termine del job, le pipeline di streaming spesso vengono eseguite ininterrottamente per fornire un'elaborazione continua. Pertanto, quando esegui l'upgrade delle pipeline di streaming, devi tenere conto dei seguenti aspetti:

  • Potresti dover ridurre al minimo o evitare interruzioni della pipeline. In alcuni casi, potresti tollerare un'interruzione temporanea dell'elaborazione durante l'implementazione di una nuova versione di una pipeline. In altri casi, la tua applicazione potrebbe non essere in grado di tollerare alcuna interruzione.
  • I processi di aggiornamento della pipeline devono gestire le modifiche dello schema in modo da ridurre al minimo le interruzioni dell'elaborazione dei messaggi e di altri sistemi collegati. Ad esempio, se lo schema dei messaggi in una pipeline di elaborazione degli eventi cambia, potrebbero essere necessarie modifiche dello schema anche nei sink di dati downstream.

Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline di streaming, a seconda dei requisiti di aggiornamento e della pipeline:

Per ulteriori informazioni sui problemi che potresti riscontrare durante un aggiornamento e su come prevenirli, consulta Convalida di un job di sostituzione e Controllo di compatibilità del job.

Best practice

  • Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da qualsiasi modifica del codice della pipeline.
  • Testa la pipeline dopo ogni modifica prima di apportare ulteriori aggiornamenti.
  • Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla pipeline.
  • Utilizza metodi automatizzati ove possibile, ad esempio aggiornamenti in volo o aggiornamenti automatici della pipeline parallela.
  • Utilizza Managed I/O, se possibile, per usufruire dei vantaggi degli upgrade automatici delle versioni del connettore.

Eseguire aggiornamenti in volo

Puoi aggiornare alcune pipeline di streaming in corso senza interrompere il job. Questo scenario è chiamato aggiornamento del job in corso. Gli aggiornamenti dei job in volo sono disponibili solo in circostanze limitate:

  • Il job deve utilizzare Streaming Engine.
  • Il job deve essere in esecuzione.
  • Stai modificando solo il numero di worker utilizzati dal job.

Per saperne di più, consulta Imposta l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.

Per istruzioni su come eseguire un aggiornamento del job in volo, consulta Aggiornare una pipeline esistente.

Creazione o aggiornamento automatico (upsert) dei modelli

Quando avvii pipeline utilizzando un modello (modelli classici, modelli flessibili, Terraform o Config Connector), puoi utilizzare l'esperimento create_or_update_job per utilizzare la funzionalità di creazione o aggiornamento (upsert).

Quando specifichi create_or_update_job nel parametro additional_experiments o nel flag additional-experiments:

  • Se esiste già un job in esecuzione o in fase di svuotamento con il nome specificato, il servizio modelli avvia automaticamente il nuovo job come aggiornamento di quello esistente.
  • Se non esiste alcun job attivo con quel nome, il servizio di modelli avvia il nuovo job come nuova creazione di job.

Questo esperimento elimina la necessità di determinare in modo programmatico se utilizzare l'azione API di creazione o aggiornamento durante l'avvio di un modello.

Per esempi di codice Terraform e Config Connector che utilizzano questo esperimento, consulta le seguenti sezioni:

Avviare un job di sostituzione

Se il job aggiornato è compatibile con quello esistente, puoi aggiornare la pipeline utilizzando l'opzione update. Quando sostituisci un job esistente, un nuovo job esegue il codice della pipeline aggiornato. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività mentre il job esistente si arresta, viene eseguito il controllo di compatibilità e viene avviato il nuovo job. Per maggiori dettagli, vedi Effetti della sostituzione di un job.

Dataflow esegue un controllo di compatibilità per assicurarsi che il codice della pipeline aggiornato possa essere implementato in sicurezza nella pipeline in esecuzione. Alcune modifiche al codice causano l'esito negativo del controllo di compatibilità, ad esempio quando gli input laterali vengono aggiunti o rimossi da un passaggio esistente. Se il controllo di compatibilità non va a buon fine, non puoi eseguire un aggiornamento in loco del job.

Per istruzioni su come avviare un job di sostituzione, consulta Avviare un job di sostituzione.

Se l'aggiornamento della pipeline non è compatibile con il job corrente, devi interrompere e sostituire la pipeline. Se la pipeline non può tollerare tempi di inattività, esegui pipeline parallele.

Interruzione e sostituzione manuali

Per eseguire un arresto e una sostituzione manuali, annulla o svuota la pipeline, quindi sostituiscila con la pipeline aggiornata. L'annullamento di una pipeline fa sì che Dataflow interrompa immediatamente l'elaborazione e chiuda le risorse il più rapidamente possibile, il che può causare la perdita di alcuni dati in fase di elaborazione, noti come dati in transito. Per evitare la perdita di dati, nella maggior parte dei casi lo svuotamento è l'azione preferita. Puoi anche utilizzare gli snapshot di Dataflow per salvare lo stato di una pipeline di streaming, il che ti consente di avviare una nuova versione del job Dataflow senza perdere lo stato. Per ulteriori informazioni, consulta Utilizzo degli snapshot di Dataflow.

Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in corso e attiva tutti i trigger. Sebbene i dati in volo non vengano persi, lo svuotamento potrebbe causare la presenza di dati incompleti nelle finestre. In questo caso, le finestre in corso emettono risultati parziali o incompleti. Per saperne di più, consulta Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job in modalità flusso che contenga il codice della pipeline aggiornato, che ti consente di riprendere l'elaborazione.

Con questo metodo, si verifica un tempo di inattività tra l'interruzione del job di streaming esistente e il momento in cui la pipeline di sostituzione è pronta a riprendere l'elaborazione dei dati. Tuttavia, l'annullamento o lo svuotamento di una pipeline esistente e l'avvio di un nuovo job con la pipeline aggiornata sono meno complicati dell'esecuzione di pipeline parallele.

Per ulteriori informazioni, consulta Arrestare un job Dataflow. Dopo aver arrestato il job corrente, avviane uno nuovo con lo stesso nome.

Interruzione e sostituzione automatica

Dataflow fornisce il supporto API per l'avvio di un aggiornamento stop-and-replace automatizzato. Questo flusso di lavoro in stile dichiarativo elimina i passaggi procedurali manuali. Dichiari il job da sostituire e il nuovo job viene avviato e coordina automaticamente la transizione.

Quando utilizzi questo flusso di lavoro, viene eseguito il provisioning delle nuove risorse del job mentre il vecchio job è ancora in esecuzione. Il vecchio job riceve quindi automaticamente un segnale di svuotamento. Al termine del drenaggio del vecchio job o al raggiungimento di un timeout specificato dall'utente, il nuovo job inizia immediatamente a elaborare i dati. Utilizza questo flusso di lavoro per le pipeline che non possono tollerare dati duplicati o aggregazioni parziali, ma possono accettare una breve pausa di elaborazione mentre il vecchio job viene svuotato.

Inviare una richiesta di aggiornamento automatico di interruzione e sostituzione

Per utilizzare questo flusso di lavoro:

  • Devi impostare l'opzione parallel_replace_job_max_stop_duration.
  • Non devi impostare l'opzione parallel_replace_job_min_parallel_pipelines_duration. L'impostazione di una durata parallela attiva il flusso di lavoro aggiornamenti automatici della pipeline parallela.

Avvia una richiesta di aggiornamento automatico con interruzione e sostituzione utilizzando le seguenti opzioni di servizio:

Java

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento in loco, utilizza update_strategy_in_place_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Disattivare l'annullamento automatico

L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico, imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.

--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Python

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento in loco, utilizza update_strategy_in_place_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Disattivare l'annullamento automatico

L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico, imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Go

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento in loco, utilizza update_strategy_in_place_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Disattivare l'annullamento automatico

L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico, imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

gcloud

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Per eseguire un aggiornamento automatico di interruzione e sostituzione utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento in loco, utilizza update_strategy_in_place_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Disattivare l'annullamento automatico

L'annullamento automatico è attivato per impostazione predefinita quando specifichi l'opzione parallel_replace_job_max_stop_duration. Per disattivare l'annullamento automatico, imposta l'opzione parallel_replace_job_cancel_on_drain_timeout su false.

--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

(Facoltativo) Upsert (crea o aggiorna il job)

Per attivare il comportamento di upsert (crea o aggiorna job):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_max_stop_duration=DURATION",
  "parallel_replace_job_cancel_on_drain_timeout=true",
  "update_strategy_parallel_job_update",
  "parallel_replace_job_preallocate_compute_resources=true",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_max_stop_duration=DURATION"
    - "parallel_replace_job_cancel_on_drain_timeout=true"
    - "update_strategy_parallel_job_update"
    - "parallel_replace_job_preallocate_compute_resources=true"
    - "create_or_update_job"

Sostituisci le seguenti variabili:

  • Devi fornire parallel_replace_job_name o parallel_replace_job_id per identificare il job da sostituire:
    • OLD_JOB_NAME: il nome del job da sostituire.
    • OLD_JOB_ID: l'ID del job da sostituire.
  • Devi fornire il valore parallel_replace_job_max_stop_duration per attivare l'interruzione e la sostituzione automatiche:
    • DURATION: il periodo di tempo massimo durante il quale il nuovo job attende il completamento del drenaggio del vecchio job. La durata deve essere formattata come una stringa che termina con s, m o h (ad esempio, 30m, 1h).
  • Non impostare l'opzione parallel_replace_job_min_parallel_pipelines_duration quando utilizzi questo flusso di lavoro. L'impostazione di questa opzione attiva il flusso di lavoro aggiornamenti automatici della pipeline parallela.
  • (Facoltativo) Configura l'opzione parallel_replace_job_cancel_on_drain_timeout. Poiché l'annullamento automatico è abilitato (il valore predefinito è true) per impostazione predefinita quando è impostata l'opzione parallel_replace_job_max_stop_duration, non è necessario configurare esplicitamente questa opzione per attivarla.
    • Per mantenere il comportamento predefinito, ometti questa opzione o impostala su true.
    • Per disattivare l'annullamento automatico, imposta questa opzione su false. Se imposti questa opzione su false e il vecchio job si blocca nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.
  • (Facoltativo) Definisci la configurazione parallel_replace_job_preallocate_compute_resources:
    • Specifica se i worker vengono sottoposti al provisioning in anticipo per il nuovo job mentre il vecchio job viene svuotato. Valori: true (impostazione predefinita) o false. Per Terraform e Config Connector, è consigliabile impostare questa opzione su true per evitare timeout del provisioning delle risorse. Quando parallel_replace_job_preallocate_compute_resources è impostato su false, il nuovo job rimane in stato di attesa finché il vecchio job non viene svuotato.

Rielaborazione dei messaggi con Pub/Sub Snapshot e Seek

In alcune situazioni, dopo aver sostituito o annullato una pipeline svuotata, potresti dover rielaborare i messaggi Pub/Sub inviati in precedenza. Ad esempio, potresti dover utilizzare una logica di business aggiornata per rielaborare i dati. Pub/Sub Seek è una funzionalità che ti consente di riprodurre i messaggi da uno snapshot Pub/Sub. Puoi utilizzare Pub/Sub Seek con Dataflow per rielaborare i messaggi dal momento in cui viene creato lo snapshot della sottoscrizione.

Durante lo sviluppo e il test, puoi anche utilizzare Pub/Sub Seek per riprodurre ripetutamente i messaggi noti per verificare l'output della pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione quando la sottoscrizione viene utilizzata da una pipeline. In questo caso, la ricerca può invalidare la logica del watermark di Dataflow e potrebbe influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.

Un flusso di lavoro gcloud CLI consigliato per l'utilizzo di Pub/Sub Seek con le pipeline Dataflow in una finestra del terminale è il seguente:

  1. Per creare uno snapshot dell'abbonamento, utilizza il comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Per svuotare o annullare la pipeline, utilizza il comando gcloud dataflow jobs drain o il comando gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    o

    gcloud dataflow jobs cancel JOB_ID
    
  3. Per passare allo snapshot, utilizza il comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Esegui il deployment di una nuova pipeline che utilizza l'abbonamento.

Esegui pipeline parallele

Se devi evitare interruzioni alla pipeline di streaming durante un aggiornamento, puoi eseguire pipeline parallele. Questo approccio ti consente di avviare un nuovo job di streaming con il codice della pipeline aggiornato ed eseguirlo in parallelo con il job esistente. Puoi utilizzare il flusso di lavoro di deployment dell'aggiornamento parallelo automatizzato della pipeline di Dataflow oppure puoi eseguire i passaggi manualmente.

Panoramica delle pipeline parallele

Quando crei la nuova pipeline, utilizza la stessa strategia di finestre che hai utilizzato per la pipeline esistente. Per il flusso di lavoro manuale, lascia che la pipeline esistente continui a essere eseguita finché la filigrana non supera il timestamp della finestra completa meno recente elaborata dalla pipeline aggiornata. Poi, svuota o annulla la pipeline esistente. Se utilizzi il flusso di lavoro automatizzato, questo lavoro viene svolto per te. La pipeline aggiornata continua a essere eseguita al suo posto e assume autonomamente l'elaborazione.

Il seguente diagramma illustra questo processo.

La pipeline B si sovrappone alla pipeline B per un periodo di 5 minuti.

Nel diagramma, Pipeline B è il job aggiornato che sostituisce Pipeline A. Il valore t è il timestamp della prima finestra completa elaborata dalla pipeline B. Il valore w è la filigrana per Pipeline A. Per semplicità, si presuppone una filigrana perfetta senza dati in ritardo. L'elaborazione e il tempo totale di esecuzione sono rappresentati sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse (in sequenza) di cinque minuti. I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.

Poiché l'output simultaneo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configura le due pipeline in modo che scrivano i risultati in destinazioni diverse. I sistemi downstream possono quindi utilizzare un'astrazione sulle due destinazioni sink, ad esempio una visualizzazione del database, per eseguire query sui risultati combinati. Questi sistemi possono anche utilizzare l'astrazione per deduplicare i risultati del periodo sovrapposto. Per ulteriori informazioni, consulta Gestire l'output duplicato.

Limitazioni

L'utilizzo di aggiornamenti della pipeline parallela automatici o manuali presenta le seguenti limitazioni:

  • Solo aggiornamenti automatici: il nuovo job parallelo deve essere un job Streaming Engine.
  • I job simultanei con lo stesso nome non sono consentiti. Tuttavia, quando esegui un aggiornamento della pipeline parallelo o di arresto e sostituzione automatizzato utilizzando lo stesso nome del job, puoi riutilizzare il nome del job. In questo caso, il nuovo job deve iniziare almeno due minuti dopo l'inizio del job precedente. Questa limitazione impedisce più aggiornamenti paralleli da ripetuti tentativi della libreria client o chiamate di procedure remote non aggiornate.
  • L'esecuzione di due pipeline in parallelo sullo stesso input può comportare la duplicazione dei dati, aggregazioni parziali e potenziali problemi di ordinamento quando i dati vengono inseriti nel sink. Il sistema downstream deve essere progettato per prevedere e gestire questi risultati.
  • Quando leggi da una sorgente Pub/Sub, l'utilizzo della stessa sottoscrizione per più pipeline non è consigliato e può causare problemi di correttezza. Tuttavia, in alcuni casi d'uso, come le pipeline di estrazione, trasformazione e caricamento (ETL), l'utilizzo della stessa sottoscrizione in due pipeline potrebbe ridurre la duplicazione. I problemi di scalabilità automatica sono probabili ogni volta che fornisci un valore diverso da zero per la durata di sovrapposizione. Questo problema può essere mitigato utilizzando la funzionalità di aggiornamento dei job in volo. Per saperne di più, consulta Ottimizzare la scalabilità automatica per le pipeline di streaming Pub/Sub.
  • Per Apache Kafka, puoi ridurre al minimo i duplicati attivando il commit dell'offset in Kafka. Per attivare il commit dell'offset in Kafka, consulta Commit in Kafka.

Aggiornamenti automatici delle pipeline parallele

Dataflow fornisce il supporto API per l'avvio di un job di sostituzione parallela. Questa API in stile dichiarativo astrae il lavoro manuale di esecuzione dei passaggi procedurali. Dichiari il job che vuoi aggiornare e un nuovo job viene eseguito in parallelo al vecchio job. Dopo l'esecuzione del nuovo job per la durata specificata, il vecchio job viene svuotato. Questa funzionalità elimina le pause di elaborazione durante gli aggiornamenti. Inoltre, riduce lo sforzo operativo necessario per aggiornare le pipeline incompatibili.

Questo metodo di aggiornamento è ideale per le pipeline che possono tollerare alcuni duplicati o aggregazioni parziali e non richiedono un ordine rigoroso durante l'inserimento dei dati. È adatto alle pipeline ETL, nonché alle pipeline che utilizzano la modalità di streaming almeno una volta e la trasformazione Redistribute con l'opzione Consenti duplicati impostata su true.

Opzioni di servizio per pipeline parallele automatizzate

Utilizza le seguenti opzioni di servizio per gli aggiornamenti automatici della pipeline parallela:

Opzione del servizio Obbligatorio o facoltativo Descrizione Dipendenze o esclusioni
update_strategy_parallel_job_update Obbligatorio (opzione 1: aggiornamento utilizzando lo stesso nome del lavoro) Comando per eseguire un aggiornamento parallelo, che esegue entrambe le pipeline contemporaneamente per ridurre al minimo il tempo di inattività, durante l'aggiornamento con lo stesso nome del job. Deve essere impostato insieme al flag --update e parallel_replace_job_min_parallel_pipelines_duration.
update_strategy_in_place_update Optional Alternativa all'aggiornamento parallelo. Esegue un aggiornamento standard in loco del job. Deve essere impostato insieme al flag --update.

Si esclude a vicenda con update_strategy_parallel_job_update.

Quando questa opzione è impostata, le altre opzioni relative ai job paralleli vengono ignorate.

parallel_replace_job_min_parallel_pipelines_duration Obbligatorio Specifica la durata minima per cui le due pipeline vengono eseguite contemporaneamente. Trascorso questo periodo, al vecchio job viene inviato un segnale di svuotamento. I valori accettabili vanno da 0s (consigliato per la sovrapposizione zero) a 744h (31 giorni). Deve essere abbinato a un modo per scegliere come target il vecchio lavoro. Una delle seguenti opzioni:
  • Opzione 1: utilizza lo stesso nome del job: update_strategy_parallel_job_update oppure
  • Opzione 2: utilizza un nome del job diverso: parallel_replace_job_name (o in alternativa parallel_replace_job_id per identificare il job)
parallel_replace_job_name o parallel_replace_job_id (scegli una delle due opzioni) Obbligatorio (opzione 2: aggiorna utilizzando un nome del job diverso) Identifica il vecchio job per nome o ID da sostituire durante un aggiornamento con un nome diverso. Richiede l'impostazione di parallel_replace_job_min_parallel_pipelines_duration.

Non utilizzare il flag --update o parallel_replace_job_id con questa opzione.

parallel_replace_job_max_stop_duration Optional La durata massima consentita per l'esaurimento del vecchio job prima che venga attivato l'annullamento automatico. Ad esempio, 30m o 1h. Richiede l'impostazione di un flusso di lavoro di aggiornamento parallelo (opzione 1 o opzione 2).
parallel_replace_job_cancel_on_drain_timeout Optional

Il valore predefinito è true se è impostata una durata massima della sosta.

Opzione booleana che specifica se annullare il vecchio job se la durata del suo svuotamento supera parallel_replace_job_max_stop_duration. Utilizzato insieme a parallel_replace_job_max_stop_duration.

Imposta su false per disattivare l'annullamento automatico. Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Inviare una richiesta di aggiornamento della pipeline parallela automatizzata

Per utilizzare il workflow automatizzato, avvia un nuovo job di streaming. Puoi aggiornare un job utilizzando lo stesso nome o un nome diverso.

Java

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai job paralleli, utilizza update_strategy_in_place_update anziché update_strategy_parallel_job_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico

Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.

--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Python

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai job paralleli, utilizza update_strategy_in_place_update anziché update_strategy_parallel_job_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico

Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Go

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai job paralleli, utilizza update_strategy_in_place_update anziché update_strategy_parallel_job_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico

Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

gcloud

Opzione 1: aggiornamento utilizzando lo stesso nome del job

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per eseguire un aggiornamento parallelo utilizzando lo stesso nome, utilizza il flag --update e l'opzione update_strategy_parallel_job_update.
  • Per eseguire un aggiornamento sul posto senza rimuovere le opzioni relative ai job paralleli, utilizza update_strategy_in_place_update anziché update_strategy_parallel_job_update.

Opzione 2: aggiorna utilizzando un nome del lavoro diverso

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Per specificare il vecchio job per ID anziché per nome, utilizza --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Se specifichi un nuovo nome del job e utilizzi il flag --update, Dataflow cerca un job esistente con il nuovo nome, il che causa un errore.

(Facoltativo) Configura il timeout di svuotamento e l'annullamento automatico

Puoi aggiungere le seguenti opzioni a una delle configurazioni per impostare un timeout di svuotamento e annullare automaticamente il vecchio job se si blocca.

--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"

Se disattivi l'annullamento automatico e il vecchio job rimane bloccato nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

(Facoltativo) Upsert (crea o aggiorna il job)

Per attivare il comportamento di upsert (crea o aggiorna job):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_min_parallel_pipelines_duration=DURATION",
  "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
  "update_strategy_parallel_job_update",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
    - "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
    - "update_strategy_parallel_job_update"
    - "create_or_update_job"

Sostituisci le seguenti variabili:

  • Se esegui l'aggiornamento utilizzando un nome job diverso (opzione 2), devi fornire parallel_replace_job_name o parallel_replace_job_id per identificare il job da sostituire. L'aggiornamento utilizzando un nome job diverso non è supportato per Terraform o Config Connector.
    • OLD_JOB_NAME: il nome del job da sostituire.
    • OLD_JOB_ID: l'ID del job da sostituire.
  • DURATION: il periodo di tempo minimo in cui le due pipeline vengono eseguite in parallelo come numero intero o in rappresentazione in virgola mobile. Per evitare sovrapposizioni, è consigliabile una durata di 0s. Trascorso questo periodo, al vecchio job viene inviato un segnale di svuotamento.

    La durata deve essere compresa tra 0 secondi (0s) e 31 giorni (744h). Utilizza s, m e h per specificare secondi, minuti e ore. Ad esempio, 10m è 10 minuti.

  • DRAIN_TIMEOUT_DURATION: (Facoltativo) La quantità massima di tempo in cui il vecchio job deve essere svuotato prima che venga attivato l'annullamento automatico. La durata deve essere formattata come una stringa che termina con s, m o h (ad esempio, 30m, 1h).

  • parallel_replace_job_cancel_on_drain_timeout: (Facoltativo) Indica se annullare il job precedente se non termina lo scaricamento prima della durata massima di interruzione. Se viene fornita una durata del timeout di svuotamento, il valore predefinito è true. Per disattivare l'annullamento automatico, imposta questa opzione su false. Se imposti questa opzione su false e il vecchio job si blocca nello stato di svuotamento, sia il vecchio che il nuovo job continuano a essere eseguiti in parallelo.

Quando avvii il nuovo job, Dataflow attende il provisioning di tutti i worker prima di iniziare a elaborare i dati. Per monitorare lo stato del deployment, controlla i log dei job Dataflow.

Eseguire manualmente pipeline parallele

Per scenari più complessi o quando hai bisogno di un maggiore controllo sul processo di aggiornamento, puoi eseguire manualmente pipeline parallele. Consenti alla pipeline esistente di continuare a essere eseguita finché il watermark non supera il timestamp della finestra completa più recente elaborata dalla pipeline aggiornata. Quindi, svuota o annulla la pipeline esistente.

Gestire l'output duplicato

L'esempio seguente descrive un approccio per la gestione dell'output duplicato. Le due pipeline scrivono l'output in destinazioni diverse, utilizzano sistemi downstream per eseguire query sui risultati e deduplicare i risultati del periodo sovrapposto. Questo esempio utilizza una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.

  1. Nello stato iniziale, la pipeline di streaming esistente (pipeline A) è in esecuzione e legge i messaggi da un argomento Pub/Sub (argomento) utilizzando una sottoscrizione (sottoscrizione A). I risultati vengono scritti in una tabella BigQuery (Tabella A). I risultati vengono utilizzati tramite una vista BigQuery, che funge da facciata per mascherare le modifiche alla tabella sottostante. Questo processo è un'applicazione di un metodo di progettazione chiamato pattern facciata. Il seguente diagramma mostra lo stato iniziale.

    Una pipeline con una sottoscrizione e la scrittura in una singola tabella BigQuery.

  2. Crea un nuovo abbonamento (Abbonamento B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (pipeline B), che legge dall'argomento Pub/Sub (argomento) utilizzando sottoscrizione B e scrive in una tabella BigQuery separata (tabella B). Il seguente diagramma illustra questo flusso.

    Due pipeline, ciascuna con un abbonamento. Ogni pipeline scrive in una tabella BigQuery separata. Una vista facciata legge da entrambe le tabelle.

    A questo punto, Pipeline A e Pipeline B vengono eseguite in parallelo e scrivono i risultati in tabelle separate. Registri l'ora t come timestamp della prima finestra completa elaborata dalla pipeline B.

  3. Quando il watermark della pipeline A supera l'ora t, svuota la pipeline A. Quando svuoti la pipeline, tutte le finestre aperte si chiudono e l'elaborazione dei dati in transito viene completata. Se la pipeline contiene finestre e le finestre complete sono importanti (supponendo che non ci siano dati in ritardo), prima di svuotare la pipeline A, lascia che entrambe le pipeline vengano eseguite finché non hai finestre sovrapposte complete. Interrompi il job di streaming per la pipeline A dopo che tutti i dati in transito sono stati elaborati e scritti nella tabella A. Il seguente diagramma mostra questa fase.

    La pipeline A viene svuotata e non legge più l'abbonamento A e non invia più dati alla tabella A dopo il completamento dello svuotamento. Tutta l'elaborazione viene gestita dalla seconda pipeline.

  4. A questo punto, è in esecuzione solo Pipeline B. Puoi eseguire query da una vista BigQuery (vista facciata), che funge da facciata per la tabella A e la tabella B. Per le righe che hanno lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo che restituisca le righe della tabella B oppure, se le righe non esistono nella tabella B, torna alla tabella A. Il seguente diagramma mostra la vista (vista facciata) che legge sia dalla Tabella A sia dalla Tabella B.

    La pipeline A non è più presente e viene eseguita solo la pipeline B.

    A questo punto, puoi eliminare l'abbonamento A.

Quando vengono rilevati problemi con il deployment di una nuova pipeline, la presenza di pipeline parallele può semplificare il rollback. In questo esempio, potresti voler mantenere in esecuzione Pipeline A mentre monitori Pipeline B per verificare il corretto funzionamento. Se si verificano problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.

Gestire le mutazioni dello schema

I sistemi di gestione dei dati spesso devono adattarsi alle mutazioni dello schema nel tempo, a volte a causa di modifiche ai requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni ai sistemi informativi aziendali.

Considera una pipeline che legge i messaggi contenenti payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza TableRow e poi scrive le righe in una tabella BigQuery. Lo schema della tabella di output è simile ai messaggi elaborati dalla pipeline. Nel seguente diagramma, lo schema è indicato come Schema A.

Pipeline che legge una sottoscrizione e scrive in una tabella di output BigQuery utilizzando lo schema A.

Nel tempo, lo schema del messaggio potrebbe subire modifiche non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema viene indicato come Schema B. In questo caso, è necessario aggiornare la pipeline A e lo schema della tabella di output deve supportare lo schema B.

Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza tempi di inattività. Ad esempio, puoi aggiungere nuovi campi o rilassare le modalità delle colonne, ad esempio modificando REQUIRED in NULLABLE, senza tempi di inattività. Queste mutazioni di solito non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono i campi dello schema esistenti interrompono le query o causano altre interruzioni. Il seguente approccio consente di apportare modifiche senza richiedere tempi di inattività.

Separa i dati scritti dalla pipeline in una tabella principale e in una o più tabelle di staging. La tabella principale archivia i dati storici scritti dalla pipeline. Le tabelle di staging archiviano l'output più recente della pipeline. Puoi definire una vista facciata BigQuery sulle tabelle principale e di staging, che consente ai consumatori di eseguire query sui dati storici e aggiornati.

Il seguente diagramma rivede il flusso della pipeline precedente per includere una tabella di staging (Staging Table A), una tabella principale e una visualizzazione façade.

Pipeline che legge una sottoscrizione e scrive in una tabella di gestione temporanea BigQuery. Una seconda tabella (principale) ha un output di una versione precedente dello schema. Una visualizzazione facciata legge sia dalla tabella di gestione temporanea sia dalla tabella principale.

Nel flusso rivisto, Pipeline A elabora i messaggi che utilizzano Schema A e scrive l'output in Staging Table A, che ha uno schema compatibile. La tabella principale contiene i dati storici scritti dalle versioni precedenti della pipeline, nonché i risultati uniti periodicamente dalla tabella di staging. I consumatori possono eseguire query sui dati aggiornati, inclusi quelli storici e in tempo reale, utilizzando la visualizzazione facciata.

Quando lo schema del messaggio cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Eseguendo pipeline parallele, puoi assicurarti che l'elaborazione dei dati in streaming continui senza interruzioni. L'interruzione e la sostituzione delle pipeline comportano un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di tempo.

La pipeline aggiornata scrive in una tabella di gestione temporanea aggiuntiva (Staging Table B) che utilizza Schema B. Puoi utilizzare un flusso di lavoro orchestrato per creare la nuova tabella di gestione temporanea prima di aggiornare la pipeline. Aggiorna la visualizzazione facciata in modo da includere i risultati della nuova tabella di gestione temporanea, utilizzando potenzialmente un passaggio del flusso di lavoro correlato.

Il seguente diagramma mostra il flusso aggiornato che mostra la tabella di staging B con lo schema B e come la visualizzazione facciata viene aggiornata per includere i contenuti della tabella principale e di entrambe le tabelle di staging.

La pipeline ora utilizza lo schema B e scrive nella tabella di staging B. Una vista facciata legge dalla tabella Principal, dalla tabella di staging A e dalla tabella di staging B.

Come processo separato dall'aggiornamento della pipeline, puoi unire le tabelle di staging nella tabella principale, periodicamente o in base alle necessità. Il seguente diagramma mostra come la tabella di staging A viene unita alla tabella principale.

La tabella di staging A viene unita alla tabella principale. La visualizzazione facciata legge dalla tabella di staging B e dalla tabella principale.

Passaggi successivi