Parallelverarbeitung

Pipelines werden auf Clustern von Maschinen ausgeführt. Sie erzielen einen hohen Durchsatz, indem sie die zu erledigende Arbeit aufteilen und dann parallel auf den mehreren Executors ausführen, die über den Cluster verteilt sind. Im Allgemeinen gilt: Je mehr Aufteilungen (auch Partitionen genannt), desto schneller kann die Pipeline ausgeführt werden. Der Grad der Parallelität in Ihrer Pipeline wird durch die Quellen und Shuffle-Phasen in der Pipeline bestimmt.

Quellen

Zu Beginn jeder Pipelineausführung berechnet jede Quelle in Ihrer Pipeline, welche Daten gelesen werden müssen und wie diese Daten in Aufteilungen unterteilt werden können. Stellen Sie sich beispielsweise eine einfache Pipeline vor, die Daten aus Cloud Storage liest, einige Wrangler-Transformationen durchführt und dann wieder in Cloud Storage schreibt.

Einfache Pipeline mit Cloud Storage-Quelle, Wrangler-Transformation und Cloud Storage-Senke

Wenn die Pipeline gestartet wird, untersucht die Cloud Storage-Quelle die Eingabedateien und unterteilt sie anhand der Dateigrößen in Aufteilungen. Eine einzelne Gigabyte-Datei kann beispielsweise in 100 Aufteilungen mit jeweils 10 MB unterteilt werden. Jeder Executor liest die Daten für diese Aufteilung, führt die Wrangler-Transformationen aus und schreibt die Ausgabe dann in eine Teildatei.

Partitionierte Daten in Cloud Storage in parallele Wrangler-Transformationen in Teildateien aufteilen

Wenn Ihre Pipeline langsam ausgeführt wird, sollten Sie zuerst prüfen, ob Ihre Quellen genügend Aufteilungen erstellen, um die Parallelität voll auszunutzen. Bei einigen Arten der Komprimierung können Nur-Text-Dateien beispielsweise nicht aufgeteilt werden. Wenn Sie Dateien lesen, die mit GZIP komprimiert wurden, wird die Pipeline möglicherweise viel langsamer ausgeführt als beim Lesen von unkomprimierten Dateien oder Dateien, die mit BZIP komprimiert wurden (die aufgeteilt werden können). Wenn Sie die Datenbankquelle verwenden und sie so konfiguriert haben, dass nur eine einzige Aufteilung verwendet wird, wird sie viel langsamer ausgeführt als bei der Konfiguration für die Verwendung mehrerer Aufteilungen.

Shuffles

Bestimmte Arten von Plug-ins führen dazu, dass Daten im Cluster gemischt werden. Dies geschieht, wenn Datensätze, die von einem Executor verarbeitet werden, an einen anderen Executor gesendet werden müssen, um die Berechnung durchzuführen. Shuffles sind teure Vorgänge, da sie viele E/A-Vorgänge umfassen. Plug-ins, die dazu führen, dass Daten gemischt werden, werden alle im Bereich Analytics von Pipeline Studio angezeigt. Dazu gehören Plug-ins wie „Group By“, „Deduplicate“, „Distinct“ und „Joiner“. Angenommen, dem Pipeline-Beispiel oben wird eine Group By-Phase hinzugefügt.

Angenommen, die gelesenen Daten stellen Einkäufe in einem Lebensmittelgeschäft dar. Jeder Datensatz enthält ein Feld item und ein Feld num_purchased. In der Phase Group By konfigurieren wir die Pipeline so, dass Datensätze nach dem Feld item gruppiert und die Summe des Felds num_purchased berechnet wird.

Wenn die Pipeline ausgeführt wird, werden die Eingabedateien wie oben beschrieben aufgeteilt. Danach wird jeder Datensatz im Cluster gemischt, sodass jeder Datensatz mit demselben Artikel zum selben Executor gehört.

Wie im vorherigen Beispiel dargestellt, waren die Datensätze für Apfelkäufe ursprünglich auf mehrere Executors verteilt. Um die Aggregation durchzuführen, mussten alle diese Datensätze im Cluster an denselben Executor gesendet werden.

