Horizontales Autoscaling für Streamingpipelines abstimmen

Bei Streamingpipelines mit einem hohen Volumen an Eingabedaten gibt es in der Regel einen Kompromiss zwischen Kosten und Latenz. Um eine niedrige Latenz aufrechtzuerhalten, muss Dataflow Worker hinzufügen, wenn das Trafficvolumen zunimmt. Ein weiterer Faktor ist, wie schnell die Pipeline als Reaktion auf Änderungen der Eingabedatenrate hoch- oder herunterskaliert werden soll.

Das Dataflow-Autoscaling hat Standardeinstellungen, die für viele Arbeitslasten geeignet sind. Es empfiehlt sich jedoch, dieses Verhalten für Ihr spezifisches Szenario gegebenenfalls anzupassen. Beispielsweise kann eine höhere durchschnittliche Latenz akzeptabel sein, um Kosten zu senken, oder Sie möchten, dass Dataflow als Reaktion auf Trafficspitzen schneller hochskaliert.

Zur Optimierung des horizontalen Autoscalings können Sie die folgenden Parameter anpassen:

Autoscaling-Bereich festlegen

Wenn Sie einen neuen Streamingjob erstellen, können Sie die anfängliche Anzahl der Worker und die maximale Anzahl der Worker festlegen. Geben Sie dazu die folgenden Pipelineoptionen an:

Java

  • --numWorkers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt
  • --maxNumWorkers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind

Python

  • --num_workers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt
  • --max_num_workers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind

Go

  • --num_workers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt
  • --max_num_workers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind

Für Streamingjobs, die Streaming Engine verwenden, ist das Flag --maxNumWorkers optional. Der Standardwert ist 100. Für Streamingjobs ohne Streaming Engine ist --maxNumWorkers erforderlich, wenn das horizontale Autoscaling aktiviert ist.

Der Startwert von --maxNumWorkers bestimmt auch, wie viele Persistent Disks dem Job zugewiesen werden. Streamingpipelines werden mit einem festen Pool nichtflüchtiger Speicher bereitgestellt, deren Anzahl --maxNumWorkers entspricht. Während des Streamings werden Persistent Disks so neu verteilt, dass jeder Worker mit der gleichen Anzahl von Laufwerken verbunden ist.

Wenn Sie --maxNumWorkers festlegen, muss der Wert eine ausreichende Anzahl von Laufwerken für Ihre Pipeline bieten. Berücksichtigen Sie bei der Festlegung des Anfangswerts das zukünftige Wachstum. Informationen zur Leistung von Persistent Disks finden Sie unter Persistent Disk und VMs konfigurieren. Dataflow stellt die Nutzung von Persistent Disk in Rechnung und hat Compute Engine-Kontingente, einschließlich Persistent Disk-Kontingente.

Die Mindestanzahl von Workern ist standardmäßig 1 für Streamingjobs, die Streaming Engine verwenden, und (maxNumWorkers/15), aufgerundet, für Jobs ohne Streaming Engine.

Autoscaling-Bereich aktualisieren

Bei Jobs, die Streaming Engine verwenden, können Sie die Mindest- und Höchstanzahl der Worker anpassen, ohne den Job zu beenden oder zu ersetzen. Verwenden Sie dazu eine Aktualisierung des Jobs während der Ausführung. Aktualisieren Sie die folgenden Joboptionen:

  • --min-num-workers: die Mindestanzahl der Worker.
  • --max-num-workers: die maximale Anzahl der Worker.

gcloud

Führen Sie den Befehl gcloud dataflow jobs update-options aus:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Ersetzen Sie dabei Folgendes:

  • REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
  • MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • JOB_ID: Die ID des zu aktualisierenden Jobs.

Sie können --min-num-workers und --max-num-workers auch einzeln aktualisieren.

REST

Verwenden Sie die Methode projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID des Dataflow-Jobs.
  • REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
  • JOB_ID: Die ID des zu aktualisierenden Jobs.
  • MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.

