Suggerimenti per l'ottimizzazione del job Spark

Le sezioni che seguono forniscono suggerimenti per ottimizzare le applicazioni Spark Dataproc.

Utilizza cluster temporanei

Quando utilizzi il modello di cluster "temporaneo" di Dataproc, crei un cluster dedicato per ogni job e, al termine del job, elimini il cluster. Con il modello temporaneo, puoi trattare lo spazio di archiviazione e il calcolo separatamente, salvando i dati di input e output del job in Cloud Storage o BigQuery, utilizzando il cluster solo per il calcolo e l'archiviazione temporanea dei dati.

Insidie dei cluster permanenti

L'utilizzo di cluster temporanei a job singolo evita le seguenti insidie e i potenziali problemi associati all'utilizzo di cluster "permanenti" condivisi e a lunga esecuzione:

  • Punti singoli di errore: lo stato di errore di un cluster condiviso può causare l'errore di tutti i job, bloccando un'intera pipeline di dati. L'indagine e il ripristino da un errore possono richiedere ore. Poiché i cluster temporanei mantengono solo gli stati temporanei in-cluster, quando si verifica un errore, possono essere eliminati e ricreati rapidamente.
  • Difficoltà a mantenere ed eseguire la migrazione degli stati dei cluster in HDFS, MySQL o file system locali
  • Contese di risorse tra i job che influiscono negativamente sugli SLO
  • Daemon di servizio che non rispondono a causa della pressione della memoria
  • Accumulo di log e file temporanei che possono superare la capacità del disco
  • Errore di scalabilità verticale a causa dell'esaurimento delle scorte della zona del cluster
  • Mancanza di supporto per le versioni obsolete delle immagini dei cluster.

Vantaggi dei cluster temporanei

D'altra parte, i cluster temporanei ti consentono di:

  • Configurare autorizzazioni IAM diverse per job diversi con account di servizio VM Dataproc diversi.
  • Ottimizzare le configurazioni hardware e software di un cluster per ogni job, modificando le configurazioni del cluster in base alle esigenze.
  • Eseguire l'upgrade delle versioni delle immagini nei nuovi cluster per ottenere le patch di sicurezza , le correzioni di bug e le ottimizzazioni più recenti.
  • Risolvere i problemi più rapidamente su un cluster isolato a job singolo.
  • Risparmiare sui costi pagando solo il tempo di esecuzione del cluster temporaneo, non il tempo di inattività tra i job su un cluster condiviso.

Utilizza Spark SQL

L'API DataFrame di Spark SQL è un'ottimizzazione significativa dell'API RDD. Se interagisci con il codice che utilizza gli RDD, valuta la possibilità di leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, valuta la possibilità di utilizzare l'API Dataset di Spark SQL come superset di RDD e DataFrame.

Utilizza Apache Spark 3

Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e miglioramenti delle prestazioni:

  • Supporto GPU
  • Possibilità di leggere file binari
  • Miglioramenti del rendimento
  • Eliminazione dinamica delle partizioni
  • Esecuzione di query adattiva, che ottimizza i job Spark in tempo reale

Utilizza l'allocazione dinamica

Apache Spark include una funzionalità di allocazione dinamica che ridimensiona il numero di executor Spark sui worker all'interno di un cluster. Questa funzionalità consente a un job di utilizzare l'intero cluster Dataproc anche quando il cluster esegue lo scale up. Questa funzionalità è abilitata per impostazione predefinita su Dataproc (spark.dynamicAllocation.enabled è impostato su true). Per ulteriori informazioni, consulta Allocazione dinamica di Spark.

Utilizza la scalabilità automatica di Dataproc

La scalabilità automatica di Dataproc aggiunge e rimuove dinamicamente i worker Dataproc da un cluster per garantire che i job Spark dispongano delle risorse necessarie per essere completati rapidamente.

È una best practice configurare il criterio di scalabilità automatica in modo da scalare solo i worker secondari.

Utilizza la modalità di flessibilità avanzata di Dataproc

I cluster con VM prerilasciabili o un criterio di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono prerilasciati o rimossi prima di terminare la pubblicazione dei dati shuffle ai reducer. Questa eccezione può causare nuovi tentativi di attività e tempi di completamento dei job più lunghi.

Consigliamo di utilizzare la modalità di flessibilità avanzatadi Dataproc, che non archivia i dati shuffle intermedi sui worker secondari, in modo che i worker secondari possano essere prerilasciati o ridimensionati in modo sicuro.

Configura il partizionamento e lo shuffling

Spark archivia i dati in partizioni temporanee sul cluster. Se l'applicazione raggruppa o unisce i DataFrame, esegue lo shuffle dei dati in nuove partizioni in base al raggruppamento e alla configurazione di basso livello.

Il partizionamento dei dati influisce in modo significativo sul rendimento dell'applicazione: un numero insufficiente di partizioni limita il parallelismo dei job e l'utilizzo delle risorse del cluster; un numero eccessivo di partizioni rallenta il job a causa dell' elaborazione e dello shuffling aggiuntivi delle partizioni.

Configurazione delle partizioni

Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:

  • spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni quando leggi i dati da Cloud Storage. Il valore predefinito è 128 MB, che è sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.

  • spark.sql.shuffle.partitions: il numero di partizioni dopo l'esecuzione di uno shuffle. Il valore predefinito è 1000 per i cluster con versione dell'immagine 2.2 e successive. Consigliamo di impostare questo valore su 3 volte il numero di vCPU nel cluster.

  • spark.default.parallelism: il numero di partizioni restituite dopo l'esecuzione di trasformazioni RDD che richiedono shuffle, come join, reduceByKey e parallelize. Il valore predefinito è il numero totale di vCPU nel cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero su 3 volte le vCPU