Bei den meisten Plug-ins, für die ein Shuffle erforderlich ist, können Sie die Anzahl der Partitionen angeben, die beim Mischen der Daten verwendet werden sollen. Dadurch wird festgelegt, wie viele Executors zum Verarbeiten der gemischten Daten verwendet werden.

Wenn im vorherigen Beispiel die Anzahl der Partitionen auf 2 festgelegt ist, berechnet jeder Executor Aggregate für zwei Artikel anstelle von einem.

Beachten Sie, dass Sie die Parallelität Ihrer Pipeline nach dieser Phase verringern können. Sehen Sie sich beispielsweise die logische Ansicht der Pipeline an:

Wenn die Quelle Daten auf 500 Partitionen verteilt, die Group By-Phase aber 200 Partitionen verwendet, sinkt der maximale Grad der Parallelität nach der Group By-Phase von 500 auf 200. Anstelle von 500 verschiedenen Teildateien, die in Cloud Storage geschrieben werden, haben Sie nur 200.

Partitionen auswählen

Wenn die Anzahl der Partitionen zu niedrig ist, nutzen Sie die volle Kapazität Ihres Clusters nicht, um so viel Arbeit wie möglich zu parallelisieren. Wenn Sie die Anzahl der Partitionen zu hoch festlegen, erhöht sich der unnötige Aufwand. Im Allgemeinen ist es besser, zu viele als zu wenige Partitionen zu verwenden. Zusätzlicher Aufwand ist ein Problem, wenn die Ausführung Ihrer Pipeline einige Minuten dauert und Sie versuchen, ein paar Minuten einzusparen. Wenn die Ausführung Ihrer Pipeline Stunden dauert, müssen Sie sich in der Regel keine Sorgen um den Aufwand machen.

Eine nützliche, aber zu einfache Möglichkeit, die Anzahl der zu verwendenden Partitionen zu bestimmen, besteht darin, sie auf max(cluster CPUs, input records / 500,000) festzulegen. Mit anderen Worten: Teilen Sie die Anzahl der Eingabedatensätze durch 500.000. Wenn diese Zahl größer als die Anzahl der Cluster-CPUs ist, verwenden Sie sie für die Anzahl der Partitionen. Andernfalls verwenden Sie die Anzahl der Cluster-CPUs. Wenn Ihr Cluster beispielsweise 100 CPUs hat und die Shuffle-Phase voraussichtlich 100 Millionen Eingabedatensätze umfasst, verwenden Sie 200 Partitionen.

Eine vollständigere Antwort ist, dass Shuffles am besten funktionieren, wenn die Zwischen-Shuffle-Daten für jede Partition vollständig in den Arbeitsspeicher eines Executors passen, sodass nichts auf die Festplatte ausgelagert werden muss. Spark reserviert knapp 30% des Arbeitsspeichers eines Executors für Shuffle-Daten. Die genaue Zahl ist (Gesamtspeicher – 300 MB) * 30%. Wenn wir davon ausgehen, dass jeder Executor 2 GB Arbeitsspeicher verwendet, sollten in jeder Partition nicht mehr als (2 GB – 300 MB) * 30% = etwa 500 MB an Datensätzen enthalten sein. Wenn wir davon ausgehen, dass jeder Datensatz auf 1 KB komprimiert wird,bedeutet das (500 MB / Partition) / (1 KB / Datensatz) = 500.000 Datensätze pro Partition. Wenn Ihre Executors mehr Arbeitsspeicher verwenden oder Ihre Datensätze kleiner sind, können Sie diese Zahl entsprechend anpassen.

Datenverzerrung

