Bei Streamingpipelines mit einem hohen Volumen an Eingabedaten gibt es in der Regel einen Kompromiss zwischen Kosten und Latenz. Um eine niedrige Latenz beizubehalten, muss Dataflow Worker hinzufügen, wenn das Trafficvolumen zunimmt. Ein weiterer Faktor ist, wie schnell die Pipeline als Reaktion auf Änderungen der Eingabedatenrate skaliert werden soll.
Der Dataflow-Autoscaler hat Standardeinstellungen, die für viele Arbeitslasten geeignet sind. Es empfiehlt sich jedoch, dieses Verhalten für Ihr spezifisches Szenario gegebenenfalls anzupassen. Beispielsweise kann eine höhere durchschnittliche Latenz akzeptabel sein, um Kosten zu senken, oder Sie möchten, dass Dataflow als Reaktion auf Trafficspitzen schneller hochskaliert.
Sie können die folgenden Parameter anpassen, um das horizontale Autoscaling zu optimieren:
- Autoscaling-Bereich: Die Mindest- und Höchstzahl der zuzuweisenden Worker.
- Hinweis zur Worker-Auslastung: Die CPU-Zielauslastung für Worker.
- Hinweis zur Worker-Parallelität: Die Zielanzahl der Parallelität für Worker.
Autoscaling-Bereich festlegen
Wenn Sie einen neuen Streamingjob erstellen, können Sie die anfängliche und die maximale Anzahl von Workern festlegen. Geben Sie dazu die folgenden Pipelineoptionen an:
Java
--numWorkers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt--maxNumWorkers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind
Python
--num_workers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt--max_num_workers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind
Go
--num_workers: die anfängliche Anzahl der Worker, die verfügbar sind, wenn die Pipeline-Ausführung beginnt--max_num_workers: die maximale Anzahl an Workern, die für Ihre Pipeline verfügbar sind
Für Streamingjobs mit Streaming Engine ist das Flag --maxNumWorkers optional. Der Standardwert ist 100. Für Streamingjobs ohne Streaming Engine ist --maxNumWorkers erforderlich, wenn horizontales Autoscaling aktiviert ist.
Der Startwert von --maxNumWorkers bestimmt auch, wie viele Persistent Disks dem Job zugewiesen werden.
Streamingpipelines werden mit einem festen Pool nichtflüchtiger Speicher bereitgestellt, deren Anzahl --maxNumWorkers entspricht. Während des Streamings werden Persistent Disks so neu verteilt, dass jeder Worker mit der gleichen Anzahl von Laufwerken verbunden ist.
Wenn Sie --maxNumWorkers festlegen, muss der Wert eine ausreichende Anzahl von Laufwerken für Ihre Pipeline umfassen. Berücksichtigen Sie das zukünftige Wachstum beim Festlegen des Anfangswerts. Informationen zur Leistung von Persistent Disks finden Sie unter Persistent Disk und VMs konfigurieren.
Dataflow stellt die Nutzung von Persistent Disk in Rechnung und hat Compute Engine-Kontingente, einschließlich Persistent Disk-Kontingente.
Die Mindestanzahl von Workern ist standardmäßig 1 für Streamingjobs, die Streaming Engine verwenden, und (maxNumWorkers/15), aufgerundet, für Jobs ohne Streaming Engine.
Autoscaling-Bereich aktualisieren
Bei Jobs, die Streaming Engine verwenden, können Sie die Mindest- und Höchstanzahl von Workern anpassen, ohne den Job zu beenden oder zu ersetzen. Wenn Sie diese Werte anpassen möchten, verwenden Sie eine Aktualisierung eines laufenden Jobs. Aktualisieren Sie die folgenden Joboptionen:
--min-num-workers: die Mindestanzahl von Workern.--max-num-workers: die maximale Anzahl von Workern.
gcloud
Führen Sie den Befehl gcloud dataflow jobs update-options aus:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Ersetzen Sie dabei Folgendes:
- REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
- MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
- MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
- JOB_ID: die ID des zu aktualisierenden Jobs
Sie können --min-num-workers und --max-num-workers auch einzeln aktualisieren.
REST
Verwenden Sie die Methode projects.locations.jobs.update:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Ersetzen Sie Folgendes:
- PROJECT_ID: die Google Cloud Projekt-ID des Dataflow-Jobs
- REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
- JOB_ID: die ID des zu aktualisierenden Jobs
- MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
- MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
Sie können min_num_workers und max_num_workers auch einzeln aktualisieren.
Geben Sie im Abfrageparameter updateMask an, welche Parameter aktualisiert werden sollen, und fügen Sie die aktualisierten Werte in das Feld runtimeUpdatableParams des Anfragetexts ein. Im folgenden Beispiel wird min_num_workers aktualisiert:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Bei Jobs ohne Streaming Engine können Sie den vorhandenen Job durch einen neuen Job ersetzen, der einen aktualisierten Wert für maxNumWorkers verwendet.
Wenn Sie einen Streamingjob aktualisieren, der Streaming Engine nicht verwendet, ist das horizontale Autoscaling für den aktualisierten Job standardmäßig deaktiviert. Wenn Sie das Autoscaling aktiviert lassen möchten, geben Sie --autoscalingAlgorithm und --maxNumWorkers für den aktualisierten Job an.
Hinweis zur Worker-Auslastung festlegen
Dataflow verwendet die durchschnittliche CPU-Auslastung als Signal dafür, wann horizontales Autoscaling angewendet werden soll. Standardmäßig legt Dataflow eine Ziel-CPU-Auslastung von 0,8 fest. Wenn die Auslastung außerhalb dieses Bereichs liegt, fügt Dataflow möglicherweise Worker hinzu oder entfernt sie.
Wenn Sie das Autoscaling-Verhalten besser steuern möchten, können Sie die Ziel-CPU-Auslastung auf einen Wert im Bereich [0,1, 0,9] festlegen.
Legen Sie einen niedrigeren CPU-Auslastungswert fest, wenn Sie niedrigere Spitzenlatenzen erzielen möchten. Bei einem niedrigeren Wert kann Dataflow bei steigender Worker-Auslastung aggressiver horizontal skaliert und zur Verbesserung der Stabilität konservativer herunterskaliert werden. Ein niedrigerer Wert bietet auch einen größeren Toleranzbereich, wenn die Pipeline im stabilen Zustand ausgeführt wird. Dies führt im Allgemeinen zu einer geringeren Extremwertlatenz. Die Tail-Latenz gibt die längsten Wartezeiten an, bevor ein neuer Datensatz verarbeitet wird.
Legen Sie einen höheren Wert fest, wenn Sie Ressourcen sparen und die Kosten bei Traffic-Spitzen niedrig halten möchten. Ein höherer Wert verhindert eine übermäßige Hochskalierung, auf Kosten einer höheren Latenz.
Wenn Sie den Hinweis zur Auslastung für einen Job konfigurieren möchten, der nicht auf einer Vorlage basiert, legen Sie die worker_utilization_hint-Dienstoption fest. Bei einem Vorlagenjob aktualisieren Sie stattdessen den Nutzungshinweis, da Dienstoptionen nicht unterstützt werden.
Das folgende Beispiel zeigt, wie worker_utilization_hint verwendet wird:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].
Go
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Ersetzen Sie TARGET_UTILIZATION durch einen Wert im Bereich [0,1, 0,9].
Bei neuen Pipelines empfehlen wir, unter realistischen Lasten mit der Standardeinstellung zu testen. Bewerten Sie dann das Autoscaling-Verhalten für Ihre Pipeline und nehmen Sie bei Bedarf Anpassungen vor.
Der Auslastungshinweis ist nur einer der Faktoren, die Dataflow verwendet, um zu entscheiden, ob Worker skaliert werden sollen. Andere Faktoren wie der Backlog und verfügbare Schlüssel können den Hinweiswert überschreiben. Außerdem ist der Hinweis kein striktes Ziel. Der Autoscaler versucht, die CPU-Auslastung im Bereich des Hinweiswerts zu halten. Der aggregierte Auslastungs-Messwert kann jedoch höher oder niedriger sein. Weitere Informationen finden Sie unter Streaming-Autoscaling-Heuristik.
Auslastungshinweis aktualisieren
So aktualisieren Sie den Hinweis zur Auslastung während der Ausführung eines Jobs:
gcloud
Führen Sie den Befehl gcloud dataflow jobs update-options aus:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Ersetzen Sie dabei Folgendes:
- REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
- JOB_ID: die ID des zu aktualisierenden Jobs
- TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]
Verwenden Sie den folgenden gcloud-Befehl, um den Hinweis zur Auslastung auf den Standardwert zurückzusetzen:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
Verwenden Sie die Methode projects.locations.jobs.update:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
Ersetzen Sie Folgendes:
- PROJECT_ID: die Google Cloud Projekt-ID des Dataflow-Jobs.
- REGION: Die Regions-ID des regionalen Endpunkts des Jobs.
- JOB_ID: Die ID des zu aktualisierenden Jobs.
- TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]
Hinweis zur Worker-Parallelität festlegen
Um das Autoscaling bei langen Vorgängen zu verarbeiten, die weniger auf CPUs angewiesen sind, z. B. ML-intensive Arbeitslasten, können Sie den Hinweis zur Worker-Parallelität mit Apache Beam-Ressourcenhinweisen festlegen. Diese Hinweise schalten das Autoscaling in einen anderen Modus um, der für GPU-intensive Arbeitslasten oder Transformationen mit langer Verarbeitungszeit optimiert ist.
Das folgende Beispiel zeigt, wie Sie einer Transformation einen Parallelitätshinweis hinzufügen:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
Ersetzen Sie TARGET_PARALLELISM_PER_WORKER durch einen Wert, der für Ihren Anwendungsfall geeignet ist. Allgemeine Hinweise zur Auswahl eines guten Startwerts
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker="TARGET_PARALLELISM_PER_WORKER")
Ersetzen Sie TARGET_PARALLELISM_PER_WORKER durch einen Wert, der für Ihren Anwendungsfall geeignet ist. Allgemeine Hinweise zur Auswahl eines guten Startwerts
Wert für Hinweis zur Worker-Parallelität auswählen
Für ML-Anwendungsfälle entspricht ein guter Startwert der Anzahl der Modelle, die parallel auf jedem Worker ausgeführt werden. Dieser Wert ist durch die Kapazität der Beschleuniger auf dem Worker und die Größe des Modells begrenzt.
Bei anderen Anwendungsfällen ist die Pipeline entweder speicher- oder CPU-gebunden. Bei speicherlastigen Pipelines können Sie das Arbeitsspeicherlimit verwenden, um die maximale parallele Verarbeitung zu berechnen. Für CPU-gebundene Pipelines wird empfohlen, die Standardrichtlinie für die automatische Skalierung beizubehalten, anstatt einen Hinweis zur Parallelität anzugeben.
Sie können den Wert an die Verarbeitungsanforderungen anderer Phasen anpassen, z. B. das Schreiben in eine Senke. Wenn Sie den Wert um 1 oder 2 erhöhen, wenn Ihr Modellparallelismus 2 ist, wird die schnellere Verarbeitung beim Schreiben in den Senken erkannt, da mehr Spielraum für die Verarbeitung in anderen Phasen vorhanden ist. Wenn in Ihrer Pipeline keine Shuffle-Vorgänge enthalten sind und Transformationen in einer einzigen Phase zusammengeführt werden, müssen Sie den Wert für andere Transformationen nicht anpassen.
Dieser Wert kann auch angepasst werden, um die Auswirkungen akzeptabler Verzögerungen bei der Nachfrage zu simulieren. Wenn Sie beispielsweise mit einer maximalen Verzögerung von 10 Minuten zufrieden sind und die durchschnittliche Verarbeitungszeit Ihres Modells 1 Minute beträgt, können Sie den Wert um 1 erhöhen, sofern die maximale Anzahl der Worker auf 10 festgelegt ist.
Heuristik für GPU-intensives Autoscaling
In der GPU-intensiven Umgebung, die durch den Hinweis zur Einstellung der Parallelität angegeben wird, berücksichtigt Dataflow beim automatischen Skalieren mehrere Faktoren. Diese Faktoren umfassen:
- Verfügbare Schlüssel: Schlüssel sind die grundlegende Einheit der Parallelverarbeitung in Dataflow.
- Maximale Anzahl aktiver Bundles pro Worker: Dies gibt die maximale ideale Anzahl von Verarbeitungsparallelität innerhalb des Workers an.
Die allgemeine Idee hinter Skalierungsentscheidungen besteht darin, die Worker zu berechnen, die erforderlich sind, um die aktuelle Last zu bewältigen, die durch verfügbare Schlüssel signalisiert wird. Wenn beispielsweise 100 Schlüssel zur Verarbeitung verfügbar sind und die maximale Parallelität pro Worker 10 beträgt, sollten Sie insgesamt 10 Worker verwenden.
Wenn Ihre Pipeline komplex ist und sowohl eine GPU-intensive Arbeitslast als auch zahlreiche CPU-intensive Transformationen enthält, empfehlen wir, Right-Fitting zu aktivieren. So kann der Dienst zwischen CPU-intensiven und GPU-intensiven Aufgaben unterscheiden und die einzelnen Worker-Pools entsprechend skalieren.
Heuristik für Streaming-Autoscaling
Bei Streamingpipelines besteht das Ziel des horizontalen Autoscalings darin, den Rückstand zu minimieren und gleichzeitig die Worker-Auslastung und den Durchsatz zu maximieren. Außerdem soll dadurch schnell auf Lastspitzen reagiert werden können.
Beim Autoscaling berücksichtigt Dataflow mehrere Faktoren, darunter:
Rückstand. Die geschätzte Rückstandszeit wird sowohl aus dem Durchsatz als auch aus den Rückstandbyte berechnet, die noch aus der Eingabequelle verarbeitet werden müssen. Eine Pipeline gilt als "im Rückstand", wenn die geschätzte Rückstandszeit mehr als 15 Sekunden beträgt.
CPU-Zielauslastung. Der Standardwert für die durchschnittliche CPU-Auslastung ist 0,8. Sie können diesen Wert überschreiben.
Verfügbare Schlüssel: Schlüssel sind die grundlegende Einheit der Parallelverarbeitung in Dataflow.
In einigen Fällen werden die folgenden Faktoren für Autoscaling-Entscheidungen verwendet. Wenn diese Faktoren für Ihren Job verwendet werden, sehen Sie diese Informationen auf dem Tab Autoscaling für Messwerte.
Bei der schlüsselbasierten Drosselung wird die Anzahl der vom Job empfangenen Verarbeitungsschlüssel verwendet, um die Obergrenze für Nutzer-Worker zu berechnen, da jeder Schlüssel nur von einem Worker auf einmal verarbeitet werden kann.
Dämpfung bei Herunterskalierung: Wenn Dataflow instabile Autoscaling-Entscheidungen erkennt, wird die Herunterskalierung verlangsamt, um die Stabilität zu verbessern.
CPU-basierte Skalierung verwendet eine hohe CPU-Auslastung als Hochskalierungskriterium.
Bei Streamingjobs ohne Streaming Engine kann die Skalierung durch die Anzahl der Persistent Disks eingeschränkt sein. Weitere Informationen finden Sie unter Autoscaling-Bereich festlegen.
GPU-intensives Autoscaling, wenn es durch Festlegen des Hinweises zur Worker-Parallelität aktiviert wird. Weitere Informationen finden Sie unter Autoscaling-Heuristik für GPU-intensive Anwendungen.
Hochskalieren. Wenn eine Streamingpipeline für einige Minuten Rückstände bei ausreichender Parallelität auf den Workern hat, wird Dataflow hochskaliert. Dataflow versucht, den Rückstand unter Berücksichtigung des aktuellen Durchsatzes pro Worker innerhalb von etwa 150 Sekunden nach dem Hochskalieren zu löschen. Wenn es Rückstände gibt, der Worker jedoch nicht genügend Parallelität für zusätzliche Worker hat, wird die Pipeline nicht hochskaliert. (Wenn Sie die Anzahl der Worker über die Anzahl der Schlüssel hinaus skalieren, die für die parallele Verarbeitung verfügbar sind, wird der Rückstand nicht schneller verarbeitet.)
Herunterskalieren Wenn das Autoscaling eine Entscheidung zum Herunterskalieren trifft, ist der Rückstand der höchste Prioritätsfaktor. Das Autoscaling zielt auf einen Rückstand von höchstens 15 Sekunden ab. Wenn der Rückstand unter 10 Sekunden fällt und die durchschnittliche Worker-Auslastung unter dem Ziel für die CPU-Auslastung liegt, wird Dataflow herunterskaliert. Solange der Backlog akzeptabel ist, versucht das Autoscaling, die CPU-Auslastung nahe der CPU-Zielauslastung zu halten. Wenn die Auslastung jedoch bereits ausreichend nahe am Zielwert liegt, behält das Autoscaling möglicherweise die Anzahl der Worker unverändert bei, da jeder Herunterskalierungsschritt Kosten verursacht.
Streaming Engine verwendet auch ein Verfahren für vorausschauendes Autoscaling, das auf dem Timer-Rückstand basiert. Unbegrenzte Daten in einer Streaming-Pipeline werden in Fenster aufgeteilt, die nach Zeitstempeln gruppiert sind. Am Ende eines Fensters werden Timer für jeden in diesem Fenster verarbeiteten Schlüssel ausgelöst. Das Auslösen eines Timers zeigt an, dass das Fenster für einen bestimmten Schlüssel abgelaufen ist. Streaming Engine kann den Timer-Rückstand messen und vorhersagen, wie viele Timer am Ende eines Fensters ausgelöst werden sollen. Unter Verwendung des Timer-Rückstands als Signal kann Dataflow die Verarbeitungsmenge schätzen, die bei zukünftigen Timern ausgeführt werden muss. Basierend auf der geschätzten zukünftigen Arbeitslast wird Dataflow automatisch im Voraus skaliert, um die erwartete Nachfrage zu erfüllen.
Messwerte
Wenn Sie die aktuellen Autoscaling-Limits für einen Job ermitteln möchten, fragen Sie die folgenden Messwerte ab:
job/max_worker_instances_limit: Maximale Anzahl von Workern.job/min_worker_instances_limit: Mindestanzahl von Workern.
Wenn Sie Informationen zur Worker-Auslastung abrufen möchten, fragen Sie die folgenden Messwerte ab:
job/aggregated_worker_utilization: Die aggregierte Worker-Auslastung.job/worker_utilization_hint: Der aktuelle Hinweis zur Worker-Auslastung.
Wenn Sie Informationen zum Verhalten des Autoscalers erhalten möchten, fragen Sie den folgenden Messwert ab:
job.worker_utilization_hint_is_actively_used: Gibt an, ob der Autoscaler den Hinweis zur Worker-Auslastung aktiv verwendet. Wenn andere Faktoren den Hinweis überschreiben, wenn dieser Messwert erhoben wird, ist der Wertfalse.job/horizontal_worker_scaling: Beschreibt die Entscheidungen, die vom Autoscaler getroffen wurden. Dieser Messwert enthält die folgenden Labels:direction: Gibt an, ob das Autoscaling hoch- oder herunterskaliert wurde oder keine Aktion ausgeführt hat.rationale: Gibt die Begründung für die Entscheidung des Autoscalings an.
Weitere Informationen finden Sie unter Cloud Monitoring-Messwerte. Diese Messwerte werden auch in den Monitoring-Diagrammen für das Autoscaling angezeigt.