Risolvere i problemi di memoria insufficiente di Dataflow

Questa pagina descrive come trovare e risolvere gli errori di memoria insufficiente (OOM) in Dataflow.

Trovare gli errori di memoria insufficiente

Per determinare se la pipeline sta esaurendo la memoria, utilizza uno dei seguenti metodi.

  • Nella pagina Dettagli job, nel riquadro Log , visualizza la scheda Diagnostica. Questa scheda mostra gli errori relativi ai problemi di memoria e la frequenza con cui si verificano.
  • Nell'interfaccia di monitoraggio di Dataflow, utilizza il grafico Utilizzo memoria per monitorare la capacità e l'utilizzo della memoria dei worker.
  • Nella pagina Dettagli job, nel riquadro Log, seleziona Log worker per trovare gli errori di memoria insufficiente nei log dei worker.
  • Gli errori di memoria insufficiente potrebbero essere visualizzati anche nei log di sistema. Per visualizzarli, vai a Esplora log e utilizza la seguente query:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    "out of memory" OR "OutOfMemory" OR "Shutting down JVM"
    

    Sostituisci JOB_ID con l'ID del job.

  • Per i job Java, Java Memory Monitor segnala periodicamente le metriche di garbage collection. Se la frazione di tempo della CPU utilizzata per la garbage collection supera una soglia del 50% per un periodo di tempo prolungato, l'SDK harness non riesce. Potresti visualizzare un errore simile al seguente esempio:

    Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
    

    Questo errore può verificarsi quando la memoria fisica è ancora disponibile e in genere indica che la memoria utilizzata della pipeline non è efficiente. Per risolvere il problema, ottimizza la pipeline.

    Java Memory Monitor è configurato dall' MemoryMonitorOptions interfaccia.

Se il job ha un utilizzo elevato della memoria utilizzata o errori di memoria insufficiente, segui i consigli riportati in questa pagina per ottimizzare l'utilizzo della memoria utilizzata o aumentare la quantità di memoria disponibile.

Risolvere gli errori di memoria insufficiente

Le modifiche alla pipeline Dataflow potrebbero risolvere gli errori di memoria insufficiente o ridurre la memoria utilizzata. Le possibili modifiche includono le seguenti azioni:

Il seguente diagramma mostra il flusso di lavoro per la risoluzione dei problemi di Dataflow descritto in questa pagina.

Un diagramma che mostra il flusso di lavoro per la risoluzione dei problemi.

Prova le seguenti mitigazioni:

  • Se possibile, ottimizza la pipeline per ridurre la memoria utilizzata.
  • Se il job è un job batch, prova a svolgere i seguenti passaggi nell'ordine indicato:
    1. Utilizza un tipo di macchina con più memoria per vCPU.
    2. Riduci il numero di thread a un valore inferiore al numero di vCPU per worker.
    3. Utilizza un tipo di macchina personalizzata con più memoria per vCPU.
  • Se il job è un job di streaming che utilizza Python, riduci il numero di thread a un valore inferiore a 12.
  • Se il job è un job di streaming che utilizza Java o Go, prova a svolgere i seguenti passaggi:
    1. Riduci il numero di thread a un valore inferiore a 500 per i job Runner v2 o inferiore a 300 per i job che non utilizzano Runner v2.
    2. Utilizza un tipo di macchina con più memoria.

Ottimizzare la pipeline

Diverse operazioni della pipeline possono causare errori di memoria insufficiente. Questa sezione fornisce opzioni per ridurre la memoria utilizzata della pipeline. Per identificare le fasi della pipeline che consumano più memoria, utilizza Cloud Profiler per monitorare il rendimento della pipeline.

Puoi utilizzare le seguenti best practice per ottimizzare la pipeline:

Utilizza i connettori I/O integrati di Apache Beam per la lettura dei file

Non aprire file di grandi dimensioni all'interno di una DoFn. Per leggere i file, utilizza i connettori I/O integrati di Apache Beam. I file aperti in una DoFn devono rientrare nella memoria. Poiché più istanze DoFn vengono eseguite contemporaneamente, i file di grandi dimensioni aperti nelle DoFn possono causare errori di memoria insufficiente.