Sie können min_num_workers und max_num_workers auch einzeln aktualisieren. Geben Sie im Abfrageparameter updateMask an, welche Parameter aktualisiert werden sollen, und fügen Sie die aktualisierten Werte in das Feld runtimeUpdatableParams des Anfragetexts ein. Im folgenden Beispiel wird min_num_workers aktualisiert:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Bei Jobs ohne Streaming Engine können Sie den vorhandenen Job durch einen aktualisierten Wert von maxNumWorkers ersetzen.

Wenn Sie einen Streamingjob aktualisieren, der Streaming Engine nicht verwendet, ist das horizontale Autoscaling für den aktualisierten Job standardmäßig deaktiviert. Wenn das Autoscaling aktiviert bleiben soll, geben Sie für den aktualisierten Job --autoscalingAlgorithm und --maxNumWorkers an.

Hinweis zur Worker-Auslastung festlegen

Dataflow verwendet die durchschnittliche CPU-Auslastung als Signal dafür, wann das horizontale Autoscaling angewendet werden soll. Standardmäßig legt Dataflow eine Ziel-CPU-Auslastung von 0,8 fest. Wenn die Auslastung außerhalb dieses Bereichs liegt, fügt Dataflow möglicherweise Worker hinzu oder entfernt sie.

Wenn Sie das Autoscaling-Verhalten besser steuern möchten, können Sie die Ziel-CPU-Auslastung auf einen Wert im Bereich [0,1, 0,9] festlegen.

  • Legen Sie einen niedrigeren Wert für die CPU-Auslastung fest, wenn Sie niedrigere Spitzenlatenzen erzielen möchten. Ein niedrigerer Wert ermöglicht es Dataflow, als Reaktion auf eine steigende Worker-Auslastung aggressiver horizontal zu skalieren und konservativer herunterzuskalieren, um die Stabilität zu verbessern. Ein niedrigerer Wert bietet auch einen größeren Toleranzbereich, wenn die Pipeline im stabilen Zustand ausgeführt wird. Dies führt im Allgemeinen zu einer geringeren Extremwertlatenz. Die Extremwertlatenz misst die längsten Wartezeiten, bevor ein neuer Datensatz verarbeitet wird.

  • Legen Sie einen höheren Wert fest, wenn Sie Ressourcen sparen und die Kosten bei Trafficspitzen senken möchten. Ein höherer Wert verhindert eine übermäßige Hochskalierung, auf Kosten einer höheren Latenz.

Wenn Sie den Auslastungshinweis konfigurieren möchten, wenn Sie einen Job ohne Vorlage ausführen, legen Sie die worker_utilization_hint Dienstoptionfest. Bei einem Vorlagenjob, aktualisieren Sie stattdessen den Auslastungshinweis, da Dienstoptionen nicht unterstützt werden.

Das folgende Beispiel zeigt, wie worker_utilization_hint verwendet wird:

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].

Bei neuen Pipelines empfehlen wir, die Standardeinstellung unter realistischen Lasten zu testen. Bewerten Sie dann das Autoscaling-Verhalten für Ihre Pipeline und nehmen Sie bei Bedarf Anpassungen vor.

Der Auslastungshinweis ist nur einer der Faktoren, die Dataflow bei der Entscheidung berücksichtigt, ob Worker skaliert werden sollen. Andere Faktoren wie Rückstand und verfügbare Schlüssel können den Hinweiswert überschreiben. Außerdem ist der Hinweis kein strenges Ziel. Das Autoscaling versucht, die CPU-Auslastung im Bereich des Hinweiswerts zu halten, aber der aggregierte Auslastungsmesswert kann höher oder niedriger sein. Weitere Informationen finden Sie unter Streaming-Autoscaling-Heuristik.

Auslastungshinweis aktualisieren

So aktualisieren Sie den Auslastungshinweis während der Ausführung eines Jobs: In-Flight-Update

