Modalità flessibilità avanzata di Managed Service per Apache Spark

La modalità di flessibilità avanzata (EFM) di Managed Service for Apache Spark consente di gestire i dati di shuffling per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM esegue l'offload dei dati di shuffling scrivendoli sui worker principali. I worker eseguono il pull di questi nodi remoti durante la fase di riduzione. Questa modalità è disponibile solo per i job Spark.

Poiché EFM non archivia i dati di shuffling intermedi sui worker secondari, EFM è adatta ai cluster che utilizzano VM prerilasciabili o che eseguono la scalabilità automatica solo del gruppo di worker secondari.

La modalità EFM è supportata nelle versioni delle immagini 2.0.31+, 2.1.6+, 2.2+ e successive di Managed Service for Apache Spark.

Limitazioni:

  • I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di flessibilità avanzata (vedi Quando attendere il completamento di AppMaster).
  • La modalità di flessibilità avanzata non è consigliata:
    • su un cluster che ha solo worker principali
    • sui job di streaming, poiché la pulizia dei dati di shuffling intermedi può richiedere fino a 30 minuti dopo il completamento del job.
    • su un cluster che esegue notebook, poiché i dati di shuffling potrebbero non essere puliti durante la durata della sessione.
    • quando i job Spark vengono eseguiti su un cluster con rimozione controllata controllata abilitata. La dismissione controllata e la modalità EFM possono funzionare in modo opposto, poiché il meccanismo di dismissione controllata di YARN mantiene i nodi in fase di dismissione fino al completamento di tutte le applicazioni coinvolte.
    • su un cluster che esegue job Spark e non Spark.
  • La modalità di flessibilità avanzata non è supportata:
    • quando è abilitata la scalabilità automatica dei worker principali. Nella maggior parte dei casi, i worker principali continueranno ad archiviare i dati di shuffling che non vengono migrati automaticamente. La riduzione del gruppo di worker principali annulla i vantaggi della modalità EFM.

Utilizzare la modalità di flessibilità avanzata

La flessibilità avanzata è abilitata quando crei un cluster impostando la dataproc:efm.spark.shuffle proprietà del cluster su primary-worker.

Esempio:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Esempio di Apache Spark

  1. Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il file JAR degli esempi di Spark sul cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Configurare gli SSD locali

Poiché la modalità EFM scrive i dati di shuffling intermedi sui dischi collegati alle VM, beneficia della velocità effettiva e degli IOPS aggiuntivi forniti da SSD locali. Per facilitare l'allocazione delle risorse, quando configuri le macchine dei worker principali, punta a un obiettivo di circa 1 partizione SSD locale per 4 vCPU.

Per collegare gli SSD locali, passa il flag --num-worker-local-ssds al comando gcloud Managed Service for Apache Spark clusters create.

In genere, non avrai bisogno di SSD locali sui worker secondari. L'aggiunta di SSD locali ai worker secondari di un cluster (utilizzando il flag --num-secondary-worker-local-ssds) è spesso meno importante perché i worker secondari non scrivono i dati di shuffling in locale. Tuttavia, poiché gli SSD locali migliorano le prestazioni del disco locale, potresti decidere di aggiungere SSD locali ai worker secondari se prevedi che i job siano vincolati all'I/O a causa dell'utilizzo del disco locale: il job utilizza una quantità significativa di disco locale per lo spazio di scratch o le partizioni sono troppo grandi per essere contenute nella memoria e verranno trasferite sul disco.

Rapporto tra worker secondari

Poiché i worker secondari scrivono i dati di shuffling sui worker principali, il cluster deve contenere un numero sufficiente di worker principali con risorse di CPU, memoria e disco sufficienti per gestire il carico di shuffling del job. Per i cluster con scalabilità automatica, per impedire al gruppo principale di scalare e causare un comportamento indesiderato, imposta minInstances sul valore maxInstances nel criterio di scalabilità automatica per il gruppo di worker principali.

Se hai un rapporto elevato tra worker secondari e principali (ad esempio, 10:1), monitora l'utilizzo della CPU, la rete e l'utilizzo del disco dei worker principali per determinare se sono sovraccarichi. A tal fine:

  1. Nella Google Cloud console, vai alla pagina Istanze VM.

  2. Fai clic sulla casella di controllo a sinistra del worker principale.

  3. Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, gli IOPS del disco, i byte di rete e altre metriche del worker principale.