Riprogetta le operazioni quando utilizzi le PTransform GroupByKey

Quando utilizzi una PTransform GroupByKey in Dataflow, i valori risultanti per chiave e per finestra vengono elaborati su un singolo thread. Poiché questi dati vengono passati come flusso dal servizio di backend Dataflow ai worker, non devono rientrare nella memoria dei worker. Tuttavia, se i valori vengono raccolti in memoria, la logica di elaborazione potrebbe causare errori di memoria insufficiente.

Ad esempio, se hai una chiave che contiene dati per una finestra e aggiungi i valori della chiave a un oggetto in memoria, ad esempio un elenco, potrebbero verificarsi errori di memoria insufficiente. In questo scenario, il worker potrebbe non avere una capacità di memoria sufficiente per contenere tutti gli oggetti.

Per saperne di più sulle PTransform GroupByKey, consulta la documentazione di Apache Beam Python GroupByKey e Java GroupByKey.

Il seguente elenco contiene suggerimenti per progettare la pipeline in modo da ridurre al minimo il consumo di memoria quando utilizzi le PTransform GroupByKey.

  • Per ridurre la quantità di dati per chiave e per finestra, evita le chiavi con molti valori, note anche come chiavi attive.
  • Per ridurre la quantità di dati raccolti per finestra, utilizza una dimensione della finestra più piccola.
  • Se utilizzi i valori di una chiave in una finestra per calcolare un numero, utilizza una Combine trasformazione. Non eseguire il calcolo in una singola istanza DoFn dopo aver raccolto i valori.
  • Filtra i valori o i duplicati prima dell'elaborazione. Per saperne di più, consulta la documentazione delle trasformazioni Python Filter e Java Filter.

Riduci i dati in entrata da origini esterne

Se effettui chiamate a un'API esterna o a un database per l'arricchimento dei dati, i dati restituiti devono rientrare nella memoria dei worker. Se esegui il batch delle chiamate, ti consigliamo di utilizzare una trasformazione GroupIntoBatches. Se riscontri errori di memoria insufficiente, riduci le dimensioni del batch. Per saperne di più sul raggruppamento in batch, consulta la documentazione delle trasformazioni GroupIntoBatchesPython e JavaGroupIntoBatches.

Condividi gli oggetti tra i thread

La condivisione di un oggetto dati in memoria tra le istanze DoFn può migliorare l'efficienza dello spazio e dell'accesso. Gli oggetti dati creati in qualsiasi metodo di DoFn, inclusi Setup, StartBundle, Process, FinishBundle e Teardown, vengono richiamati per ogni DoFn. In Dataflow, ogni worker potrebbe avere diverse istanze DoFn. Per un utilizzo più efficiente della memoria utilizzata, passa un oggetto dati come singleton per condividerlo tra più DoFn. Per saperne di più, consulta il post del blog Riutilizzo della cache tra le DoFns.

Utilizza rappresentazioni di elementi efficienti in termini di memoria

Valuta se puoi utilizzare rappresentazioni per gli elementi PCollection che utilizzano meno memoria. Quando utilizzi i codificatori nella pipeline, considera non solo le rappresentazioni degli elementi PCollection codificati, ma anche quelli decodificati. Le matrici sparse possono spesso trarre vantaggio da questo tipo di ottimizzazione.

Riduci le dimensioni degli input aggiuntivi

Se le DoFn utilizzano input aggiuntivi, riduci le dimensioni dell'input aggiuntivo. Per gli input aggiuntivi che sono raccolte di elementi, valuta la possibilità di utilizzare visualizzazioni iterabili, come AsIterable o AsMultimap, anziché visualizzazioni che materializzano l'intero input aggiuntivo contemporaneamente, come AsList.

Ridurre il numero di thread

