Le pipeline vengono eseguite su cluster di macchine. Ottengono un throughput elevato suddividendo il lavoro da svolgere e poi eseguendolo in parallelo sui più executor distribuiti nel cluster. In generale, maggiore è il numero di suddivisioni (chiamate anche partizioni), più velocemente può essere eseguita la pipeline. Il livello di parallelismo nella pipeline è determinato dalle origini e dalle fasi di shuffle della pipeline.
Fonti
All'inizio di ogni esecuzione della pipeline, ogni origine nella pipeline calcola i dati da leggere e come questi dati possono essere suddivisi in suddivisioni. Ad esempio, considera una pipeline di base che legge da Cloud Storage, esegue alcune trasformazioni di Wrangler e poi scrive di nuovo in Cloud Storage.

Quando la pipeline viene avviata, l'origine Cloud Storage esamina i file di input e li suddivide in base alle dimensioni dei file. Ad esempio, un singolo file di gigabyte può essere suddiviso in 100 suddivisioni, ognuna di 10 MB. Ogni executor legge i dati per quella suddivisione, esegue le trasformazioni di Wrangler e poi scrive l'output in un file part.
Se la pipeline viene eseguita lentamente, una delle prime cose da controllare è se le origini creano suddivisioni sufficienti per sfruttare appieno il parallelismo. Ad esempio, alcuni tipi di compressione rendono i file di testo normale non suddivisibili. Se stai leggendo file compressi con gzip, potresti notare che la pipeline viene eseguita molto più lentamente rispetto alla lettura di file non compressi o compressi con BZIP (che è suddiviso). Allo stesso modo, se utilizzi l'origine del database e l'hai configurata per utilizzare una sola suddivisione, viene eseguita molto più lentamente rispetto a se la configuri per utilizzare più suddivisioni.
Shuffle
Alcuni tipi di plug-in causano lo shuffle dei dati nel cluster. Ciò accade quando i record elaborati da un executor devono essere inviati a un altro executor per eseguire il calcolo. Gli shuffle sono operazioni costose perché comportano un elevato I/O. I plug-in che causano lo shuffle dei dati vengono visualizzati nella sezione Analisi di Pipeline Studio. Questi includono plug-in come Group By, Deduplicate, Distinct e Joiner. Ad esempio, supponiamo che alla pipeline nell'esempio precedente venga aggiunta una fase Group By.
Supponiamo anche che i dati letti rappresentino gli acquisti effettuati in un negozio di alimentari.
Ogni record contiene un campo item e un campo num_purchased. Nella fase Group By, configuriamo la pipeline per raggruppare i record nel campo item e calcolare la somma del campo num_purchased.
Quando la pipeline viene eseguita, i file di input vengono suddivisi come descritto in precedenza. Dopodiché, ogni record viene sottoposto a shuffle nel cluster in modo che ogni record con lo stesso elemento appartenga allo stesso executor.
Come illustrato nell'esempio precedente, i record degli acquisti di mele erano originariamente distribuiti su più executor. Per eseguire l'aggregazione, tutti questi record dovevano essere inviati allo stesso executor nel cluster.
La maggior parte dei plug-in che richiedono uno shuffle consente di specificare il numero di partizioni da utilizzare durante lo shuffle dei dati. Questo controlla il numero di executor utilizzati per elaborare i dati sottoposti a shuffle.
Nell'esempio precedente, se il numero di partizioni è impostato su 2, ogni executor calcola gli aggregati per due elementi anziché uno.
Tieni presente che è possibile ridurre il parallelismo della pipeline dopo questa fase. Ad esempio, considera la visualizzazione logica della pipeline:
Se l'origine suddivide i dati in 500 partizioni, ma il raggruppamento per shuffle utilizza 200 partizioni, il livello massimo di parallelismo dopo il raggruppamento per scende da 500 a 200. Invece di 500 file di parti diversi scritti in Cloud Storage, ne hai solo 200.
Scegliere le partizioni
Se il numero di partizioni è troppo basso, non utilizzerai la capacità completa del cluster per parallelizzare il maggior numero possibile di lavoro. Se imposti un numero di partizioni troppo elevato, aumenti la quantità di overhead non necessario. In generale, è meglio utilizzare troppe partizioni che troppo poche. L'overhead aggiuntivo è un problema se la pipeline richiede alcuni minuti per l'esecuzione e stai cercando di ridurre di un paio di minuti. Se la pipeline richiede ore per l'esecuzione, in genere non devi preoccuparti dell'overhead.
Un modo utile, ma eccessivamente semplificato, per determinare il numero di partizioni da utilizzare è impostarlo su max(cluster CPUs, input records / 500,000). In altre parole, prendi il numero di record di input e dividilo per 500.000. Se questo numero è maggiore del numero di CPU del cluster, utilizzalo per il numero di partizioni.
In caso contrario, utilizza il numero di CPU del cluster. Ad esempio, se il cluster ha 100 CPU e la fase di shuffle prevede 100 milioni di record di input, utilizza 200 partizioni.
Una risposta più completa è che gli shuffle funzionano meglio quando i dati di shuffle intermedi per ogni partizione possono essere contenuti completamente nella memoria di un executor, in modo che non sia necessario eseguire lo spill su disco. Spark riserva poco meno del 30% della memoria di un executor per contenere i dati di shuffle. Il numero esatto è (memoria totale - 300 MB) * 30%. Se supponiamo che ogni executor sia configurato per utilizzare 2 GB di memoria, significa che ogni partizione non deve contenere più di (2 GB - 300 MB) * 30% = circa 500 MB di record. Se supponiamo che ogni record venga compresso a 1 KB, significa che (500 MB / partizione) / (1 KB/record) = 500.000 record per partizione. Se gli executor utilizzano più memoria o i record sono più piccoli, puoi modificare questo numero di conseguenza.
Distorsione dei dati
Tieni presente che nell'esempio precedente gli acquisti di vari articoli sono stati distribuiti in modo uniforme. Ovvero, sono stati effettuati tre acquisti ciascuno per mele, banane, carote e uova. Lo shuffle su una chiave distribuita in modo uniforme è il tipo di shuffle più performante, ma molti set di dati non hanno questa proprietà. Continuando con l'acquisto di generi alimentari nell'esempio precedente, ti aspetteresti di avere molti più acquisti di uova che di biglietti di auguri. Quando ci sono alcune chiavi di shuffle molto più comuni di altre, si tratta di dati distorti. I dati distorti possono avere prestazioni significativamente peggiori rispetto ai dati non distorti, perché una quantità sproporzionata di lavoro viene eseguita da una piccola manciata di executor. Fa sì che un piccolo sottoinsieme di partizioni sia molto più grande di tutte le altre.
In questo esempio, gli acquisti di uova sono cinque volte superiori a quelli di biglietti, il che significa che l'aggregato di uova richiede circa cinque volte più tempo per essere calcolato. Non fa molta differenza quando si tratta di 10 record anziché due, ma fa una grande differenza quando si tratta di cinque miliardi di record anziché un miliardo. Quando hai una distorsione dei dati, il numero di partizioni utilizzate in uno shuffle non ha un grande impatto sulle prestazioni della pipeline.
Puoi riconoscere la distorsione dei dati esaminando il grafico dei record di output nel tempo. Se la fase genera record a una velocità molto più elevata all'inizio dell'esecuzione della pipeline e poi rallenta improvvisamente, potrebbe significare che hai dati distorti.
Puoi anche riconoscere la distorsione dei dati esaminando l'utilizzo della memoria del cluster nel tempo. Se il cluster è al massimo della capacità per un certo periodo di tempo, ma improvvisamente ha un basso utilizzo della memoria utilizzata per un periodo di tempo, è anche un segno che stai gestendo una distorsione dei dati.
La distorsione dei dati influisce maggiormente sulle prestazioni quando viene eseguito un join. Esistono alcune tecniche che possono essere utilizzate per migliorare le prestazioni dei join distorti. Per saperne di più, consulta
Elaborazione parallela per le operazioni JOIN.
Ottimizzazione adattiva per l'esecuzione
Per ottimizzare l'esecuzione in modo adattivo, specifica l'intervallo di partizioni da utilizzare, non il numero di partizione esatto. Il numero di partizione esatto, anche se impostato nella configurazione della pipeline, viene ignorato quando l'esecuzione adattiva è abilitata.
Se utilizzi un cluster Managed Service for Apache Spark effimero, Cloud Data Fusion imposta automaticamente la configurazione corretta, ma per i cluster Managed Service for Apache Spark o Hadoop statici, è possibile impostare i due parametri di configurazione seguenti:
spark.default.parallelism: impostalo sul numero totale di vCore disponibili nel cluster. In questo modo, il cluster non è sottocaricato e viene definito il limite inferiore per il numero di partizioni.spark.sql.adaptive.coalescePartitions.initialPartitionNum: impostalo su 32 volte il numero di vCore disponibili nel cluster. In questo modo viene definito il limite superiore per il numero di partizioni.Spark.sql.adaptive.enabled: per abilitare le ottimizzazioni, imposta questo valore sutrue. Managed Service for Apache Spark lo imposta automaticamente, ma se utilizzi cluster Hadoop generici, devi assicurarti che sia abilitato .
Questi parametri possono essere impostati nella configurazione del motore di una pipeline specifica o nelle proprietà del cluster di un cluster Managed Service for Apache Spark statico.
Passaggi successivi
- Scopri di più sull'elaborazione parallela per le operazioni
JOIN.