Se i worker principali sono sovraccarichi, valuta la possibilità di aumentare manualmente le risorse dei worker principali.

Ridimensionare il gruppo di worker principali

Il gruppo di worker principali può essere aumentato in modo sicuro, ma la riduzione può influire negativamente sull'avanzamento del job. Le operazioni che riducono il gruppo di worker principali devono utilizzare la dismissione controllata, che viene abilitata impostando il flag --graceful-decommission-timeout.

Cluster con scalabilità automatica: la scalabilità del gruppo di worker principali è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principali su un cluster con scalabilità automatica:

  1. Disattiva la scalabilità automatica.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Scala il gruppo principale.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Riattiva la scalabilità automatica:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Monitorare l'utilizzo del disco dei worker principali

I worker principali devono avere spazio su disco sufficiente per i dati di shuffling del cluster. Puoi monitorare questo aspetto indirettamente tramite la metrica remaining HDFS capacity. Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.

Per impostazione predefinita, quando l'utilizzo del disco locale di un worker principale supera il 90% della capacità, il nodo viene contrassegnato come NON INTEGRO nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati non utilizzati da HDFS o fare lo scale up del pool di worker principali.

Configurazione avanzata

Partizionamento e parallelismo

Quando invii un job Spark, configura un livello di partizionamento appropriato. La scelta del numero di partizioni di input e output per una fase di shuffling comporta un compromesso tra diverse caratteristiche di prestazioni. È consigliabile sperimentare con i valori che funzionano per le forme dei job.

Partizioni di input

Il partizionamento dell'input di Spark e MapReduce è determinato dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa una quantità di dati pari a una "dimensione del blocco".

  • Per i job Spark SQL, la dimensione massima della partizione è controllata da spark.sql.files.maxPartitionBytes. Ti consigliamo di aumentarla a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Per gli RDD Spark, la dimensione della partizione viene in genere controllata con fs.gs.block.size, il cui valore predefinito è 128 MB. Ti consigliamo di aumentarla a 1 GB. Esempio: --properties spark.hadoop.fs.gs.block.size=1073741824

Partizioni di output

Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per i job più grandi che elaborano più di 1 TB, ti consigliamo di avere almeno 1 GB per partizione.

  • Per Spark SQL, il numero di partizioni di output è controllato da spark.sql.shuffle.partitions.

  • Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare spark.default.parallelism.

Ottimizzazione dello shuffling per lo shuffling del worker principale

La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di shuffling di Spark viene eseguito come parte di Node Manager. Il valore predefinito è il doppio (2x) del numero di core sulla macchina (ad esempio, 16 thread su una n1-highmem-8). Se "Shuffle Read Blocked Time" è maggiore di 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, valuta la possibilità di aumentare il numero di thread del server di shuffling.

Sui tipi di macchine più grandi, valuta la possibilità di aumentare spark.shuffle.io.numConnectionsPerPeer, il cui valore predefinito è 1. (Ad esempio, impostalo su 5 connessioni per coppia di host).

Aumentare i tentativi

Il numero massimo di tentativi consentiti per i master delle app, le attività e le fasi può essere configurato impostando le seguenti proprietà:

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Poiché i master delle app e le attività vengono terminati più frequentemente nei cluster che utilizzano molte VM prerilasciabili o la scalabilità automatica senza la rimozione controllata, l'aumento dei valori delle proprietà precedenti in questi cluster può essere utile (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata non è supportato).

Rimozione controllata di YARN sui cluster EFM

Il ritiro gestito automaticamente di YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sulle applicazioni in esecuzione. Per i cluster con scalabilità automatica, il timeout per la dismissione controllata può essere impostato in un AutoscalingPolicy collegato al cluster EFM.

Miglioramenti EFM per rimozione controllata

  1. Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su questi nodi sono stati completati. Al contrario, i nodi non vengono rimossi nei cluster Managed Service for Apache Spark standard fino al completamento dell'applicazione.

  2. La rimozione dei nodi non attende il completamento dei master delle app in esecuzione su un nodo. Quando il container del master dell'app viene terminato, viene ripianificato su un altro nodo che non è in fase di ritiro. L'avanzamento del job non viene perso: il nuovo master dell'app recupera rapidamente lo stato dal master dell'app precedente leggendo la cronologia dei job.