Le sezioni seguenti forniscono suggerimenti per ottimizzare le applicazioni Spark di Managed Service per Apache Spark.
Utilizza cluster temporanei
Quando utilizzi il modello di cluster "temporaneo" di Managed Service per Apache Spark, crei un cluster dedicato per ogni job e, al termine del job, elimini il cluster. Con il modello temporaneo, puoi trattare separatamente l'archiviazione e il calcolo, salvando i dati di input e output dei job in Cloud Storage o BigQuery e utilizzando il cluster solo per il calcolo e l'archiviazione temporanea dei dati.
Insidie dei cluster permanenti
L'utilizzo di cluster temporanei per un singolo job evita le seguenti insidie e i potenziali problemi associati all'utilizzo di cluster "permanenti" condivisi e a lunga esecuzione:
- Punti singoli di errore: lo stato di errore di un cluster condiviso può causare il fallimento di tutti i job, bloccando un'intera pipeline di dati. L'indagine e il ripristino da un errore possono richiedere ore. Poiché i cluster temporanei mantengono solo gli stati temporanei in-cluster, quando si verifica un errore possono essere eliminati e ricreati rapidamente.
- Difficoltà a mantenere ed eseguire la migrazione degli stati dei cluster in HDFS, MySQL o file system locali
- Contese di risorse tra i job che influiscono negativamente sugli SLO
- Daemon di servizio che non rispondono a causa della pressione della memoria
- Accumulo di log e file temporanei che possono superare la capacità del disco
- Errore di scalabilità verticale a causa dell'esaurimento delle scorte della zona del cluster
- Mancanza di supporto per le versioni obsolete delle immagini dei cluster.
Vantaggi dei cluster temporanei
D'altra parte, i cluster temporanei ti consentono di:
- Configurare autorizzazioni IAM diverse per job diversi con service account VM di Managed Service per Apache Spark diversi .
- Ottimizzare le configurazioni hardware e software di un cluster per ogni job, modificando le configurazioni del cluster in base alle esigenze.
- Eseguire l'upgrade delle versioni delle immagini nei nuovi cluster per ottenere le patch di sicurezza, le correzioni di bug e le ottimizzazioni più recenti.
- Risolvere i problemi più rapidamente su un cluster isolato a job singolo.
- Risparmiare sui costi pagando solo il tempo di esecuzione del cluster temporaneo, non il tempo di inattività tra i job su un cluster condiviso.
Utilizza Spark SQL
L'API DataFrame di Spark SQL è un'ottimizzazione significativa dell'API RDD. Se interagisci con codice che utilizza gli RDD, valuta la possibilità di leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, valuta la possibilità di utilizzare l'API Dataset di Spark SQL come superset di RDD e DataFrame.
Utilizza Apache Spark 3
Managed Service per Apache Spark 2.0 installa Spark 3, che include le seguenti funzionalità e miglioramenti delle prestazioni:
- Supporto GPU
- Possibilità di leggere file binari
- Miglioramenti delle prestazioni
- Eliminazione dinamica delle partizioni
- Esecuzione di query adattiva, che ottimizza i job Spark in tempo reale
Utilizza l'allocazione dinamica
Apache Spark include una funzionalità di allocazione dinamica che ridimensiona il numero di executor Spark sui worker all'interno di un cluster. Questa funzionalità consente a un job di utilizzare l'intero cluster Managed Service per Apache Spark anche quando il cluster esegue lo scale up. Questa funzionalità è abilitata per impostazione predefinita in Managed Service per Apache Spark
(spark.dynamicAllocation.enabled è impostato su true). Per ulteriori informazioni, consulta
Allocazione dinamica di Spark.
Utilizza la scalabilità automatica di Managed Service per Apache Spark
La scalabilità automatica di Managed Service per Apache Spark aggiunge e rimuove dinamicamente i worker di Managed Service per Apache Spark da un cluster per garantire che i job Spark dispongano delle risorse necessarie per essere completati rapidamente.
È una best practice configurare il criterio di scalabilità automatica in modo da scalare solo i worker secondari.
Utilizza la modalità di flessibilità avanzata di Managed Service per Apache Spark
I cluster con VM prerilasciabili o un criterio di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono prerilasciati o rimossi prima di terminare la pubblicazione dei dati di shuffling ai reducer. Questa eccezione può causare nuovi tentativi di attività e tempi di completamento dei job più lunghi.
Consigliamo di utilizzare Managed Service per Apache Spark Enhanced Flexibility Mode, che non archivia i dati di shuffling intermedi sui worker secondari, in modo che i worker secondari possano essere prerilasciati o ridimensionati in modo sicuro.
Configura il partizionamento e lo shuffling
Spark archivia i dati in partizioni temporanee sul cluster. Se l'applicazione raggruppa o unisce i DataFrame, esegue lo shuffling dei dati in nuove partizioni in base al raggruppamento e alla configurazione di basso livello.
Il partizionamento dei dati influisce in modo significativo sul rendimento dell'applicazione: un numero insufficiente di partizioni limita il parallelismo dei job e l'utilizzo delle risorse del cluster, mentre un numero eccessivo di partizioni rallenta il job a causa dell'elaborazione e dello shuffling aggiuntivi delle partizioni.
Configurazione delle partizioni
Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:
spark.sql.files.maxPartitionBytes: la dimensione massima delle partizioni quando leggi i dati da Cloud Storage. Il valore predefinito è 128 MB, che è sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.spark.sql.shuffle.partitions: il numero di partizioni dopo l'esecuzione di uno shuffling. Il valore predefinito è1000per i cluster con versione dell'immagine2.2e successive. Consigliamo di impostare questo valore su 3 volte il numero di vCPU nel cluster.spark.default.parallelism: il numero di partizioni restituite dopo l'esecuzione di trasformazioni RDD che richiedono shuffling, comejoin,reduceByKeyeparallelize. Il valore predefinito è il numero totale di vCPU nel cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero su 3 volte le vCPU
Limita il numero di file
Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio dimensioni dei file nell'intervallo 256 MB-512 MB. Allo stesso modo, limita il numero di file di output (per forzare uno shuffling, vedi Evita shuffling non necessari).
Configura l'esecuzione di query adattiva (Spark 3)
L'esecuzione di query adattiva (abilitata per impostazione predefinita nella versione dell'immagine di Managed Service per Apache Spark 2.0) offre miglioramenti delle prestazioni dei job Spark, tra cui:
- Unione delle partizioni dopo lo shuffling
- Conversione delle unioni sort-merge in unioni broadcast
- Ottimizzazioni per le unioni con skew.
Sebbene le impostazioni di configurazione predefinite siano valide per la maggior parte dei casi d'uso, l'impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes su spark.sqlfiles.maxPartitionBytes (valore predefinito 128 MB) può essere utile.
Evita shuffling non necessari
Spark consente agli utenti di attivare manualmente uno shuffling per ribilanciare i dati con la funzione repartition. Gli shuffling sono costosi, quindi il ripartizionamento dei dati deve essere utilizzato con cautela. L'impostazione corretta delle configurazioni
delle partizioni dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente
i dati.
Eccezione: quando scrivi dati partizionati per colonna in Cloud Storage, il ripartizionamento su una colonna specifica evita di scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Archivia i dati in Parquet o Avro
Per impostazione predefinita, Spark SQL legge e scrive i dati nei file Parquet compressi con Snappy. Parquet Parquet è un formato di file colonnare efficiente che consente a Spark di leggere solo i dati necessari per eseguire un'applicazione. Questo è un vantaggio importante quando si lavora con set di dati di grandi dimensioni. Anche altri formati colonnari, come Apache ORC, offrono buone prestazioni.
Per i dati non colonnari, Apache Avro fornisce un formato di file binario-riga efficiente. Sebbene in genere sia più lento di Parquet, il rendimento di Avro è migliore rispetto ai formati basati su testo,come CSV o JSON.
Ottimizza le dimensioni del disco
La velocità effettiva dei dischi permanenti aumenta in base alle dimensioni del disco, il che può influire sul rendimento dei job Spark, poiché i job scrivono metadati e dati di shuffling sul disco. Quando utilizzi i dischi permanenti standard, le dimensioni del disco devono essere di almeno 1 terabyte per worker (vedi Rendimento in base alle dimensioni del disco permanente).
Per monitorare la velocità effettiva del disco del worker nella Google Cloud console:
- Fai clic sul nome del cluster nella pagina Cluster.
- Fai clic sulla scheda ISTANZE VM.
- Fai clic sul nome di un worker.
- Fai clic sulla scheda MONITORAGGIO, quindi scorri verso il basso fino a Velocità effettiva del disco per visualizzare la velocità effettiva del worker.
Considerazioni sui dischi
I cluster temporanei di Managed Service per Apache Spark, che non usufruiscono dell'archiviazione permanente, possono utilizzare gli SSD locali. Gli SSD locali sono collegati fisicamente al cluster e offrono una velocità effettiva maggiore rispetto ai dischi permanenti (vedi la tabella Rendimento). Gli SSD locali sono disponibili con una dimensione fissa di 375 gigabyte, ma puoi aggiungere più SSD per aumentare le prestazioni.
I dati degli SSD locali non vengono conservati dopo l'arresto di un cluster. Se hai bisogno di spazio di archiviazione permanente, puoi utilizzare i dischi permanenti SSD, che offrono una velocità effettiva maggiore per le loro dimensioni rispetto ai dischi permanenti standard. I dischi permanenti SSD sono anche una buona scelta se le dimensioni della partizione saranno inferiori a 8 KB (tuttavia, evita le partizioni di piccole dimensioni).
Collega le GPU al cluster
Spark 3 supporta le GPU. Utilizza le GPU con l' azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando l' acceleratore SQL RAPIDS. L' azione di inizializzazione dei driver GPU per configurare un cluster con GPU.
Errori comuni dei job e relative correzioni
Memoria insufficiente
Esempi:
- "Executor perso"
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container killed by YARN for exceeding memory limits"
Possibili correzioni:
- Se utilizzi PySpark, aumenta
spark.executor.memoryOverheade diminuiscispark.executor.memory. - Utilizza tipi di macchine con memoria elevata.
- Utilizza partizioni più piccole.
Errori di recupero dello shuffling
Esempi:
- "FetchFailedException" (errore di Spark)
- "Failed to connect to..." (errore di Spark)
- "Failed to fetch" (errore di MapReduce)
In genere è causato dalla rimozione prematura dei worker che hanno ancora dati di shuffling da pubblicare.
Possibili cause e correzioni:
- Le VM dei worker prerilasciabili sono state recuperate o le VM dei worker non prerilasciabili sono state rimosse dal gestore della scalabilità automatica. Soluzione: utilizza la modalità di flessibilità avanzata per rendere i worker secondari prerilasciabili o scalabili in modo sicuro.
- L'executor o il mapper si è arrestato in modo anomalo a causa di un errore OutOfMemory. Soluzione: aumenta la memoria dell'executor o del mapper.
- Il servizio di shuffling di Spark potrebbe essere sovraccarico. Soluzione: riduci il numero di partizioni dei job.
I nodi YARN sono NON INTEGRI
Esempi (dai log YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Spesso correlato a spazio su disco insufficiente per i dati di shuffling. Per la diagnosi, visualizza i file di log:
- Apri la pagina Cluster del tuo progetto nella Google Cloud console, quindi fai clic sul nome del cluster.
- Fai clic su VISUALIZZA LOG.
- Filtra i log per
hadoop-yarn-nodemanager. - Cerca "UNHEALTHY".
Possibili correzioni:
- La cache utente viene archiviata nella directory specificata dalla proprietà
yarn.nodemanager.local-dirsnelyarn-site.xml file. Questo file si trova in/etc/hadoop/conf/yarn-site.xml. Puoi controllare lo spazio libero nel percorso/hadoop/yarn/nm-local-dire liberare spazio eliminando la cartella della cache utente/hadoop/yarn/nm-local-dir/usercache. - Se il log segnala lo stato "UNHEALTHY", ricrea il cluster con uno spazio su disco maggiore, che aumenterà il limite di velocità effettiva.
Il job non riesce a causa di memoria del driver insufficiente
Quando esegui i job in modalità cluster, il job non riesce se le dimensioni della memoria del nodo worker sono inferiori alle dimensioni della memoria del driver.
Esempio dai log dei driver:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Possibili correzioni:
- Imposta
spark:spark.driver.memorysu un valore inferiore ayarn:yarn.scheduler.maximum-allocation-mb. - Utilizza lo stesso tipo di macchina per i nodi master e worker.
Passaggi successivi
- Scopri di più sull' ottimizzazione delle prestazioni di Spark.