Autoscaling für Managed Service for Apache Spark

In diesem Dokument finden Sie Informationen zum Autoscaling von Managed Service for Apache Spark. Wenn Sie Ihre Spark-Arbeitslast senden, kann Managed Service for Apache Spark die Arbeitslastressourcen wie die Anzahl der Executors dynamisch skalieren, um Ihre Arbeitslast effizient auszuführen. Das Autoscaling von Managed Service for Apache Spark ist das Standardverhalten und verwendet die dynamische Ressourcenzuweisung von Spark, um zu bestimmen, ob, wie und wann Ihre Arbeitslast skaliert werden soll.

Autoscaling von Managed Service for Apache Spark V2

Mit dem Autoscaling von Managed Service for Apache Spark Version 2 (V2) werden der Standardversion 1 (V1) Funktionen und Verbesserungen hinzugefügt, mit denen Sie Arbeitslasten von Managed Service for Apache Spark verwalten, die Arbeitslastleistung verbessern und Kosten sparen können:

  • Asynchrone Knotenskalierung: Beim Autoscaling V2 wird die synchrone Skalierung von V1 durch die asynchrone Skalierung ersetzt. Bei der asynchronen Skalierung skaliert Managed Service for Apache Spark die Arbeitslastressourcen herunter, ohne darauf zu warten, dass alle Knoten die Shuffle-Migration abgeschlossen haben. Das bedeutet, dass Longtail-Knoten, die langsam herunterskaliert werden, die Aufskalierung nicht blockieren.
  • Intelligente Knotenauswahl für die Skalierung: Beim Autoscaling V2 wird die zufällige Knotenauswahl von V1's durch einen intelligenten Algorithmus ersetzt, der die besten Knoten für die erste Skalierung ermittelt. Dieser Algorithmus berücksichtigt Faktoren wie die Größe der Shuffle-Daten des Knotens und die Leerlaufzeit.
  • Konfigurierbares Verhalten bei der ordnungsgemäßen Außerbetriebnahme von Spark und der Shuffle-Migration: Mit dem Autoscaling V2 können Sie Standard-Spark-Attribute verwenden, um die ordnungsgemäße Außerbetriebnahme von Spark und die Shuffle-Migration zu konfigurieren. Mit dieser Funktion können Sie die Migrationskompatibilität mit Ihren benutzerdefinierten Spark-Attributen aufrechterhalten.

Funktionen des Autoscalings von Managed Service for Apache Spark

Funktion Autoscaling von Managed Service for Apache Spark V1 Autoscaling von Managed Service for Apache Spark V2
Knotenskalierung Synchron Asynchron
Knotenauswahl für die Skalierung Zufällig Intelligent
Ordnungsgemäße Außerbetriebnahme von Spark und Shuffle-Migration Nicht konfigurierbar Konfigurierbar

Attribute für die dynamische Zuordnung von Spark

In der folgenden Tabelle sind die Attribute für die dynamische Zuordnung von Spark aufgeführt, die Sie beim Senden einer Batcharbeitslast festlegen können, um das Autoscaling zu steuern. Informationen zum Festlegen von Spark-Attributen

Attribut Beschreibung Standard
spark.dataproc.scaling.version Die Version des Autoscalings von Managed Service for Apache Spark Spark. Geben Sie Version 1 oder 2 an (siehe Autoscaling von Managed Service for Apache Spark V2). 1
spark.dynamicAllocation.enabled Gibt an, ob die dynamische Ressourcenzuweisung verwendet werden soll, mit der die Anzahl der Executors basierend auf der Arbeitslast hoch- und herunterskaliert wird. Wenn Sie den Wert auf false setzen, wird das Autoscaling für die Arbeitslast deaktiviert. Standard: true. true
spark.dynamicAllocation.initialExecutors Die anfängliche Anzahl der Executors, die der Arbeitslast zugewiesen wurden. Nach dem Arbeitslast startet, kann das Autoscaling die Anzahl der aktiven Executors ändern. Der Mindestwert ist 2, der Höchstwert 2000. 2
spark.dynamicAllocation.minExecutors Die Mindestanzahl der Executors, auf die die Arbeitslast herunterskaliert werden soll. Der Mindestwert ist 2. 2
spark.dynamicAllocation.maxExecutors Die maximale Anzahl der Executors, auf die die Arbeitslast hochskaliert werden soll. Der Höchstwert ist 2000. 1000
spark.dynamicAllocation.executorAllocationRatio Passt die Aufskalierung der Spark-Arbeitslast an. Akzeptiert einen Wert zwischen 0 und 1. Ein Wert von 1.0 bietet maximale Aufskalierbarkeit und trägt dazu bei, maximale Parallelität zu erreichen. Ein Wert von 0.5 legt die Aufskalierbarkeit und Parallelität auf die Hälfte des Höchstwerts fest. 0.3
spark.dynamicAllocation.diagnosis.enabled Wenn true, werden Diagnoseinformationen protokolliert, wenn die Anzahl der ausgeführten Executors die maximal erforderliche Anzahl für den Zeitraum überschreitet, der durch spark.dynamicAllocation.diagnosis.interval angegeben wird. Die Diagnose enthält eine Zusammenfassung der Executors mit der Anzahl der Executors im Leerlauf und den Perzentilen der Leerlaufzeit der Verteilung der aktiven Aufgaben, der Größe der Shuffle-Daten und der Größe der zwischengespeicherten RDD. Mit spark.dynamicAllocation.diagnosis.logLevel können Sie die Protokollebene der Ausgabe steuern. false
spark.dynamicAllocation.profile Legen Sie performance oder cost fest, um eine vordefinierte Reihe von Konfigurationen anzuwenden, die für Leistung oder Kosteneffizienz optimiert sind. Benutzerdefinierte Attribute überschreiben die Standardeinstellungen des Profils. Weitere Informationen finden Sie unter Profile für die dynamische Zuordnung von Spark. none
spark.dynamicAllocation.shuffleTracking.dynamicTimeout.enabled Wenn true, wird die dynamische Zeitüberschreitungsberechnung für Executors aktiviert, die Shuffle-Daten enthalten. Anstelle der statischen spark.dynamicAllocation.shuffleTracking.timeout, wird die Zeitüberschreitung basierend auf der Menge der Shuffle-Daten berechnet, die auf dem Executor gespeichert sind. So können Executors mit kleinen Shuffles schneller freigegeben werden, während Executors mit großen Shuffles länger aktiv bleiben. false
spark.reducer.fetchMigratedShuffle.enabled Wenn auf true gesetzt, wird das Abrufen des Shuffle-Ausgabespeicherorts vom Spark-Treiber aktiviert, nachdem ein Abruf von einem Executor fehlgeschlagen ist, der aufgrund der dynamischen Zuordnung von Spark außer Betrieb genommen wurde. Dadurch werden ExecutorDeadException Fehler reduziert, die durch die Migration von Shuffle-Blöcken von außer Betrieb genommenen zu aktiven Executors verursacht werden, und die Anzahl der Wiederholungen von Phasen reduziert, die durch FetchFailedException Fehler verursacht werden (siehe FetchFailedException caused by ExecutorDeadException). Dieses Attribut ist in den Spark-Laufzeitversionen Spark runtime versions 1.1.12 und höher sowie 2.0.20 und höher von Managed Service for Apache Spark verfügbar. false
spark.scheduler.excludeShuffleSkewExecutors Wenn true, werden Aufgaben nicht auf Executors mit Shuffle-Skew geplant. Das sind Executors mit einer großen Menge an Shuffle-Daten oder einer großen Anzahl abgeschlossener Map-Aufgaben. Dadurch kann die Leistung verbessert werden, indem Shuffle-Skew reduziert wird. false

