Der Enhanced Flexibility Mode (EFM) von Managed Service for Apache Spark verwaltet Shuffle-Daten. Dadurch werden Verzögerungen beim Fortschritt des Jobs minimiert, die durch das Entfernen von Knoten aus einem ausgeführten Cluster verursacht werden. EFM lagert Shuffle-Daten aus, indem Daten an primäre Worker geschrieben werden. Worker rufen diese Remoteknoten während der reduzierten Phase ab. Dieser Modus ist nur für Spark-Jobs verfügbar.
Da EFM keine Zwischen-Shuffle-Daten auf sekundären Workern speichert, eignet sich EFM gut für Cluster, die VMs auf Abruf oder nur die sekundäre Worker-Gruppe automatisch skalieren.
EFM wird in den Image-Versionen
2.0.31+, 2.1.6+, 2.2+, und höher
von Managed Service for Apache Spark unterstützt.
- Apache Hadoop YARN -Jobs, die die AppMaster-Verschiebung nicht unterstützen, können im Enhanced Flexibility Mode fehlschlagen. Weitere Informationen finden Sie unter Wann auf den Abschluss von AppMastern gewartet werden muss.
- Der Enhanced Flexibility Mode wird nicht empfohlen:
- auf einem Cluster, der nur primäre Worker hat
- für Streaming-Jobs, da es nach Abschluss des Jobs bis zu 30 Minuten dauern kann, bis die Zwischen-Shuffle-Daten bereinigt werden
- auf einem Cluster, auf dem Notebooks ausgeführt werden, da Shuffle-Daten während der Sitzung möglicherweise nicht bereinigt werden
- wenn Spark-Jobs in einem Cluster mit aktivierter ordnungsgemäßer Außerbetriebnahme ausgeführt werden Die ordnungsgemäße Außerbetriebnahme und EFM können sich gegenseitig behindern, da der YARN-Mechanismus für die ordnungsgemäße Außerbetriebnahme die Knoten im Status DECOMMISSIONING beibehält, bis alle beteiligten Anwendungen abgeschlossen sind.
- auf einem Cluster, auf dem sowohl Spark- als auch Nicht-Spark-Jobs ausgeführt werden
- Der Enhanced Flexibility Mode wird nicht unterstützt:
- wenn das primäre Worker-Autoscaling aktiviert ist. In den meisten Fällen speichern primäre Worker weiterhin Shuffle-Daten, die nicht automatisch migriert werden. Durch die Herunterskalierung der primären Worker-Gruppe werden EFM-Vorteile ausgeschlossen.
Enhanced Flexibility Mode verwenden
Enhanced Flexibility wird aktiviert, wenn Sie einen Cluster erstellen, indem Sie die
dataproc:efm.spark.shuffle Cluster-Eigenschaft
auf primary-workersetzen.
Beispiel :
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Beispiel für Apache Spark
- Führen Sie einen WordCount-Job für öffentlichen Shakespeare-Text mithilfe der Spark-Beispiel-JAR-Datei auf dem EFM-Cluster aus.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Lokale SSDs konfigurieren
Da EFM Zwischen-Shuffle-Daten auf VM-angehängte Laufwerke schreibt, profitiert es vom zusätzlichen Durchsatz und den zusätzlichen IOPS, die von lokalen SSDs bereitgestellt werden. Damit die Ressourcenzuweisung vereinfacht wird, sollten Sie beim Konfigurieren von primären Worker-Maschinen ein Ziel von etwa einer lokalen SSD-Partition pro 4 vCPUs festlegen.
Übergeben Sie das --num-worker-local-ssds Flag an den
Befehl gcloud Managed Service for Apache Spark clusters create, um lokale SSDs hinzuzufügen.
Im Allgemeinen benötigen Sie keine lokalen SSDs auf sekundären Workern.
Das Hinzufügen lokaler SSDs zu den sekundären Workern eines Clusters (mit dem Flag --num-secondary-worker-local-ssds) ist oft weniger wichtig, da sekundäre Worker keine Shuffle-Daten lokal schreiben.
Da lokale SSDs die Leistung des lokalen Laufwerks verbessern, können Sie
lokalen Workern jedoch lokale SSDs hinzufügen, wenn Sie
davon ausgehen, dass Jobs aufgrund einer lokalen Laufwerknutzung auf E/A-gebunden
sind: Ihr Job verwendet ein umfangreiches lokales Laufwerk für den
Scratch-Speicherplatz oder Ihre Partitionen sind
zu groß, um in den Speicher zu passen, und wird an den Datenträger weitergeleitet.
Verhältnis der sekundären Worker
Da sekundäre Worker ihre Shuffle-Daten an primäre Worker schreiben, muss der Cluster eine ausreichende Anzahl von primären Workern mit genügend CPU-, Arbeitsspeicher- und Laufwerkressourcen haben, um die Shuffle-Last des Jobs zu bewältigen. Wenn Sie bei
Autoscaling-Clustern verhindern möchten, dass die primäre Gruppe skaliert wird und
unerwünschtes Verhalten verursacht, setzen Sie minInstances in der
Autoscaling-Richtlinie
für die primäre Worker-Gruppe auf den Wert maxInstances.
Wenn Sie ein hohes Verhältnis von sekundären zu primären Workern haben (z. B. 10:1), überwachen Sie die CPU-Auslastung, das Netzwerk und die Laufwerknutzung primärer Worker, um festzustellen, ob sie überlastet sind. Dazu
rufen Sie in der Google Cloud Console die VM-Instanzen Seite auf.
Klicken Sie auf das Kästchen links neben dem primären Worker.
Klicken Sie auf den Tab MONITORING, um die CPU-Auslastung, die Laufwerk-IOPS, die Netzwerk-Byte und andere Messwerte des primären Workers anzuzeigen.
Wenn primäre Worker überlastet sind, sollten Sie primäre Worker manuell vertikal skalieren.
Größe der primären Workergruppe anpassen
Die primäre Workergruppe kann sicher skaliert werden. Das Herunterskalieren der primären Workergruppe kann jedoch den Jobfortschritt beeinträchtigen. Vorgänge, die die
primäre Worker-Gruppe herunterskalieren, sollten die
ordnungsgemäße Außerbetriebnahme verwenden. Diese wird durch Festlegen des --graceful-decommission-timeout Flags aktiviert.
Automatisch skalierte Cluster:Die Skalierung der primären Workergruppe ist in EFM-Clustern mit Autoscaling-Richtlinien deaktiviert. So passen Sie die Größe der primären Workergruppe auf einem automatisch skalierten Cluster an:
Autoscaling deaktivieren
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Primäre Gruppe skalieren
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Aktivieren Sie das Autoscaling wieder:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Nutzung des primären Worker-Laufwerks überwachen
Primäre Worker müssen genügend Speicherplatz für die Shuffle-Daten des Clusters haben.
Sie können dies indirekt anhand des Messwerts remaining HDFS capacity überwachen.
Wenn das lokale Laufwerk gefüllt wird, ist der Speicherplatz für HDFS nicht verfügbar und die verbleibende Kapazität sinkt.
Wenn das lokale Laufwerk eines primären Workers über 90 % der Kapazität überschreitet, wird der Knoten in der YARN-Knoten-Benutzeroberfläche standardmäßig als UNHEALTHY gekennzeichnet. Wenn Probleme mit der Laufwerkskapazität auftreten, können Sie nicht verwendete Daten aus HDFS löschen oder den primären Worker-Pool hochskalieren.
Erweiterte Konfiguration
Partitionierung und Parallelität
Konfigurieren Sie beim Senden eines Spark-Jobs eine geeignete Partitionierungsebene. Die Entscheidung über die Anzahl der Eingabe- und Ausgabepartitionen für eine Shuffle-Phase hat einen Kompromiss zwischen verschiedenen Leistungsmerkmalen. Am besten experimentieren Sie mit Werten, die für Ihre Jobformen funktionieren.
Eingabepartitionen
Die Partitionierung von Spark und MapReduce wird vom Eingabe-Dataset bestimmt. Beim Lesen von Dateien aus Cloud Storage verarbeitet jede Aufgabe ungefähr eine „Blockgröße“ an Daten.
Bei Spark SQL-Jobs wird die maximale Partitionsgröße von
spark.sql.files.maxPartitionBytesfestgelegt. Wir empfehlen, den Wert auf 1 GB zu erhöhen:spark.sql.files.maxPartitionBytes=1073741824.Bei Spark-RDDs wird die Partitionsgröße in der Regel mit
fs.gs.block.sizegesteuert, die standardmäßig 128 MB beträgt. Wir empfehlen, den Wert auf 1 GB zu erhöhen. Beispiel:--properties spark.hadoop.fs.gs.block.size=1073741824
Ausgabepartitionen
Die Anzahl der Aufgaben in den folgenden Phasen wird durch mehrere Attribute gesteuert. Bei größeren Jobs, die mehr als 1 TB verarbeiten, sollten Sie mindestens 1 GB pro Partition bereitstellen.
Bei Spark SQL wird die Anzahl der Ausgabepartitionen von
spark.sql.shuffle.partitionsgesteuert.Bei Spark-Jobs, die die RDD API verwenden, können Sie die Anzahl der Ausgabepartitionen angeben oder
spark.default.parallelismfestlegen.
Shuffle-Tuning des primären Workers
Das wichtigste Attribut ist --properties yarn:spark.shuffle.io.serverThreads=<num-threads>.
Beachten Sie, dass hier ein YARN-Attribut auf Clusterebene angegeben ist, da der Spark-Shuffle-Server als Teil des Knoten-Managers ausgeführt wird. Standardmäßig wird die Anzahl der Kerne auf dem Computer verdoppelt (z. B. 16 Threads auf einem n1-highmem-8). Wenn „Shuffle Read Blocked Time“ größer als 1 Sekunde ist und primäre Worker das Netzwerk-, CPU- oder Laufwerklimit nicht erreicht haben, sollten Sie die Anzahl der Shuffle-Server-Threads erhöhen.
Bei größeren Maschinentypen sollten Sie möglicherweise spark.shuffle.io.numConnectionsPerPeer erhöhen. Der Standardwert ist 1. (Setzen Sie es z. B. auf 5 Verbindungen pro Hostpaar).
Wiederholungsversuche erhöhen
Die maximale Anzahl der Versuche, die App-Master, Aufgaben und Phasen verwenden können, kann über die folgenden Attribute konfiguriert werden:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Da App-Master und Aufgaben häufiger in Clustern beendet werden, die viele VMs auf Abruf oder Autoscaling ohne ordnungsgemäße Außerbetriebnahme verwenden, kann es hilfreich sein, die Werte der oben genannten Attribute in diesen Clustern zu erhöhen. (Bitte beachten Sie, dass die Verwendung von EFM mit Spark und ordnungsgemäßer Außerbetriebnahme nicht unterstützt wird).
YARN: Ordnungsgemäße Außerbetriebnahme in EFM-Clustern
Die ordnungsgemäße Außerbetriebnahme in YARN kann verwendet werden, um Knoten schnell und mit minimalen Auswirkungen auf die Ausführung von Anwendungen zu entfernen. Bei Autoscaling-Clustern kann das Zeitlimit für eine ordnungsgemäße Außerbetriebnahme in einer AutoscalingPolicy festgelegt werden, die an den EFM-Cluster angehängt ist.
EFM-Verbesserungen für eine ordnungsgemäße Außerbetriebnahme
Da Zwischendaten in einem verteilten Dateisystem gespeichert werden, können Knoten aus einem EFM-Cluster entfernt werden, sobald alle Container, die auf diesen Knoten ausgeführt werden, abgeschlossen sind. Dagegen werden Knoten erst dann aus Standard-Managed Service for Apache Spark-Clustern entfernt, wenn die Anwendung beendet wurde.
Das Entfernen von Knoten wartet nicht, bis die auf einem Knoten ausgeführten App-Master beendet wurden. Wenn der Master-Container der Anwendung beendet wird, wird er auf einem anderen Knoten verschoben, der nicht außer Betrieb genommen wird. Jobfortschritte gehen nicht verloren. Der neue App-Master stellt den Status des vorherigen App-Master durch das Einlesen des Jobverlaufs schnell wieder her.