Puoi aumentare la memoria disponibile per thread riducendo il numero massimo di thread che eseguono le istanze DoFn. Questa modifica riduce il parallelismo, ma rende disponibile più memoria per ogni DoFn.

La tabella seguente mostra il numero predefinito di thread creati da Dataflow:

Tipo di prestazione SDK Python SDK Java/Go
Batch 1 thread per vCPU 1 thread per vCPU
Streaming con Runner v2 12 thread per vCPU 500 thread per VM worker
Streaming senza Runner v2 12 thread per vCPU 300 thread per VM worker

Per ridurre il numero di thread dell'SDK Apache Beam, imposta la seguente opzione della pipeline:

Java

Utilizza l'opzione della pipeline --numberOfWorkerHarnessThreads.

Python

Utilizza l'opzione della pipeline --number_of_worker_harness_threads.

Vai

Utilizza l'opzione della pipeline --number_of_worker_harness_threads.

Per i job batch, imposta il valore su un numero inferiore al numero di vCPU.

Per i job di streaming, inizia riducendo il valore alla metà del valore predefinito. Se questo passaggio non mitiga il problema, continua a ridurre il valore della metà, osservando i risultati a ogni passaggio. Ad esempio, quando utilizzi Python, prova i valori 6, 3 e 1.

Utilizzare un tipo di macchina con più memoria per vCPU

Per selezionare un worker con più memoria per vCPU, utilizza uno dei seguenti metodi.

  • Utilizza un tipo di macchina con memoria elevata nella famiglia di macchine per uso generico. I tipi di macchine con memoria elevata hanno più memoria per vCPU rispetto ai tipi di macchine standard. L'utilizzo di un tipo di macchina con memoria elevata aumenta sia la memoria disponibile per ogni worker sia la memoria disponibile per thread, perché il numero di vCPU rimane lo stesso. Di conseguenza, l'utilizzo di un tipo di macchina con memoria elevata può essere un modo conveniente per selezionare un worker con più memoria per vCPU.
  • Per una maggiore flessibilità nella specifica del numero di vCPU e della quantità di memoria, puoi utilizzare un tipo di macchina personalizzata. Con i tipi di macchine personalizzate, puoi aumentare la memoria con incrementi di 256 MB. Questi tipi di macchine hanno prezzi diversi rispetto ai tipi di macchine standard.
  • Alcune famiglie di macchine ti consentono di utilizzare tipi di macchine personalizzate con memoria estesa. La memoria estesa consente un rapporto memoria/vCPU più elevato. Il costo è più elevato. Esempi di tipi di macchine personalizzate con memoria estesa includono n2-custom-1-19456-ext e n2-custom-8-317440-ext.

Per impostare i tipi di worker, utilizza la seguente opzione della pipeline. Per saperne di più, consulta Impostare le opzioni della pipeline e Opzioni della pipeline.

Java

Utilizza l'opzione della pipeline --workerMachineType.

Python

Utilizza l'opzione della pipeline --machine_type.

Vai

Utilizza l'opzione della pipeline --worker_machine_type.

Utilizzare un solo processo dell'SDK Apache Beam

Per le pipeline di streaming Python e le pipeline Python che utilizzano Runner v2, puoi forzare Dataflow ad avviare un solo processo dell'SDK Apache Beam per worker.

Prima di provare questa opzione, prova a risolvere il problema utilizzando gli altri metodi. Per configurare le VM worker di Dataflow in modo che avviino un solo processo Python in container, utilizza la seguente opzione della pipeline:

--experiments=no_use_multiple_sdk_containers

Con questa configurazione, le pipeline Python creano un processo dell'SDK Apache Beam per worker. Questa configurazione impedisce la replica degli oggetti e dei dati condivisi più volte per ogni processo dell'SDK Apache Beam. Tuttavia, limita l'utilizzo efficiente delle risorse di calcolo disponibili sul worker.