Im vorherigen Beispiel waren die Käufe für verschiedene Artikel gleichmäßig verteilt. Das heißt, es gab jeweils drei Käufe für Äpfel, Bananen, Karotten und Eier. Das Mischen mit einem gleichmäßig verteilten Schlüssel ist die leistungsstärkste Art des Mischens, aber viele Datasets haben diese Eigenschaft nicht. Wenn wir beim Beispiel mit dem Lebensmittelgeschäft bleiben, würden Sie viel mehr Käufe für Eier als für Hochzeitskarten erwarten. Wenn es einige Shuffle-Schlüssel gibt, die viel häufiger vorkommen als andere, haben Sie es mit verzerrten Daten zu tun. Verzerrte Daten können deutlich schlechter abschneiden als nicht verzerrte Daten, da ein unverhältnismäßig großer Teil der Arbeit von einer kleinen Anzahl von Executors ausgeführt wird. Dadurch ist eine kleine Teilmenge von Partitionen viel größer als alle anderen.

In diesem Beispiel gibt es fünfmal so viele Eierkäufe wie Kartenkäufe. Das bedeutet, dass die Berechnung des Eieraggregats etwa fünfmal so lange dauert. Das macht nicht viel aus, wenn es nur 10 Datensätze anstelle von zwei gibt, aber es macht einen großen Unterschied, wenn es fünf Milliarden Datensätze anstelle von einer Milliarde gibt. Bei einer Datenverzerrung hat die Anzahl der Partitionen, die beim Mischen verwendet werden, keine großen Auswirkungen auf die Pipelineleistung.

Sie können Datenverzerrungen erkennen, indem Sie sich das Diagramm für Ausgabedatensätze im Zeitverlauf ansehen. Wenn die Phase zu Beginn der Pipelineausführung Datensätze viel schneller ausgibt und dann plötzlich langsamer wird, kann das auf verzerrte Daten hindeuten.

Sie können Datenverzerrungen auch erkennen, indem Sie sich die Arbeitsspeichernutzung des Clusters im Zeitverlauf ansehen. Wenn Ihr Cluster einige Zeit lang voll ausgelastet ist, aber plötzlich eine Zeit lang eine geringe Arbeitsspeichernutzung aufweist, ist das auch ein Zeichen dafür, dass Sie es mit Datenverzerrungen zu tun haben.

Verzerrte Daten wirken sich am stärksten auf die Leistung aus, wenn ein Join ausgeführt wird. Es gibt einige Techniken, mit denen die Leistung bei verzerrten Joins verbessert werden kann. Weitere Informationen finden Sie unter Parallele Verarbeitung für JOIN Vorgänge.

Adaptive Optimierung für die Ausführung

Wenn Sie die Ausführung adaptiv optimieren möchten, geben Sie den Bereich der zu verwendenden Partitionen an, nicht die genaue Partitionsnummer. Die genaue Partitionsnummer wird auch dann ignoriert, wenn sie in der Pipelinekonfiguration festgelegt ist, wenn die adaptive Ausführung aktiviert ist.

Wenn Sie einen sitzungsspezifischen Managed Service for Apache Spark-Cluster verwenden, legt Cloud Data Fusion automatisch die richtige Konfiguration fest. Bei statischen Managed Service for Apache Spark- oder Hadoop-Clustern können die folgenden beiden Konfigurationsparameter festgelegt werden:

  • spark.default.parallelism: Legen Sie diesen Parameter auf die Gesamtzahl der im Cluster verfügbaren vCores fest. So wird sichergestellt, dass Ihr Cluster nicht unterlastet ist, und die Untergrenze für die Anzahl der Partitionen festgelegt.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Legen Sie diesen Parameter auf das 32-Fache der Anzahl der im Cluster verfügbaren vCores fest. Dadurch wird die Obergrenze für die Anzahl der Partitionen festgelegt.
  • Spark.sql.adaptive.enabled: Setzen Sie diesen Wert auf true, um die Optimierungen zu aktivieren. Managed Service for Apache Spark legt ihn automatisch fest. Wenn Sie jedoch generische Hadoop-Cluster verwenden, müssen Sie darauf achten, dass er aktiviert ist.

Diese Parameter können in der Engine-Konfiguration einer bestimmten Pipeline oder in den Clustereigenschaften eines statischen Managed Service for Apache Spark Clusters festgelegt werden.

Nächste Schritte