Profile für die dynamische Zuordnung von Spark

Sie können das Attribut spark.dynamicAllocation.profile auf performance oder cost setzen, um eine vordefinierte Reihe von Spark-Konfigurationen anzuwenden, die für Leistung oder Kosteneffizienz optimiert sind. Wenn Sie zusätzlich zum Festlegen des Attributs spark.dynamicAllocation.profile Spark-Attribute festlegen, überschreiben Ihre Einstellungen die Standardeinstellungen des Profils für diese Attribute.

performance: Dieses Profil ist für eine minimale Ausführungszeit optimiert, indem die folgenden Standardeinstellungen angewendet werden:

  • spark.scheduler.excludeShuffleSkewExecutors: true
  • spark.dynamicAllocation.executorIdleTimeout: 300s
  • spark.dynamicAllocation.initialExecutors: 10

cost: Dieses Profil ist für einen reduzierten Ressourcenverbrauch optimiert, indem die folgenden Standardeinstellungen angewendet werden:

  • spark.dynamicAllocation.executorIdleTimeout: 120s
  • spark.dynamicAllocation.cachedExecutorIdleTimeout: 120s
  • spark.dynamicAllocation.shuffleTracking.dynamicTimeout.enabled: true
  • spark.dynamicAllocation.diagnosis.enabled: true

Messwerte für die dynamische Zuordnung von Spark

Spark-Batcharbeitslasten generieren die folgenden Messwerte im Zusammenhang mit der dynamischen Ressourcenzuweisung von Spark. Weitere Informationen zu Spark-Messwerten finden Sie unter Monitoring und Instrumentierung.

Messwert Beschreibung
maximum-needed Die maximale Anzahl der Executors, die unter der aktuellen Last erforderlich sind, um alle laufenden und ausstehenden Aufgaben zu erfüllen.
running Die Anzahl der ausgeführten Executors, die Aufgaben ausführen.

Probleme und Lösungen für die dynamische Zuordnung von Spark

  • FetchFailedException caused by ExecutorDeadException

    Ursache: Wenn die dynamische Zuordnung von Spark einen Executor herunterskaliert, wird die Shuffle-Datei zu aktiven Executors migriert. Da die Spark-Reducer-Aufgabe auf einem Executor jedoch die Shuffle-Ausgabe vom Speicherort abruft, der vom Spark-Treiber beim Start der Reducer-Aufgabe festgelegt wurde, kann der Reducer bei der Migration einer Shuffle-Datei weiterhin versuchen, die Shuffle-Ausgabe von einem außer Betrieb genommenen Executor abzurufen. Dies führt zu ExecutorDeadException- und FetchFailedException-Fehlern.

    Lösung: Aktivieren Sie das erneute Abrufen des Shuffle-Speicherorts, indem Sie das spark.reducer.fetchMigratedShuffle.enabled auf true setzen, wenn Sie Ihre Managed Service for Apache Spark-Batcharbeitslast ausführen (siehe Attribute für Spark-Batcharbeitslasten festlegen). Wenn dieses Attribut aktiviert ist, ruft die Reducer-Aufgabe den Speicherort der Shuffle-Ausgabe nach einem fehlgeschlagenen Abruf von einem außer Betrieb genommenen Executor erneut vom Treiber ab.