La riduzione del numero di processi dell'SDK Apache Beam a uno non riduce necessariamente il numero totale di thread avviati sul worker. Inoltre, se tutti i thread si trovano in un singolo processo dell'SDK Apache Beam, l'elaborazione potrebbe essere lenta o la pipeline potrebbe bloccarsi. Pertanto, potrebbe essere necessario ridurre anche il numero di thread, come descritto nella sezione Ridurre il numero di thread in questa pagina.

Puoi anche forzare i worker a utilizzare un solo processo dell'SDK Apache Beam utilizzando un tipo di macchina con una sola vCPU.

Comprendere la memoria utilizzata di Dataflow

Per risolvere i problemi relativi agli errori di memoria insufficiente, è utile comprendere come le pipeline Dataflow utilizzano la memoria.

Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) Compute Engine, spesso chiamate worker. I worker elaborano gli elementi di lavoro dal servizio Dataflow e li delegano ai processi dell'SDK Apache Beam. Un processo dell'SDK Apache Beam crea istanze di DoFn. DoFn è una classe dell'SDK Apache Beam che definisce una funzione di elaborazione distribuita.

Dataflow avvia diversi thread su ogni worker e la memoria di ogni worker viene condivisa tra tutti i thread. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Il numero predefinito di thread dipende da diversi fattori e varia tra i job batch e di streaming.

Se la pipeline richiede più memoria della quantità predefinita di memoria disponibile sui worker, potresti riscontrare errori di memoria insufficiente.

Le pipeline Dataflow utilizzano principalmente la memoria dei worker in tre modi:

Memoria operativa dei worker

I worker Dataflow hanno bisogno di memoria per i sistemi operativi e i processi di sistema. La memoria utilizzata dai worker in genere non supera 1 GB. L'utilizzo è in genere inferiore a 1 GB.

  • Diversi processi sul worker utilizzano la memoria per garantire il corretto funzionamento della pipeline. Ognuno di questi processi potrebbe riservare una piccola quantità di memoria per la sua operazione.
  • Quando la pipeline non utilizza Streaming Engine, i processi worker aggiuntivi utilizzano la memoria.

Memoria del processo SDK

I processi dell'SDK Apache Beam potrebbero creare oggetti e dati condivisi tra i thread all'interno del processo, indicati in questa pagina come oggetti e dati condivisi dell'SDK. L'utilizzo della memoria utilizzata di questi oggetti e dati condivisi dell'SDK è denominato memoria del processo SDK. Il seguente elenco include esempi di oggetti e dati condivisi dell'SDK:

  • Input aggiuntivi
  • Modelli di machine learning
  • Oggetti singleton in memoria
  • Oggetti Python creati con il apache_beam.utils.shared modulo
  • Dati caricati da origini esterne, come Cloud Storage o BigQuery

I job di streaming che non utilizzano Streaming Engine archiviano gli input aggiuntivi in memoria. Per le pipeline Java e Go, ogni worker ha una copia dell'input aggiuntivo. Per le pipeline Python, ogni processo dell'SDK Apache Beam ha una copia dell'input aggiuntivo.

I job di streaming che utilizzano Streaming Engine hanno un limite di dimensioni dell'input aggiuntivo di 80 MB. Gli input aggiuntivi vengono archiviati al di fuori della memoria dei worker.

La memoria utilizzata da oggetti e dati condivisi dell'SDK aumenta linearmente con il numero di processi dell'SDK Apache Beam. Nelle pipeline Java e Go, viene avviato un processo dell'SDK Apache Beam per worker. Nelle pipeline Python, viene avviato un processo dell'SDK Apache Beam per vCPU. Gli oggetti e i dati condivisi dell'SDK vengono riutilizzati tra i thread all'interno dello stesso processo dell'SDK Apache Beam.

Utilizzo della memoria DoFn