gcloud

Führen Sie den Befehl gcloud dataflow jobs update-options aus:

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Ersetzen Sie dabei Folgendes:

  • REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
  • JOB_ID: Die ID des zu aktualisierenden Jobs.
  • TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]

Verwenden Sie den folgenden gcloud-Befehl, um den Auslastungshinweis auf den Standardwert zurückzusetzen:

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Verwenden Sie die Methode projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID des Dataflow-Jobs.
  • REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
  • JOB_ID: Die ID des zu aktualisierenden Jobs.
  • TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]

Hinweis zur Worker-Parallelität festlegen

Um das Autoscaling mit langen Vorgängen zu verarbeiten, die weniger von CPUs abhängig sind, z. B. ML-intensive Arbeitslasten, können Sie den Hinweis zur Worker-Parallelität mit Apache Beam-Ressourcenhinweisen festlegen. Diese Hinweise schalten das Autoscaling in einen anderen Modus um, der für GPU-intensive Arbeitslasten oder Transformationen mit langer Verarbeitungszeit optimiert ist.

Das folgende Beispiel zeigt, wie Sie einen Hinweis zur Parallelität an eine Transformation anhängen:

Java

pcoll.apply(MyCompositeTransform.of(...)
  .setResourceHints(
      ResourceHints.create()
          .withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))

Ersetzen Sie TARGET_PARALLELISM_PER_WORKER durch einen Wert , der für Ihren Anwendungsfall geeignet ist. Allgemeine Informationen finden Sie unter Einen guten Startwert auswählen.

Python

pcoll | MyPTransform().with_resource_hints(
  max_active_bundles_per_worker="TARGET_PARALLELISM_PER_WORKER")

Ersetzen Sie TARGET_PARALLELISM_PER_WORKER durch einen Wert , der für Ihren Anwendungsfall geeignet ist. Allgemeine Informationen finden Sie unter Einen guten Startwert auswählen.

Wert für den Hinweis zur Worker-Parallelität auswählen

Für ML-Anwendungsfälle ist ein guter Startwert gleich der Anzahl der Modelle, die parallel auf jedem Worker ausgeführt werden. Dieser Wert ist durch die Kapazität der Beschleuniger auf dem Worker und die Größe des Modells begrenzt.

Bei anderen Anwendungsfällen ist die Pipeline entweder speicher- oder CPU-gebunden. Bei speichergebundenen Pipelines verwenden Sie das Speicherlimit, um die maximale parallele Verarbeitung zu berechnen. Bei CPU-gebundenen Pipelines empfiehlt es sich, die Standardrichtlinie für das Autoscaling beizubehalten, anstatt einen Hinweis zur Parallelität zu geben.

Es ist möglich, den Wert so zu optimieren, dass er den Verarbeitungsanforderungen anderer Phasen gerecht wird, z. B. dem Schreiben in ein Ziel. Wenn Sie den Wert um 1 oder 2 erhöhen, wenn die Parallelität Ihres Modells 2 ist, wird die schnellere Verarbeitungszeit beim Schreiben in das Ziel berücksichtigt, da mehr Spielraum für die Verarbeitung in anderen Phasen vorhanden ist. Wenn Ihre Pipeline kein Shuffle umfasst und Transformationen zu einer einzigen Phase zusammengefasst werden, müssen Sie den Wert für andere Transformationen nicht anpassen.

Dieser Wert kann auch angepasst werden, um die Auswirkungen akzeptabler Rückstandsverzögerungen zu simulieren. Wenn Sie beispielsweise mit einer maximalen Verzögerung von 10 Minuten zufrieden sind und die durchschnittliche Verarbeitungszeit Ihres Modells 1 Minute beträgt, können Sie den Wert um 1 erhöhen, vorausgesetzt, die maximale Anzahl der Worker ist auf 10 festgelegt.

Heuristik für GPU-intensives Autoscaling

