Mit verwaltetem E/A kann Dataflow bestimmte E/A-Connectors verwalten, die in Apache Beam-Pipelines verwendet werden. Verwalteter E/A vereinfacht die Verwaltung von Pipelines, die in unterstützte Quellen und Senken eingebunden sind.
Verwalteter E/A besteht aus zwei Komponenten, die zusammenarbeiten:
Eine Apache Beam-Transformation, die eine gemeinsame API zum Erstellen von E/A-Connectors (Quellen und Senken) bereitstellt.
Ein Dataflow-Dienst, der diese E/A-Connectors in Ihrem Namen verwaltet, einschließlich der Möglichkeit, sie unabhängig von der Apache Beam-Version zu aktualisieren.
Zu den Vorteilen von verwaltetem E/A gehören:
Automatische Upgrades : Dataflow aktualisiert die verwalteten E/A-Connectors in Ihrer Pipeline automatisch. Das bedeutet, dass Ihre Pipeline Sicherheitsupdates, Leistungsverbesserungen und Fehlerkorrekturen für diese Connectors erhält, ohne dass Codeänderungen erforderlich sind. Weitere Informationen finden Sie unter Automatische Upgrades.
Konsistente API : Bisher hatten E/A-Connectors in Apache Beam unterschiedliche APIs und jeder Connector wurde anders konfiguriert. Verwalteter E/A bietet eine einzige Konfigurations-API, die Schlüssel/Wert-Attribute verwendet, was zu einem einfacheren und konsistenteren Pipeline-Code führt. Weitere Informationen finden Sie unter Konfigurations-API.
Voraussetzungen
Die folgenden SDKs unterstützen verwalteten E/A:
- Apache Beam SDK für Java, Version 2.58.0 oder höher.
- Apache Beam SDK für Python, Version 2.61.0 oder höher.
Für den Backend-Dienst ist Dataflow Runner v2 erforderlich. Wenn Runner v2 nicht aktiviert ist, wird Ihre Pipeline trotzdem ausgeführt, aber sie profitiert nicht von den Vorteilen des verwalteten E/A-Dienstes.
Automatische Upgrades
Dataflow-Pipelines mit verwalteten E/A-Connectors verwenden automatisch die neueste zuverlässige Version des Connectors. Automatische Upgrades erfolgen an den folgenden Punkten im Joblebenszyklus:
Jobeinreichung : Wenn Sie einen Batch- oder Streamingjob einreichen, verwendet Dataflow die neueste Version des verwalteten E/A-Connectors, die getestet wurde und gut funktioniert.
Rolling Upgrades : Bei Streamingjobs aktualisiert Dataflow Ihre verwalteten E/A-Connectors in laufenden Pipelines, sobald neue Versionen verfügbar sind. Sie müssen den Connector oder die Apache Beam-Version Ihrer Pipeline nicht manuell aktualisieren.
Standardmäßig erfolgen Rolling Upgrades innerhalb eines Zeitraums von 30 Tagen. Das heißt, Upgrades werden ungefähr alle 30 Tage durchgeführt. Sie können den Zeitraum anpassen oder Rolling Upgrades für einzelne Jobs deaktivieren. Weitere Informationen finden Sie unter Zeitraum für Rolling Upgrades festlegen.
Eine Woche vor dem Upgrade schreibt Dataflow eine Benachrichtigung in die Jobprotokolle.
Ersatzjobs : Bei Streamingjobs sucht Dataflow nach Updates, wenn Sie einen Ersatzjob starten, und verwendet automatisch die neueste bekannte Version. Dataflow führt diese Prüfung auch dann durch, wenn Sie keinen Code im Ersatzjob ändern.
Das folgende Diagramm zeigt den Upgrade-Prozess. Der Nutzer erstellt eine Apache Beam-Pipeline mit SDK-Version X. Dataflow aktualisiert die verwaltete E/A-Version auf die neueste unterstützte Version. Das Upgrade erfolgt, wenn der Nutzer den Job einreicht, nach dem Zeitraum für Rolling Upgrades oder wenn der Nutzer einen Ersatzjob einreicht.