Limita il numero di file

Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio, dimensioni dei file nell'intervallo 256 MB-512 MB. Allo stesso modo, limita il numero di file di output (per forzare uno shuffle, vedi Evita shuffle non necessari).

Configura l'esecuzione di query adattiva (Spark 3)

L'esecuzione di query adattiva (abilitata per impostazione predefinita nella versione dell'immagine Dataproc 2.0) offre miglioramenti delle prestazioni dei job Spark, tra cui:

Sebbene le impostazioni di configurazione predefinite siano valide per la maggior parte dei casi d'uso, l'impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (valore predefinito 128 MB) può essere utile.

Evita shuffle non necessari

Spark consente agli utenti di attivare manualmente uno shuffle per ribilanciare i dati con la funzione repartition. Gli shuffle sono costosi, quindi il reshuffling dei dati deve essere utilizzato con cautela. L'impostazione corretta delle configurazioni delle partizioni dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente i dati.

Eccezione: quando scrivi dati partizionati per colonna in Cloud Storage, la ripartizione su una colonna specifica evita di scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Archivia i dati in Parquet o Avro

Per impostazione predefinita, Spark SQL legge e scrive i dati nei file Parquet compressi con Snappy. Parquet Parquet è un formato di file colonnare efficiente che consente a Spark di leggere solo i dati necessari per eseguire un' applicazione. Questo è un vantaggio importante quando si lavora con set di dati di grandi dimensioni. Anche altri formati colonnari, come Apache ORC, offrono buone prestazioni.

Per i dati non colonnari, Apache Avro fornisce un formato di file binario-riga efficiente. Sebbene in genere sia più lento di Parquet, il rendimento di Avro è migliore rispetto ai formati basati su testo,come CSV o JSON.

Ottimizza le dimensioni del disco

La velocità effettiva dei dischi permanenti aumenta in base alle dimensioni del disco, il che può influire sul rendimento dei job Spark, poiché i job scrivono metadati e dati shuffle su disco. Quando utilizzi i dischi permanenti standard, le dimensioni del disco devono essere di almeno 1 terabyte per worker (vedi Rendimento in base alle dimensioni del disco permanente).

Per monitorare la velocità effettiva del disco del worker nella Google Cloud console:

  1. Fai clic sul nome del cluster nella pagina Cluster.
  2. Fai clic sulla scheda ISTANZE VM.
  3. Fai clic sul nome di un worker.
  4. Fai clic sulla scheda MONITORAGGIO, quindi scorri verso il basso fino a Velocità effettiva del disco per visualizzare la velocità effettiva del worker.

Considerazioni sui dischi

I cluster Dataproc temporanei, che non usufruiscono dello spazio di archiviazione permanente, possono utilizzare gli SSD locali. Gli SSD locali sono collegati fisicamente al cluster e forniscono una velocità effettiva maggiore rispetto ai dischi permanenti (vedi la tabella Rendimento). Gli SSD locali sono disponibili con una dimensione fissa di 375 gigabyte, ma puoi aggiungere più SSD per aumentare le prestazioni.

I dati sugli SSD locali non vengono conservati dopo l'arresto di un cluster. Se hai bisogno di spazio di archiviazione permanente, puoi utilizzare i dischi permanenti SSD, che forniscono una velocità effettiva maggiore per le loro dimensioni rispetto ai dischi permanenti standard. I dischi permanenti SSD sono anche una buona scelta se le dimensioni della partizione saranno inferiori a 8 KB (tuttavia, evita le partizioni piccole).

Collega le GPU al cluster

Spark 3 supporta le GPU. Utilizza le GPU con l' azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando l' acceleratore SQL RAPIDS. L' azione di inizializzazione dei driver GPU per configurare un cluster con GPU.

Errori comuni dei job e relative correzioni

Memoria insufficiente

Esempi:

  • "Executor perso"
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Container killed by YARN for exceeding memory limits"

Possibili correzioni:

Errori di recupero shuffle

Esempi:

  • "FetchFailedException" (errore di Spark)
  • "Failed to connect to..." (errore di Spark)
  • "Failed to fetch" (errore di MapReduce)

In genere è causato dalla rimozione prematura dei worker che hanno ancora dati shuffle da pubblicare.

Possibili cause e correzioni:

I nodi YARN sono NON INTEGRI

Esempi (dai log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Spesso correlato a spazio su disco insufficiente per i dati shuffle. Per la diagnosi, visualizza i file di log:

  • Apri la pagina Cluster del progetto nella Google Cloud console, quindi fai clic sul nome del cluster.
  • Fai clic su VISUALIZZA LOG.
  • Filtra i log in base a hadoop-yarn-nodemanager.
  • Cerca "UNHEALTHY".

Possibili correzioni:

  • La cache utente viene archiviata nella directory specificata dalla yarn.nodemanager.local-dirs proprietà nel yarn-site.xml file. Questo file si trova in /etc/hadoop/conf/yarn-site.xml. Puoi controllare lo spazio libero nel percorso /hadoop/yarn/nm-local-dir e liberare spazio eliminando la cartella della cache utente /hadoop/yarn/nm-local-dir/usercache.
  • Se il log segnala lo stato "UNHEALTHY", ricrea il cluster con uno spazio su disco maggiore, che aumenterà il limite di velocità effettiva.

Il job non riesce a causa di memoria del driver insufficiente

Quando esegui i job in modalità cluster, il job non riesce se le dimensioni della memoria del nodo worker sono inferiori alle dimensioni della memoria del driver.

Esempio dai log dei driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possibili correzioni:

  • Imposta spark:spark.driver.memory su un valore inferiore a yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilizza lo stesso tipo di macchina per i nodi master e worker.

Passaggi successivi