Auf dieser Seite finden Sie Anleitungen und Empfehlungen für das Upgrade Ihrer Streamingpipelines. Möglicherweise müssen Sie beispielsweise ein Upgrade auf eine neuere Version des Apache Beam SDK durchführen oder Ihren Pipelinecode aktualisieren. Es gibt verschiedene Optionen für unterschiedliche Szenarien.
Im Unterschied zu Batchpipelines, die nach Abschluss des Jobs beendet werden, müssen Streamingpipelines häufig kontinuierlich ausgeführt werden, um eine unterbrechungsfreie Verarbeitung zu ermöglichen. Wenn Sie Streamingpipelines aktualisieren, müssen Sie daher die folgenden Aspekte berücksichtigen:
- Möglicherweise müssen Sie Störungen der Pipeline minimieren oder vermeiden. In einigen Fällen können Sie möglicherweise eine vorübergehende Unterbrechung der Verarbeitung tolerieren, während eine neue Version einer Pipeline bereitgestellt wird. In anderen Fällen kann Ihre Anwendung Unterbrechungen nicht tolerieren.
- Pipeline-Aktualisierungsprozesse müssen Schemaänderungen so handhaben, dass Unterbrechungen der Nachrichtenverarbeitung und anderer damit verbundener Systeme minimiert werden. Wenn sich beispielsweise das Schema für Nachrichten in einer Pipeline zur Ereignisverarbeitung ändert, sind möglicherweise auch Schemaänderungen in nachgelagerten Datensenken erforderlich.
Je nach Pipeline und Aktualisierungsanforderungen können Sie eine der folgenden Methoden verwenden, um Streamingpipelines zu aktualisieren:
- Aktualisierungen während der Übertragung durchführen
- Ersatzjob starten
- Parallele Pipelines ausführen
Weitere Informationen zu Problemen, die bei einem Update auftreten können, und dazu, wie Sie diese verhindern können, finden Sie unter Einen Ersatzjob validieren und Job-Kompatibilitätsprüfung.
Best Practices
- Führen Sie das Upgrade der Apache Beam SDK-Version unabhängig von Änderungen am Pipelinecode durch.
- Testen Sie Ihre Pipeline nach jeder Änderung, bevor Sie weitere Updates vornehmen.
- Aktualisieren Sie regelmäßig die Apache Beam SDK-Version, die von Ihrer Pipeline verwendet wird.
- Verwenden Sie nach Möglichkeit automatisierte Methoden wie In-Flight-Updates oder automatisierte parallele Pipeline-Updates.
- Verwenden Sie nach Möglichkeit Managed I/O, um die Vorteile automatischer Upgrades von Connector-Versionen zu nutzen.
Aktualisierungen während der Übertragung durchführen
Sie können einige laufende Streamingpipelines aktualisieren, ohne den Job zu beenden. Dieses Szenario wird als Jobaktualisierung im laufenden betrieb bezeichnet. Aktualisierungen laufender Jobs sind nur unter bestimmten Umständen möglich:
- Für den Job muss Streaming Engine verwendet werden.
- Der Job muss den Status „Wird ausgeführt“ haben.
- Sie ändern nur die Anzahl der Worker, die vom Job verwendet werden.
Weitere Informationen finden Sie auf der Seite „Horizontales Autoscaling“ unter Autoscaling-Bereich festlegen.
Eine Anleitung zum Aktualisieren eines laufenden Jobs finden Sie unter Vorhandene Pipeline aktualisieren.
Automatisches Erstellen oder Aktualisieren (Upsert) von Vorlagen
Wenn Sie Pipelines mit einer Vorlage (klassische Vorlagen, flexible Vorlagen, Terraform oder Config Connector) starten, können Sie das create_or_update_job-Experiment verwenden, um die Funktion zum Erstellen oder Aktualisieren (Upsert) zu nutzen.
Wenn Sie create_or_update_job im Parameter additional_experiments oder im Flag additional-experiments angeben:
- Wenn bereits ein laufender oder auslaufender Job mit dem angegebenen Jobnamen vorhanden ist, wird der neue Job vom Vorlagendienst automatisch als Update für den vorhandenen Job gestartet.
- Wenn kein aktiver Job mit diesem Namen vorhanden ist, wird der neue Job vom Vorlagendienst als neuer Job gestartet.
Bei diesem Test ist es nicht mehr erforderlich, programmatisch zu ermitteln, ob beim Starten einer Vorlage die API-Aktion „create“ oder „update“ verwendet werden soll.
Terraform- und Config Connector-Codebeispiele, in denen dieses Testverfahren verwendet wird, finden Sie in den folgenden Abschnitten:
- Automatisierte Anfrage zum Ersetzen von Artikeln senden
- Automatisierte parallele Pipeline-Aktualisierungsanfrage senden
Ersatzjob starten
Wenn der aktualisierte Job mit dem vorhandenen Job kompatibel ist, können Sie Ihre Pipeline mit der Option update aktualisieren. Wenn Sie einen vorhandenen Job ersetzen, wird ein neuer Job mit dem aktualisierten Pipelinecode ausgeführt.
Der Dataflow-Dienst behält den Jobnamen bei, führt den Ersatzjob aber mit einer aktualisierten Job-ID aus. Dieser Vorgang kann zu Ausfallzeiten führen, während der vorhandene Job angehalten wird, die Kompatibilitätsprüfung ausgeführt wird und der neue Job gestartet wird. Weitere Informationen finden Sie unter Auswirkungen des Ersetzens eines Jobs.
Dataflow führt eine Kompatibilitätsprüfung durch, um sicherzustellen, dass der aktualisierte Pipelinecode sicher auf der ausgeführten Pipeline bereitgestellt werden kann. Bestimmte Codeänderungen führen dazu, dass die Kompatibilitätsprüfung fehlschlägt, z. B. wenn Nebeneingaben einem vorhandenen Schritt hinzugefügt oder daraus entfernt werden. Wenn die Kompatibilitätsprüfung fehlschlägt, können Sie kein direktes Job-Update durchführen.
Eine Anleitung zum Starten eines Ersatzjobs finden Sie unter Ersatzjob starten.
Wenn die Pipelineaktualisierung nicht mit dem aktuellen Job kompatibel ist, müssen Sie die Pipeline beenden und ersetzen. Wenn Ihre Pipeline keine Ausfallzeiten tolerieren kann, führen Sie parallele Pipelines aus.
Manuelles Beenden und Ersetzen
Wenn Sie einen manuellen Stopp und Ersatz durchführen möchten, brechen Sie die Pipeline ab oder leeren Sie sie und ersetzen Sie sie dann durch die aktualisierte Pipeline. Das Abbrechen einer Pipeline führt dazu, dass Dataflow die Verarbeitung sofort stoppt und Ressourcen schnellstmöglich herunterfährt. Dies kann zu einem gewissen Verlust an gerade verarbeiteten Daten führen. Diese werden als In-Flight-Daten bezeichnet. Zum Vermeiden von Datenverlusten ist in den meisten Fällen das Leeren von Daten die bevorzugte Aktion. Sie können Dataflow-Snapshots auch verwenden, um den Status einer Streamingpipeline zu speichern. So können Sie eine neue Version Ihres Dataflow-Jobs starten, ohne den Status zu verlieren. Weitere Informationen finden Sie unter Dataflow-Snapshots verwenden.
Durch das Leeren der Pipeline werden alle laufenden Fenster sofort geschlossen und alle Trigger ausgelöst. Obwohl "In-Flight-Daten" nicht verloren gehen, kann das Leeren der Pipeline dazu führen, dass die Daten in Fenstern unvollständig sind. In diesem Fall geben Prozessfenster nur teilweise oder unvollständige Ergebnisse aus. Weitere Informationen finden Sie unter Auswirkungen des Leerens eines Jobs. Nachdem der vorhandene Job abgeschlossen ist, können Sie einen neuen Streamingjob starten, der den aktualisierten Pipelinecode enthält. Dadurch kann die Verarbeitung fortgesetzt werden.
Bei dieser Methode entsteht zwischen dem Zeitpunkt, an dem der bestehende Streamingjob beendet wird, und dem Zeitpunkt, an dem die Ersatzpipeline zur Wiederaufnahme der Datenverarbeitung bereit ist, eine gewisse Ausfallzeit. Das Abbrechen oder Leeren einer vorhandenen Pipeline und das anschließende Starten eines neuen Jobs mit der aktualisierten Pipeline ist jedoch weniger kompliziert als das Ausführen paralleler Pipelines.
Weitere Informationen finden Sie unter Dataflow-Job per Drain beenden. Nachdem Sie den aktuellen Job beendet haben, starten Sie einen neuen Job mit demselben Jobnamen.
Automatisches Beenden und Ersetzen
Dataflow bietet API-Unterstützung für das Starten eines automatischen Stop-and-Replace-Updates. Bei diesem deklarativen Workflow entfallen die manuellen prozeduralen Schritte. Sie deklarieren den zu ersetzenden Job. Der neue Job wird gestartet und koordiniert den Übergang automatisch.
Wenn Sie diesen Workflow verwenden, werden neue Jobressourcen bereitgestellt, während der alte Job noch ausgeführt wird. Der alte Job erhält dann automatisch ein Drain-Signal. Nachdem der alte Job beendet wurde oder ein vom Nutzer angegebenes Zeitlimit erreicht hat, beginnt der neue Job sofort mit der Verarbeitung von Daten. Verwenden Sie diesen Workflow für Pipelines, bei denen keine doppelten Daten oder Teilaggregationen toleriert werden können, bei denen aber eine kurze Verarbeitungspause akzeptabel ist, während der alte Job beendet wird.
Automatisierte Anfrage für das Ersetzen von Inhalten senden
So verwenden Sie diesen Workflow:
- Sie müssen die Option
parallel_replace_job_max_stop_durationfestlegen. - Sie dürfen die Option
parallel_replace_job_min_parallel_pipelines_durationnicht festlegen. Wenn Sie eine parallele Dauer festlegen, wird stattdessen der Workflow für automatische parallele Pipeline-Updates ausgelöst.
Starten Sie eine automatisierte Anfrage zum Beenden und Ersetzen mit den folgenden Dienstoptionen:
Java
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie ein automatisches Stop-and-Replace-Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, verwenden Sie
update_strategy_in_place_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Automatische Kündigung deaktivieren
Die automatische Kündigung ist standardmäßig aktiviert, wenn Sie die Option parallel_replace_job_max_stop_duration angeben. Wenn Sie die automatische Kündigung deaktivieren möchten, setzen Sie die Option parallel_replace_job_cancel_on_drain_timeout auf false.
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Python
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie ein automatisches Stop-and-Replace-Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, verwenden Sie
update_strategy_in_place_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Automatische Kündigung deaktivieren
Die automatische Kündigung ist standardmäßig aktiviert, wenn Sie die Option parallel_replace_job_max_stop_duration angeben. Wenn Sie die automatische Kündigung deaktivieren möchten, setzen Sie die Option parallel_replace_job_cancel_on_drain_timeout auf false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Go
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie ein automatisches Stop-and-Replace-Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, verwenden Sie
update_strategy_in_place_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Automatische Kündigung deaktivieren
Die automatische Kündigung ist standardmäßig aktiviert, wenn Sie die Option parallel_replace_job_max_stop_duration angeben. Wenn Sie die automatische Kündigung deaktivieren möchten, setzen Sie die Option parallel_replace_job_cancel_on_drain_timeout auf false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
gcloud
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie ein automatisches Stop-and-Replace-Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, verwenden Sie
update_strategy_in_place_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Automatische Kündigung deaktivieren
Die automatische Kündigung ist standardmäßig aktiviert, wenn Sie die Option parallel_replace_job_max_stop_duration angeben. Wenn Sie die automatische Kündigung deaktivieren möchten, setzen Sie die Option parallel_replace_job_cancel_on_drain_timeout auf false.
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Optional: Upsert (create or update job)
So aktivieren Sie das Upsert-Verhalten (Job erstellen oder aktualisieren):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_max_stop_duration=DURATION",
"parallel_replace_job_cancel_on_drain_timeout=true",
"update_strategy_parallel_job_update",
"parallel_replace_job_preallocate_compute_resources=true",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_max_stop_duration=DURATION"
- "parallel_replace_job_cancel_on_drain_timeout=true"
- "update_strategy_parallel_job_update"
- "parallel_replace_job_preallocate_compute_resources=true"
- "create_or_update_job"
Ersetzen Sie die folgenden Variablen:
- Sie müssen entweder
parallel_replace_job_nameoderparallel_replace_job_idangeben, um den zu ersetzenden Job zu identifizieren:OLD_JOB_NAME: Der Name des Jobs, der ersetzt werden soll.OLD_JOB_ID: Die ID des zu ersetzenden Jobs.
- Sie müssen den Wert
parallel_replace_job_max_stop_durationangeben, um das automatische Beenden und Ersetzen zu aktivieren:DURATION: Die maximale Zeit, die der neue Job wartet, bis der alte Job beendet ist. Die Dauer muss als String formatiert werden, der mits,moderhendet (z. B.30m,1h).
- Legen Sie die Option
parallel_replace_job_min_parallel_pipelines_durationnicht fest, wenn Sie diesen Workflow verwenden. Wenn Sie diese Option festlegen, wird stattdessen der Workflow für automatisierte parallele Pipeline-Updates ausgelöst. - Optional: Konfigurieren Sie die Option
parallel_replace_job_cancel_on_drain_timeout. Da die automatische Kündigung standardmäßig aktiviert ist (Standardwert isttrue), wenn die Optionparallel_replace_job_max_stop_durationfestgelegt ist, müssen Sie diese Option nicht explizit konfigurieren, um sie zu aktivieren.- Wenn Sie das Standardverhalten beibehalten möchten, lassen Sie diese Option weg oder legen Sie sie auf
truefest. - Wenn Sie die automatische Kündigung deaktivieren möchten, legen Sie diese Option auf
falsefest. Wenn Sie diese Option auffalsefestlegen und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
- Wenn Sie das Standardverhalten beibehalten möchten, lassen Sie diese Option weg oder legen Sie sie auf
- Optional: Konfiguration für
parallel_replace_job_preallocate_compute_resourcesdefinieren:- Gibt an, ob Worker für den neuen Job vorab bereitgestellt werden, während der alte Job beendet wird. Werte:
true(Standard) oderfalse. Für Terraform und Config Connector wird empfohlen, diese Option auftruezu setzen, um Zeitüberschreitungen bei der Ressourcenbereitstellung zu vermeiden. Wennparallel_replace_job_preallocate_compute_resourcesauffalsegesetzt ist, bleibt der neue Job im Status „Ausstehend“, bis der alte Job abgeschlossen ist.
- Gibt an, ob Worker für den neuen Job vorab bereitgestellt werden, während der alte Job beendet wird. Werte:
Neuverarbeitung von Nachrichten mit Pub/Sub-Snapshot und Seek
In einigen Situationen müssen Sie nach dem Ersetzen oder Abbrechen einer geleerten Pipeline möglicherweise zuvor übermittelte Pub/Sub-Nachrichten noch einmal verarbeiten. Es kann beispielsweise sein, dass Sie Daten mithilfe der aktualisierten Geschäftslogik neu verarbeiten müssen. Pub/Sub Seek ist ein Feature, mit dem Sie Nachrichten aus einem Pub/Sub-Snapshot wiedergeben können. Sie können Pub/Sub Seek mit Dataflow verwenden, um Nachrichten ab dem Zeitpunkt der Erstellung des Abo-Snapshots noch einmal zu verarbeiten.
Während der Entwicklung und des Tests können Sie Pub/Sub Seek auch zum wiederholten Abspielen der bekannten Mitteilungen verwenden, um die Ausgabe Ihrer Pipeline zu kontrollieren. Wenn Sie Pub/Sub Seek verwenden, suchen Sie nicht nach einem Abo-Snapshot, wenn das Abo von einer Pipeline genutzt wird. Wenn Sie dies tun, kann das Seek die Wasserzeichenlogik von Dataflow ungültig machen und die genau einmalige Verarbeitung von Pub/Sub-Nachrichten beeinflussen.
Ein empfohlener gcloud CLI-Workflow für die Verwendung von Pub/Sub Seek mit Dataflow-Pipelines in einem Terminalfenster lautet so:
Verwenden Sie den Befehl
gcloud pubsub snapshots create, um einen Snapshot des Abos zu erstellen:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Verwenden Sie zum Leeren oder Abbrechen der Pipeline den Befehl
gcloud dataflow jobs drainoder den Befehlgcloud dataflow jobs cancel:gcloud dataflow jobs drain JOB_ID
oder
gcloud dataflow jobs cancel JOB_ID
Verwenden Sie den Befehl
gcloud pubsub subscriptions seek, um zum Snapshot zu springen:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Stellen Sie eine neue Pipeline bereit, die das Abo nutzt.
Parallele Pipelines ausführen
Wenn Sie während eines Updates Unterbrechungen Ihrer Streamingpipeline vermeiden möchten, können Sie parallele Pipelines ausführen. Bei diesem Ansatz können Sie einen neuen Streamingjob mit Ihrem aktualisierten Pipelinecode starten und ihn parallel zum vorhandenen Job ausführen. Sie können den automatisierten parallelen Pipeline-Aktualisierungs- und Bereitstellungsworkflow von Dataflow verwenden oder die Schritte manuell ausführen.
Parallele Pipelines – Übersicht
Verwenden Sie beim Erstellen der neuen Pipeline dieselbe Windowing-Strategie, die Sie für die vorhandene Pipeline verwendet haben. Lassen Sie die vorhandene Pipeline für den manuellen Workflow so lange laufen, bis das Wasserzeichen den Zeitstempel des frühesten vollständigen Fensters überschreitet, das von der aktualisierten Pipeline verarbeitet wird. Leeren Sie die vorhandene Pipeline dann oder brechen Sie sie ab. Wenn Sie den automatisierten Workflow verwenden, wird diese Aufgabe für Sie erledigt. Die aktualisierte Pipeline läuft an ihrer Stelle weiter und übernimmt die Verarbeitung effektiv selbstständig.
Im folgenden Diagramm wird dieser Vorgang veranschaulicht.
Im Diagramm ist Pipeline B der aktualisierte Job, der die Pipeline A übernimmt. Der Wert t ist der Zeitstempel des frühesten vollständigen Fensters, das von Pipeline B verarbeitet wird. Der Wert w ist das Wasserzeichen für Pipeline A. Der Einfachheit halber wird von einem perfekten Wasserzeichen ohne verspätete Daten ausgegangen. Die Verarbeitungszeit und die Wanduhrzeit werden auf der horizontalen Achse dargestellt. Beide Pipelines verwenden feste (gleitende) Fünf-Minuten-Fenster. Die Ergebnisse werden ausgelöst, nachdem das Wasserzeichen das Ende des Fensters passiert.
Da die gleichzeitige Ausgabe während des Zeitraums auftritt, in der sich die beiden Pipelines überlappen, sollten Sie die beiden Pipelines so konfigurieren, dass die Ergebnisse in unterschiedliche Ziele geschrieben werden. Nachgelagerte Systeme können dann eine Abstraktion der beiden Zielsenken wie eine Datenbankansicht verwenden, um die kombinierten Ergebnisse abzufragen. Diese Systeme können die Abstraktion auch verwenden, um Ergebnisse aus dem sich überlappenden Zeitraum zu deduplizieren. Weitere Informationen finden Sie unter Doppelte Ausgabe verarbeiten.
Beschränkungen
Für die Verwendung automatisierter oder manueller paralleler Pipeline-Updates gelten die folgenden Einschränkungen:
- Nur automatisierte Updates: Der neue parallele Job muss ein Streaming Engine-Job sein.
- Gleichzeitige Jobs mit demselben Namen sind nicht zulässig. Wenn Sie jedoch ein automatisches Stop-and-Replace oder ein paralleles Pipeline-Update mit demselben Jobnamen durchführen, können Sie den Jobnamen wiederverwenden. In diesem Fall muss der neue Job mindestens zwei Minuten nach dem Start des vorherigen Jobs beginnen. Diese Einschränkung verhindert mehrere parallele Aktualisierungen durch wiederholte Versuche der Clientbibliothek oder veraltete Remote Procedure Calls.
- Wenn Sie zwei Pipelines parallel für dieselbe Eingabe ausführen, kann es zu doppelten Daten, Teilaggregationen und potenziellen Problemen mit der Reihenfolge kommen, wenn Daten in die Senke eingefügt werden. Das Downstream-System muss so konzipiert sein, dass diese Ergebnisse berücksichtigt und verwaltet werden.
- Beim Lesen aus einer Pub/Sub-Quelle wird die Verwendung desselben Abos für mehrere Pipelines nicht empfohlen und kann zu Richtigkeitsproblemen führen. In einigen Anwendungsfällen wie ETL-Pipelines (Extrahieren, Transformieren, Laden) kann die Verwendung desselben Abos in zwei Pipelines jedoch die Duplizierung reduzieren. Probleme mit dem Autoscaling sind wahrscheinlich, wenn Sie einen Wert ungleich null für die Überschneidungsdauer angeben. Dies kann mithilfe der Funktion zur Aktualisierung von laufenden Jobs behoben werden. Weitere Informationen finden Sie unter Autoscaling für Pub/Sub-Streamingpipelines optimieren.
- Bei Apache Kafka können Sie Duplikate minimieren, indem Sie das Festschreiben von Offsets in Kafka aktivieren. Informationen zum Aktivieren des Festschreibens von Offsets in Kafka finden Sie unter Zurückschreiben in Kafka.
Automatisierte parallele Pipeline-Updates
Dataflow bietet API-Unterstützung für das Starten eines parallelen Ersatzjobs. Diese deklarative API abstrahiert die manuelle Ausführung von prozeduralen Schritten. Sie deklarieren den Job, den Sie aktualisieren möchten, und ein neuer Job wird parallel zum alten Job ausgeführt. Nachdem der neue Job für die von Ihnen angegebene Dauer ausgeführt wurde, wird der alte Job beendet. Diese Funktion verhindert Verarbeitungspausen während der Aktualisierung. Außerdem wird der operative Aufwand für die Aktualisierung inkompatibler Pipelines reduziert.
Diese Aktualisierungsmethode eignet sich am besten für Pipelines, bei denen einige Duplikate oder Teilaggregationen toleriert werden können und bei denen beim Einfügen von Daten keine strenge Reihenfolge erforderlich ist. Sie eignet sich gut für ETL-Pipelines sowie für Pipelines, die den Streamingmodus „Mindestens einmal“ und die Redistribute-Transformation verwenden, wobei „allow duplicates“ auf true festgelegt ist.
Automatisierte parallele Pipeline-Dienstoptionen
Verwenden Sie die folgenden Dienstoptionen für automatisierte parallele Pipeline-Updates:
| Service option | Optional oder erforderlich | Beschreibung | Abhängigkeiten oder Ausschlüsse |
|---|---|---|---|
update_strategy_parallel_job_update |
Erforderlich (Option 1: Aktualisierung mit demselben Jobnamen) | Befehl zum Ausführen eines parallelen Updates, bei dem beide Pipelines gleichzeitig ausgeführt werden, um die Ausfallzeit zu minimieren, wenn unter demselben Jobnamen aktualisiert wird. | Muss zusammen mit dem Flag --update und parallel_replace_job_min_parallel_pipelines_duration festgelegt werden.
|
update_strategy_in_place_update |
Optional | Alternative zur parallelen Aktualisierung. Führt eine standardmäßige In-Place-Jobaktualisierung durch. | Muss zusammen mit dem Flag --update festgelegt werden.
Schließt sich mit Wenn diese Option festgelegt ist, werden andere Optionen für parallele Jobs ignoriert. |
parallel_replace_job_min_parallel_pipelines_duration |
Erforderlich | Gibt die Mindestdauer an, für die die beiden Pipelines gleichzeitig ausgeführt werden.
Nach Ablauf dieses Zeitraums wird ein Drain-Signal an den alten Job gesendet.
Zulässige Werte reichen von 0s (empfohlen für keine Überschneidung) bis 744h (31 Tage).
|
Muss mit einer Methode zum Ausrichten auf den alten Job kombiniert werden. Einer der folgenden Werte:
|
parallel_replace_job_name oder
parallel_replace_job_id (eine Option auswählen) |
Erforderlich (Option 2: Mit einem anderen Jobnamen aktualisieren) | Gibt den alten Job entweder nach Name oder ID an, der bei einer Aktualisierung mit anderem Namen ersetzt werden soll. | Erfordert, dass parallel_replace_job_min_parallel_pipelines_duration festgelegt ist.
Verwenden Sie mit dieser Option nicht das Flag |
parallel_replace_job_max_stop_duration |
Optional | Die maximale Dauer, die der alte Job noch laufen darf, bevor die automatische Kündigung ausgelöst wird. Beispiel: 30m oder 1h. |
Erfordert die Einrichtung eines Workflows für parallele Updates (Option 1 oder Option 2). |
parallel_replace_job_cancel_on_drain_timeout |
Optional Der Standardwert ist |
Boolesche Option, die angibt, ob der alte Job abgebrochen werden soll, wenn seine Drain-Dauer parallel_replace_job_max_stop_duration überschreitet. |
Wird in Verbindung mit parallel_replace_job_max_stop_duration verwendet.
Legen Sie |
Automatisierte parallele Pipeline-Aktualisierungsanfrage senden
Wenn Sie den automatisierten Workflow verwenden möchten, starten Sie einen neuen Streamingjob. Sie können einen Job mit demselben oder einem anderen Jobnamen aktualisieren.
Java
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie ein paralleles Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, ohne Optionen für parallele Jobs zu entfernen, verwenden Sie
update_strategy_in_place_updateanstelle vonupdate_strategy_parallel_job_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Zeitlimit für das Entleeren und automatische Kündigung konfigurieren
Sie können die folgenden Optionen an eine der beiden Konfigurationen anhängen, um ein Drain-Timeout festzulegen und den alten Job automatisch abzubrechen, wenn er hängen bleibt.
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Python
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie ein paralleles Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, ohne Optionen für parallele Jobs zu entfernen, verwenden Sie
update_strategy_in_place_updateanstelle vonupdate_strategy_parallel_job_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Zeitlimit für das Entleeren und automatische Kündigung konfigurieren
Sie können die folgenden Optionen an eine der beiden Konfigurationen anhängen, um ein Drain-Timeout festzulegen und den alten Job automatisch abzubrechen, wenn er hängen bleibt.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Go
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie ein paralleles Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, ohne Optionen für parallele Jobs zu entfernen, verwenden Sie
update_strategy_in_place_updateanstelle vonupdate_strategy_parallel_job_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen. Dies führt zu einem Fehler.
Optional: Zeitlimit für das Entleeren und automatische Kündigung konfigurieren
Sie können die folgenden Optionen an eine der beiden Konfigurationen anhängen, um ein Drain-Timeout festzulegen und den alten Job automatisch abzubrechen, wenn er hängen bleibt.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
gcloud
Option 1: Aktualisierung mit demselben Jobnamen
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie ein paralleles Update mit demselben Namen durchführen möchten, verwenden Sie das Flag
--updateund die Optionupdate_strategy_parallel_job_update. - Wenn Sie stattdessen ein direktes Update durchführen möchten, ohne Optionen für parallele Jobs zu entfernen, verwenden Sie
update_strategy_in_place_updateanstelle vonupdate_strategy_parallel_job_update.
Option 2: Mit einem anderen Jobnamen aktualisieren
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Wenn Sie den alten Job anhand der ID anstelle des Jobnamens angeben möchten, verwenden Sie
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Wenn Sie einen neuen Jobnamen angeben und das Flag
--updateverwenden, sucht Dataflow nach einem vorhandenen Job mit dem neuen Namen, was zu einem Fehler führt.
Optional: Zeitlimit für das Entleeren und automatische Kündigung konfigurieren
Sie können die folgenden Optionen an eine der beiden Konfigurationen anhängen, um ein Drain-Timeout festzulegen und den alten Job automatisch abzubrechen, wenn er hängen bleibt.
--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"
Wenn Sie die automatische Kündigung deaktivieren und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Optional: Upsert (create or update job)
So aktivieren Sie das Upsert-Verhalten (Job erstellen oder aktualisieren):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_min_parallel_pipelines_duration=DURATION",
"parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
"update_strategy_parallel_job_update",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
- "update_strategy_parallel_job_update"
- "create_or_update_job"
Ersetzen Sie die folgenden Variablen:
- Wenn Sie mit einem anderen Jobnamen aktualisieren (Option 2), müssen Sie entweder
parallel_replace_job_nameoderparallel_replace_job_idangeben, um den zu ersetzenden Job zu identifizieren. Das Aktualisieren mit einem anderen Jobnamen wird für Terraform oder Config Connector nicht unterstützt.OLD_JOB_NAME: Der Name des Jobs, der ersetzt werden soll.OLD_JOB_ID: Die ID des zu ersetzenden Jobs.
DURATION: Die Mindestzeit, die die beiden Pipelines parallel ausgeführt werden, als Ganzzahl oder Gleitkommazahl. Für eine Überschneidung von null wird eine Dauer von0sempfohlen. Nach Ablauf dieser Dauer wird dem alten Job ein Drain-Signal gesendet.Die Dauer muss zwischen 0 Sekunden (
0s) und 31 Tagen (744h) liegen. Verwenden Sies,mundh, um Sekunden, Minuten und Stunden anzugeben. Beispiel:10mist 10 Minuten.DRAIN_TIMEOUT_DURATION: Optional. Die maximale Zeit, die der alte Job benötigt, bis er abgeschlossen ist, bevor die automatische Kündigung ausgelöst wird. Die Dauer muss als String formatiert werden, der mits,moderhendet (z. B.30m,1h).parallel_replace_job_cancel_on_drain_timeout: Optional. Gibt an, ob der vorherige Job abgebrochen werden soll, wenn er nicht vor der maximalen Stoppdauer beendet wird. Der Standardwert isttrue, wenn eine Zeitüberschreitung für das Beenden angegeben wird. Wenn Sie die automatische Kündigung deaktivieren möchten, legen Sie diese Option auffalsefest. Wenn Sie diese Option auffalsefestlegen und der alte Job im Status „Wird beendet“ hängen bleibt, werden sowohl der alte als auch der neue Job parallel ausgeführt.
Wenn Sie den neuen Job starten, wartet Dataflow, bis alle Worker bereitgestellt sind, bevor mit der Verarbeitung von Daten begonnen wird. Prüfen Sie die Joblogs von Dataflow, um den Status der Bereitstellung zu überwachen.
Parallele Pipelines manuell ausführen
Bei komplexeren Szenarien oder wenn Sie mehr Kontrolle über den Aktualisierungsprozess benötigen, können Sie parallele Pipelines manuell ausführen. Führen Sie die vorhandene Pipeline so lange aus, bis das Wasserzeichen den Zeitstempel des frühesten vollständigen Fensters überschreitet, das von der aktualisierten Pipeline verarbeitet wird. Leeren Sie die vorhandene Pipeline dann oder brechen Sie sie ab.
Umgang mit doppelter Ausgabe
Im folgenden Beispiel wird ein Ansatz zum Verarbeiten doppelter Ausgaben beschrieben. Die beiden Pipelines schreiben Ausgaben an verschiedene Ziele, verwenden Downstream-Systeme zum Abfragen von Ergebnissen und deduplizieren Ergebnisse aus dem überschneidenden Zeitraum. In diesem Beispiel wird eine Pipeline verwendet, die Eingabedaten aus Pub/Sub liest, einige Verarbeitungsschritte ausführt und die Ergebnisse in BigQuery schreibt.
Im Ausgangszustand wird die vorhandene Streamingpipeline (Pipeline A) ausgeführt und liest Nachrichten aus einem Pub/Sub-Thema (Thema) mit einem Abo (Abo A). Die Ergebnisse werden in eine BigQuery-Tabelle geschrieben (Tabelle A). Die Ergebnisse werden über eine BigQuery-Ansicht verarbeitet, die als Fassade dient, um die zugrundeliegenden Tabellenänderungen zu verdecken. Dieser Prozess ist eine Anwendung einer Gestaltungsmethode, die als Fassadenmuster bezeichnet wird. Das folgende Diagramm zeigt den Ausgangszustand.
Sie erstellen ein neues Abo (Abo B) für die aktualisierte Pipeline. Sie stellen die aktualisierte Pipeline (Pipeline B) bereit, die mit Abo B aus dem Pub/Sub-Thema (Thema) liest und in eine separate BigQuery-Tabelle (Tabelle B) schreibt. Das folgende Diagramm veranschaulicht diesen Ablauf.
An diesem Punkt werden Pipeline A und Pipeline B parallel ausgeführt und die Ergebnisse in separate Tabellen geschrieben. Sie erfassen die Zeit t als Zeitstempel des frühesten vollständigen Fensters, das von Pipeline B verarbeitet wird.
Wenn das Wasserzeichen von Pipeline A Zeitt überschreitet, beenden Sie Pipeline A per Drain“ Wenn Sie die Pipeline so beenden, werden alle geöffneten Fenster geschlossen und die Verarbeitung für In-Flight-Daten wird abgeschlossen. Wenn die Pipeline Fenster und vollständige Fenster enthält (vorausgesetzt, dass keine späten Daten vorhanden sind), lassen Sie vor dem Leeren von Pipeline A beide Pipelines laufen, bis Sie alle überlappenden Fenster haben. Sie beenden den Streamingjob für Pipeline A, nachdem alle In-Flight-Daten verarbeitet und in Tabelle A geschrieben wurden. Das folgende Diagramm zeigt diese Phase.
Zu diesem Zeitpunkt wird nur Pipeline B ausgeführt. Sie können auch eine BigQuery-Ansicht (Fassadenansicht) abfragen, die als Fassade für Tabelle A und Tabelle B fungiert. Konfigurieren Sie für Zeilen mit demselben Zeitstempel in beiden Tabellen die Ansicht so, dass die Zeilen zurückgegeben werden aus Tabelle B oder, wenn die Zeilen nicht vorhanden sind in Tabelle B , ein Fallback auf Tabelle A passiert“ Das folgende Diagramm zeigt die Ansicht (Fassadenansicht), die sowohl aus Tabelle A als auch aus Tabelle B liest.
An diesem Punkt können Sie Abo A löschen.
Wenn bei einer neuen Pipeline-Bereitstellung Probleme festgestellt werden, kann das Vorhandensein paralleler Pipelines das Rollback vereinfachen. In diesem Beispiel möchten Sie möglicherweise, dass Pipeline A ausgeführt wird, während Sie Pipeline B auf den korrekten Betrieb hin beobachten. Wenn Probleme mit Pipeline B auftreten, können Sie ein Rollback auf Pipeline A vornehmen.
Schemamutationen verarbeiten
Datenverarbeitungssysteme müssen im Laufe der Zeit häufig Schemamutationen berücksichtigen, manchmal aufgrund von geänderten Geschäftsanforderungen oder aus technischen Gründen. Die Anwendung von Schemaaktualisierungen erfordert in der Regel eine sorgfältige Planung und Ausführung, um Unterbrechungen der Geschäftsinformationssysteme zu vermeiden.
Stellen Sie sich eine einfache Pipeline vor, die Nachrichten mit JSON-Nutzlasten aus einem Pub/Sub-Thema liest. Die Pipeline konvertiert jede Nachricht in eine TableRow-Instanz und schreibt die Zeilen dann in eine BigQuery-Tabelle. Das Schema der Ausgabetabelle ähnelt den Nachrichten, die von der Pipeline verarbeitet werden.
Im folgenden Diagramm wird das Schema als Schema A bezeichnet.
Im Laufe der Zeit kann sich das Nachrichtenschema auf anspruchsvolle Weise ändern. Beispielsweise werden Felder hinzugefügt, entfernt oder ersetzt. Schema A entwickelt sich zu einem neuen Schema. In der folgenden Diskussion wird das neue Schema als Schema B bezeichnet. In diesem Fall muss Pipeline A aktualisiert werden und das Ausgabetabellenschema muss Schema B unterstützen.
Für die Ausgabetabelle können Sie einige Schemamutationen ohne Ausfallzeit ausführen.
Beispielsweise können Sie neue Felder hinzufügen oder Spaltenmodi lockern, indem Sie z. B. ohne AusfallzeitREQUIRED in NULLABLE ändern.
Diese Mutationen haben in der Regel keine Auswirkungen auf vorhandene Abfragen. Schemamutationen, die vorhandene Schemafelder ändern oder entfernen, unterbrechen jedoch Abfragen oder führen zu anderen Störungen. Bei diesem Ansatz sind Änderungen ohne Ausfallzeiten möglich.
Trennen Sie die Daten, die von der Pipeline in eine Hauptkontotabelle und in eine oder mehrere Staging-Tabellen geschrieben werden. In der Hauptkontotabelle werden Verlaufsdaten gespeichert, die von der Pipeline geschrieben wurden. In Staging-Tabellen wird die neueste Pipelineausgabe gespeichert. Sie können eine BigQuery-Fassadenansicht über die Hauptkonto- und Staging-Tabellen definieren, mit denen Nutzer sowohl Verlaufsdaten als auch aktuelle Daten abfragen können.
Das folgende Diagramm überarbeitet den vorherigen Pipeline-Ablauf und fügt eine Staging-Tabelle (Staging-Tabelle A), eine Hauptkontotabelle und eine Fassadenansicht hinzu.
Im überarbeiteten Ablauf verarbeitet Pipeline A Nachrichten, die Schema A verwenden, und schreibt die Ausgabe in die Staging-Tabelle A, die über ein kompatibles Schema verfügt. Die Hauptkontotabelle enthält Verlaufsdaten, die von früheren Versionen der Pipeline geschrieben wurden, sowie Ergebnisse, die regelmäßig aus der Staging-Tabelle zusammengeführt werden. Nutzer können aktuelle Daten, darunter Verlaufsdaten und Echtzeitdaten, über die Fassadenansicht abfragen.
Wenn das Nachrichtenschema von Schema A zu Schema B mutiert, können Sie den Pipelinecode so aktualisieren, dass er mit Nachrichten kompatibel ist, die Schema B verwenden. Die vorhandene Pipeline muss mit der neuen Implementierung aktualisiert werden. Durch parallele Pipelines können Sie dafür sorgen, dass die Verarbeitung von Streamingdaten ohne Unterbrechung fortgesetzt wird. Das Beenden und Ersetzen von Pipelines führt zu einer Unterbrechung der Verarbeitung, da für einen bestimmten Zeitraum keine Pipeline ausgeführt wird.
Die aktualisierte Pipeline schreibt in eine zusätzliche Staging-Tabelle (Staging-Tabelle B), die Schema B verwendet. Sie können einen orchestrierten Workflow verwenden, um die neue Staging-Tabelle zu erstellen, bevor Sie die Pipeline aktualisieren. Aktualisieren Sie auch die Fassadenansicht, um Ergebnisse aus der neuen Staging-Tabelle aufzunehmen. Dazu können Sie einen zugehörigen Workflowschritt verwenden.
Das folgende Diagramm stellt den aktualisierten Ablauf dar, der Staging-Tabelle B mit Schema B zeigt und wie die Fassadenansicht aktualisiert wurde, um Inhalte aus der Mastertabelle sowie aus beiden Staging-Tabellen aufzunehmen.
Als ein von der Pipelineaktualisierung separater Prozess können Sie die Staging-Tabellen entweder periodisch oder nach Bedarf in der Hauptkontotabelle zusammenführen. Das folgende Diagramm zeigt, wie die Staging-Tabelle A mit der Hauptkontotabelle zusammengeführt wird.
Nächste Schritte
- Eine ausführliche Anleitung zum Aktualisieren einer vorhandenen Pipeline finden Sie hier.