Bei der GPU-intensiven Einstellung, die durch das Festlegen eines Hinweises zur Parallelität angegeben wird, berücksichtigt Dataflow beim Autoscaling mehrere Faktoren. Diese Faktoren umfassen:

  • Verfügbare Schlüssel. Schlüssel sind die Grundeinheit der Parallelität in Dataflow.
  • Maximale Anzahl aktiver Bundles pro Worker. Dies gibt die maximale ideale Anzahl der Verarbeitungsparallelität innerhalb des Workers an.

Die allgemeine Idee hinter den Skalierungsentscheidungen besteht darin, die Worker zu berechnen, die zur Verarbeitung der aktuellen Last erforderlich sind, wie durch verfügbare Schlüssel signalisiert. Wenn beispielsweise 100 Schlüssel zur Verarbeitung verfügbar sind und die maximale Parallelität pro Worker 10 beträgt, sollten Sie insgesamt 10 Worker haben.

Wenn Ihre Pipeline komplex ist und sowohl eine hohe GPU-intensive Arbeitslast als auch zahlreiche CPU-intensive Transformationen aufweist, empfehlen wir, die richtige Anpassung zu aktivieren. So kann der Dienst gut zwischen CPU-intensiver und GPU-intensiver Arbeit unterscheiden und dann jeden Worker-Pool entsprechend skalieren.

Streaming-Autoscaling-Heuristik

Bei Streamingpipelines besteht das Ziel des horizontalen Autoscalings darin, den Rückstand zu minimieren und gleichzeitig die Worker-Auslastung und den Durchsatz zu maximieren. Außerdem soll dadurch schnell auf Lastspitzen reagiert werden können.

Dataflow berücksichtigt beim Autoscaling mehrere Faktoren, darunter:

  • Rückstand. Die geschätzte Rückstandszeit wird sowohl aus dem Durchsatz als auch aus den Rückstandbyte berechnet, die noch aus der Eingabequelle verarbeitet werden müssen. Eine Pipeline gilt als "im Rückstand", wenn die geschätzte Rückstandszeit mehr als 15 Sekunden beträgt.

  • CPU-Zielauslastung. Der Standardzielwert für die durchschnittliche CPU-Auslastung ist 0,8. Sie können diesen Wert überschreiben.

  • Verfügbare Schlüssel: Schlüssel sind die Grundeinheit der Parallelität in Dataflow.

In einigen Fällen berücksichtigt Dataflow die folgenden Faktoren bei Autoscaling-Entscheidungen. Wenn diese Faktoren für Ihren Job verwendet werden, sehen Sie diese Informationen auf dem Tab Autoscaling für Messwerte.

  • Bei der schlüsselbasierten Drosselung wird die Anzahl der vom Job empfangenen Verarbeitungsschlüssel verwendet, um die Obergrenze für Nutzer-Worker zu berechnen, da jeder Schlüssel nur von einem Worker auf einmal verarbeitet werden kann.

  • Herunterskalierung dämpfen. Wenn Dataflow feststellt, dass instabile Autoscaling-Entscheidungen getroffen wurden, wird die Herunterskalierung verlangsamt, um die Stabilität zu verbessern.

  • CPU-basierte Skalierung verwendet eine hohe CPU-Auslastung als Hochskalierungskriterium.

  • Bei Streamingjobs ohne Streaming Engine kann die Skalierung durch die Anzahl der Persistent Disks eingeschränkt sein. Weitere Informationen finden Sie unter Autoscaling-Bereich festlegen.

  • GPU-intensives Autoscaling, wenn durch Festlegen eines Hinweises zur Worker-Parallelität aktiviert. Weitere Informationen finden Sie unter Heuristik für GPU-intensives Autoscaling.