DoFn è una classe dell'SDK Apache Beam che definisce una funzione di elaborazione distribuita. Ogni worker può eseguire istanze DoFn simultanee. Ogni thread esegue un'istanza DoFn. Quando valuti la memoria utilizzata totale, calcolare le dimensioni del working set o la quantità di memoria necessaria per il funzionamento continuo di un'applicazione potrebbe essere utile. Ad esempio, se una singola DoFn utilizza un massimo di 5 MB di memoria e un worker ha 300 thread, l'utilizzo della memoria DoFn potrebbe raggiungere un picco di 1,5 GB, ovvero il numero di byte di memoria moltiplicato per il numero di thread. A seconda di come i worker utilizzano la memoria, un picco nell'utilizzo della memoria potrebbe causare l'esaurimento della memoria dei worker.

È difficile stimare il numero di istanze di una DoFn create da Dataflow. Il numero dipende da vari fattori, come l'SDK, il tipo di macchina e altre variabili. Inoltre, la DoFn potrebbe essere utilizzata da più thread in successione. Il servizio Dataflow non garantisce il numero di volte in cui viene richiamata una DoFn, né garantisce il numero esatto di istanze DoFn create nel corso di una pipeline. Tuttavia, la tabella seguente fornisce alcune informazioni sul livello di parallelismo che puoi aspettarti e stima un limite superiore per il numero di istanze DoFn.

SDK Python Beam

Batch Streaming senza Streaming Engine Streaming Engine
Parallelismo

1 processo per vCPU

1 thread per processo

1 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifiche in qualsiasi momento)

1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

SDK Java/Go Beam

Batch Appliance di streaming e Streaming Engine senza Runner v2 Streaming Engine con Runner v2
Parallelismo

1 processo per VM worker

1 thread per vCPU

1 processo per VM worker

300 thread per processo

300 thread per VM worker

1 processo per VM worker

500 thread per processo

500 thread per VM worker

Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifiche in qualsiasi momento)

1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

300 DoFn per VM worker

1 DoFn per thread

500 DoFn per VM worker

Ad esempio, quando utilizzi l'SDK Python con un worker Dataflow n1-standard-2, si applica quanto segue:

  • Job batch: Dataflow avvia un processo per vCPU (due in questo caso). Ogni processo utilizza un thread e ogni thread crea un'istanza DoFn.
  • Job di streaming con Streaming Engine: Dataflow avvia un processo per vCPU (due in totale). Tuttavia, ogni processo può generare fino a 12 thread, ognuno con la propria istanza DoFn.

Quando progetti pipeline complesse, è importante comprendere il DoFn ciclo di vita. Assicurati che le funzioni DoFn siano serializzabili ed evita di modificare direttamente l'argomento dell'elemento al loro interno.

Quando hai una pipeline multilingue e sul worker è in esecuzione più di un SDK Apache Beam, il worker utilizza il grado di parallelismo thread-per-processo più basso possibile.

Differenze tra Java, Go e Python

Java, Go e Python gestiscono i processi e la memoria in modo diverso. Di conseguenza, l'approccio da adottare per la risoluzione dei problemi relativi agli errori di memoria insufficiente varia a seconda che la pipeline utilizzi Java, Go o Python.

Pipeline Java e Go

Nelle pipeline Java e Go:

  • Ogni worker avvia un processo dell'SDK Apache Beam.
  • Gli oggetti e i dati condivisi dell'SDK, come gli input aggiuntivi e le cache, vengono condivisi tra tutti i thread sul worker.
  • La memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK in genere non viene scalata in base al numero di vCPU sul worker.

Pipeline Python

Nelle pipeline Python:

  • Ogni worker avvia un processo dell'SDK Apache Beam per vCPU.
  • Gli oggetti e i dati condivisi dell'SDK, come gli input aggiuntivi e le cache, vengono condivisi tra tutti i thread all'interno di ogni processo dell'SDK Apache Beam.
  • Il numero totale di thread sul worker viene scalato linearmente in base al numero di vCPU. Di conseguenza, la memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK aumenta linearmente con il numero di vCPU.
  • I thread che eseguono il lavoro vengono distribuiti tra i processi. Le nuove unità di lavoro vengono assegnate a un processo senza elementi di lavoro o al processo con il minor numero di elementi di lavoro assegnati in quel momento.