Der Upgrade-Prozess verlängert die Startzeit für den ersten Job (pro Projekt), der verwalteten E/A verwendet, um etwa zwei Minuten. Bei nachfolgenden Jobs kann es etwa eine halbe Minute dauern. Bei Rolling Upgrades startet der Dataflow-Dienst
einen
Ersatzjob. Dies kann zu vorübergehenden Ausfallzeiten für Ihre Pipeline führen, da der vorhandene Worker-Pool heruntergefahren und ein neuer Worker-Pool gestartet wird. Wenn Sie den Status von verwalteten E/A
Vorgängen prüfen möchten, suchen Sie nach
Logeinträgen, die den String
"Managed Transform(s)" enthalten.
Zeitraum für Rolling Upgrades festlegen
Wenn Sie den Zeitraum für das Upgrade eines Streaming-Dataflow-Jobs angeben möchten, legen Sie
die managed_transforms_rolling_upgrade_window
Dienstoption auf die Anzahl
der Tage fest. Der Wert muss zwischen 10 und 90 Tagen liegen (einschließlich).
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS
gcloud
Verwenden Sie den
gcloud dataflow jobs run Befehl
mit der additional-experiments Option. Wenn Sie eine flexible Vorlage verwenden, die
verwalteten E/A verwendet, verwenden Sie den
gcloud dataflow flex-template run
Befehl.
--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS
Wenn Sie Rolling Upgrades deaktivieren möchten, legen Sie die Dienstoption managed_transforms_rolling_upgrade_window auf never fest. Sie können trotzdem ein Update auslösen, indem Sie einen Ersatzjob starten.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
Go
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
gcloud
Verwenden Sie den
gcloud dataflow jobs run Befehl
mit der additional-experiments Option. Wenn Sie flexible Vorlagen verwenden, verwenden Sie
den
gcloud dataflow flex-template run
Befehl.
--additional-experiments=managed_transforms_rolling_upgrade_window=never
Konfigurations-API
Verwalteter E/A ist eine sofort einsatzbereite Apache Beam-Transformation, die eine konsistente API zum Konfigurieren von Quellen und Senken bietet.
Java
Verwenden Sie die
Managed Klasse, um eine von verwaltetem E/A unterstützte Quelle oder Senke zu erstellen. Geben Sie an, welche Quelle oder Senke instanziiert werden soll, und übergeben Sie eine Reihe von Konfigurationsparametern, ähnlich wie im folgenden Beispiel:
Map config = ImmutableMap.<String, Object>builder()
.put("config1", "abc")
.put("config2", 1);
pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
.getSinglePCollection();
Sie können Konfigurationsparameter auch als YAML-Datei übergeben. Ein vollständiges Code beispiel finden Sie unter Aus Apache Iceberg lesen.
Python
Importieren Sie das apache_beam.transforms.managed Modul
und rufen Sie die managed.Read oder managed.Write Methode auf. Geben Sie an, welche Quelle oder Senke instanziiert werden soll, und übergeben Sie eine Reihe von Konfigurationsparametern, ähnlich wie im folgenden Beispiel:
pipeline
| beam.managed.Read(
beam.managed.SOURCE, # Example: beam.managed.KAFKA
config={
"config1": "abc",
"config2": 1
}
)
Sie können Konfigurationsparameter auch als YAML-Datei übergeben. Ein vollständiges Code beispiel finden Sie unter Aus Apache Kafka lesen.
Dynamische Ziele
Bei einigen Senken kann der verwaltete E/A-Connector dynamisch ein Ziel basierend auf Feldwerten in den eingehenden Datensätzen auswählen.
Wenn Sie dynamische Ziele verwenden möchten, geben Sie einen Vorlagenstring für das Ziel an. Der
Vorlagenstring kann Feldnamen in geschweiften Klammern enthalten, z. B.
"tables.{field1}". Zur Laufzeit ersetzt der Connector den Wert des Felds für jeden eingehenden Datensatz, um das Ziel für diesen Datensatz zu bestimmen.
Angenommen, Ihre Daten haben ein Feld mit dem Namen airport. Sie können das
Ziel auf "flights.{airport}" festlegen. Wenn airport=SFO ist, wird der Datensatz
in flights.SFO geschrieben. Verwenden Sie für verschachtelte Felder die Punktnotation. Beispiel: {top.middle.nested}.
Beispielcode zur Verwendung dynamischer Ziele finden Sie unter Mit dynamischen Zielen schreiben.
Filtern
Möglicherweise möchten Sie bestimmte Felder herausfiltern, bevor sie in die Zieltabelle geschrieben werden. Für Senken, die dynamische Ziele unterstützen, können Sie dazu den Parameter drop, keep oder only verwenden. Mit diesen Parametern können Sie Zielmetadaten in die Eingabedatensätze aufnehmen, ohne die Metadaten in das Ziel zu schreiben.
Sie können für eine bestimmte Senke höchstens einen dieser Parameter festlegen.
| Konfigurationsparameter | Datentyp | Beschreibung |
|---|---|---|
drop |
Liste mit Strings | Eine Liste von Feldnamen, die vor dem Schreiben in das Ziel gelöscht werden sollen. |
keep |
Liste mit Strings | Eine Liste von Feldnamen, die beim Schreiben in das Ziel beibehalten werden sollen. Andere Felder werden gelöscht. |
only |
String | Der Name genau eines Felds, das als Datensatz der obersten Ebene verwendet werden soll , wenn in das Ziel geschrieben wird. Alle anderen Felder werden gelöscht. Dieses Feld muss vom Typ „Zeile“ sein. |
Unterstützte Quellen und Senken
Verwalteter E/A unterstützt die folgenden Quellen und Senken.
Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter Verwaltete E/A-Connectors.