Hochskalieren. Wenn eine Streamingpipeline für mehrere Minuten Rückstände bei ausreichender Parallelität auf den Workern hat, wird Dataflow hochskaliert. Dataflow versucht, den Rückstand unter Berücksichtigung des aktuellen Durchsatzes pro Worker innerhalb von etwa 150 Sekunden nach dem Hochskalieren zu löschen. Wenn es Rückstände gibt, der Worker jedoch nicht genügend Parallelität für zusätzliche Worker hat, wird die Pipeline nicht hochskaliert. Wenn Sie die Anzahl der Worker über die Anzahl der für die parallele Verarbeitung verfügbaren Schlüssel hinaus erhöhen, wird der Rückstand nicht schneller verarbeitet.

Herunterskalieren Wenn das Autoscaling eine Entscheidung zum Herunterskalieren trifft, ist der Rückstand der höchste Prioritätsfaktor. Das Autoscaling zielt auf einen Rückstand von maximal 15 Sekunden ab. Wenn der Rückstand unter 10 Sekunden fällt und die durchschnittliche Worker-Auslastung unter dem Zielwert für die CPU-Auslastung liegt, wird Dataflow herunterskaliert. Solange der Rückstand akzeptabel ist, versucht das Autoscaling, die CPU-Auslastung nahe der Ziel-CPU-Auslastung zu halten. Wenn die Auslastung jedoch bereits ausreichend nahe am Zielwert liegt, behält das Autoscaling möglicherweise die Anzahl der Worker bei, da jeder Herunterskalierungsschritt Kosten verursacht.

Streaming Engine verwendet auch ein Verfahren für vorausschauendes Autoscaling, das auf dem Timer-Rückstand basiert. Unbegrenzte Daten in einer Streaming-Pipeline werden in Fenster aufgeteilt, die nach Zeitstempeln gruppiert sind. Am Ende eines Fensters werden Timer für jeden in diesem Fenster verarbeiteten Schlüssel ausgelöst. Das Auslösen eines Timers zeigt an, dass das Fenster für einen bestimmten Schlüssel abgelaufen ist. Streaming Engine kann den Timer-Rückstand messen und vorhersagen, wie viele Timer am Ende eines Fensters ausgelöst werden sollen. Unter Verwendung des Timer-Rückstands als Signal kann Dataflow die Verarbeitungsmenge schätzen, die bei zukünftigen Timern ausgeführt werden muss. Basierend auf der geschätzten zukünftigen Arbeitslast wird Dataflow automatisch im Voraus skaliert, um die erwartete Nachfrage zu erfüllen.

Messwerte

Wenn Sie die aktuellen Autoscaling-Limits für einen Job ermitteln möchten, fragen Sie die folgenden Messwerte ab:

  • job/max_worker_instances_limit: Maximale Anzahl von Workern.
  • job/min_worker_instances_limit: Mindestanzahl von Workern.

Wenn Sie Informationen zur Worker-Auslastung abrufen möchten, fragen Sie die folgenden Messwerte ab:

  • job/aggregated_worker_utilization: Die aggregierte Worker-Auslastung.
  • job/worker_utilization_hint: Der aktuelle Hinweis zur Worker-Auslastung.

Wenn Sie Einblicke in das Verhalten des Autoscalings erhalten möchten, fragen Sie den folgenden Messwert ab:

  • job.worker_utilization_hint_is_actively_used: Gibt an, ob das Autoscaling den Hinweis zur Worker-Auslastung aktiv verwendet. Wenn andere Faktoren den Hinweis überschreiben, wenn dieser Messwert erfasst wird, ist der Wert false.
  • job/horizontal_worker_scaling: Beschreibt die Entscheidungen des Autoscalings. Dieser Messwert enthält die folgenden Labels:
    • direction: Gibt an, ob das Autoscaling hoch- oder herunterskaliert wurde oder keine Aktion ausgeführt hat.
    • rationale: Gibt die Gründe für die Entscheidung des Autoscalings an.

Weitere Informationen finden Sie unter Cloud Monitoring-Messwerte. Diese Messwerte werden auch in den Monitoring-Diagrammen für das Autoscaling angezeigt.

Nächste Schritte