Umgebungen zu Airflow 3 migrieren (nebeneinander)

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Auf dieser Seite wird erläutert, wie Sie DAGs, Daten und Konfigurationen aus Ihrer vorhandenen Managed Airflow-Umgebung (Gen 3) mit Airflow 2 in eine Managed Airflow-Umgebung (Gen 3) mit Airflow 3 übertragen.

Von An Methode Leitfaden
Managed Airflow (Gen 3), Airflow 2 Managed Airflow (Gen 3), Airflow 3 Manuelle Übertragung nebeneinander Dieser Leitfaden
Managed Airflow (2. Generation) Managed Airflow (Gen 3) Nebeneinander mit dem Migrationsskript Anleitung zur Skriptmigration
Managed Airflow (2. Generation) Managed Airflow (Gen 3) Nebeneinander mit Snapshots Anleitung zur Migration von Snapshots
Managed Airflow (Legacy Gen 1), Airflow 2 Managed Airflow (Gen 3) Nebeneinander mit Snapshots Anleitung zur Migration von Snapshots
Managed Airflow (Legacy Gen 1), Airflow 2 Managed Airflow (2. Generation) Nebeneinander mit Snapshots Anleitung zur Migration von Snapshots
Managed Airflow (Legacy Gen 1), Airflow 2 Managed Airflow (2. Generation) Manuelle Übertragung nebeneinander Anleitung zur manuellen Migration
Managed Airflow (Legacy Gen 1), Airflow 1 Managed Airflow (Gen 2), Airflow 2 Nebeneinander mit Snapshots Anleitung zur Migration von Snapshots
Managed Airflow (Legacy Gen 1), Airflow 1 Managed Airflow (Gen 2), Airflow 2 Manuelle Übertragung nebeneinander Anleitung zur manuellen Migration
Managed Airflow (Legacy Gen 1), Airflow 1 Managed Airflow (Legacy Gen 1), Airflow 2 Manuelle Übertragung nebeneinander Anleitung zur manuellen Migration

Änderungen in Airflow 3

Bevor Sie Managed Airflow-Umgebungen mit Airflow 3 verwenden, sollten Sie die Änderungen berücksichtigen, die Airflow 3 in Managed Airflow-Umgebungen (Gen 3) mit sich bringt.

Eine Übersicht über die Änderungen in der Community-Version von Airflow 3 finden Sie unter Apache Airflow 3 is Generally Available!.

DAG-Versionsverwaltung

  • In Airflow 3 wird ein DAG basierend auf der Version zu Beginn bis zum Abschluss ausgeführt, auch wenn während der DAG-Ausführung eine neue Version hochgeladen wurde.

    • Alle DAG-Ausführungen in der Airflow-Benutzeroberfläche sind jetzt der entsprechenden DAG-Version (der Version zum Zeitpunkt der Ausführung) zugeordnet. Dazu gehören die Aufgabenstruktur und der DAG-Code.

Verbesserungen bei Backfills

In Airflow 3 wurde die Verarbeitung von Backfills (Pipelines für Verlaufsdaten noch einmal ausführen) grundlegend überarbeitet. Backfills werden von einem manuellen Prozess zu einer vollständig beobachtbaren Funktion, die in die Airflow-Core-Engine integriert ist, migriert:

  • Backfills werden jetzt direkt im Airflow-Scheduler verwaltet und nicht mehr als separate, manuelle Prozesse behandelt. Das führt zu einer besseren Skalierbarkeit und präziseren Steuerung.
  • Sie können Backfill-Vorgänge jetzt nicht nur über die Airflow-Befehlszeile, sondern auch direkt über die Airflow-UI oder API-Aufrufe auslösen, beenden und überwachen.
  • Der Airflow-Planer bietet eine bessere Übersicht über den Status und den Zustand der bisherigen Backfill-Ausführungen.
  • Backfill-Verbesserungen wurden von der Machine-Learning-Community stark nachgefragt (zum erneuten Trainieren von Modellen mit alten Daten), gelten aber für alle ETL-/ELT-Workflows.

Verbesserte Sicherheit und Zuverlässigkeit

  • In Airflow 3 kommunizieren Aufgaben nur über das Task SDK mit dem zentralen API-Server. In Airflow 2 hatten Aufgaben direkten Datenbankzugriff. Der API-Server führt diese Verbindungen effizient zusammen. Ihre Datenbank ist vor Verbindungsspitzen geschützt, wodurch die gesamte Umgebung bei hoher Last stabiler ist.

  • Durch die Verwendung einer neuen Task Execution Interface unterstützt Airflow 3 eine bessere Isolation zwischen Tasks, wodurch verhindert wird, dass ein Task die Daten eines anderen Tasks beeinträchtigt oder darauf zugreift.

  • Die CLI von Airflow 3 verzichtet auf den direkten Datenbankzugriff. Die neue airflowctl-Befehlszeilenschnittstelle ist ein separates Paket, das speziell für den Remotezugriff über die API entwickelt wurde. Anstatt direkt auf die Datenbank zuzugreifen, interagiert sie über APIs mit Airflow, was sicherer ist.

Ereignisgesteuerte Planung und Daten-Assets

  • Datasets wurden zu Data-Assets weiterentwickelt. Mit Data Assets kann Airflow Daten, die von Systemen außerhalb von Airflow erstellt oder aktualisiert werden, besser nachverfolgen und darauf reagieren.

  • In Airflow 3 wurde ein neues Konzept namens Watchers eingeführt. Diese Komponenten überwachen Änderungen an einem Data Asset und ermöglichen es Airflow, Workflows auszulösen, sobald Daten eingehen. Anstatt im Minutentakt zu prüfen, ob eine Datei vorhanden ist, kann eine DAG jetzt sofort ausgelöst werden, sobald eine Nachricht in einer Message Queue eingeht.

  • In Airflow 3 wird eine neue Asset-zentrierte Syntax mit Python-Dekoratoren eingeführt, die den Code für Entwickler übersichtlicher und intuitiver macht.

Modernisierte Airflow-UI

  • Die Airflow-Benutzeroberfläche wurde von Grund auf neu geschrieben und verwendet React (Frontend) und FastAPI (Backend).
  • Die neue Airflow-UI führt ihre Vorgänge über eine standardisierte REST API und eine spezielle API für UI-Vorgänge aus.
  • Durch das Ersetzen der Flask-Implementierung durch FastAPI ist die Airflow-UI deutlich reaktionsfähiger.
  • Die Raster- und Diagrammansichten wurden zusammengeführt, um den Workflow zu optimieren. So lässt sich leichter zwischen allgemeinen DAG-Strukturen und bestimmten Aufgabenlogs wechseln.

Nicht abwärtskompatible Änderungen in Airflow 3

Airflow 3 führt einige wichtige Änderungen ein, von denen einige nicht funktionieren.

  • Vorhandene DAGs aus Airflow 2 funktionieren möglicherweise nicht mit Airflow 3. Sie müssen getestet und möglicherweise angepasst werden, indem Sie Importe, DAG-Parameter und andere Implementierungsdetails ändern.
  • Einige Airflow 2-Konfigurationsoptionen wurden in Airflow 3 umbenannt oder entfernt. Weitere Informationen zu Parametern finden Sie in der Airflow-Konfigurationsreferenz.

  • Kein direkter Zugriff auf die Airflow-Datenbank über den Aufgabencode:

    • Im Task-Code können Airflow-Datenbanksitzungen oder -Modelle nicht mehr direkt importiert und verwendet werden.
    • PostgresHook und PostgresOperator können nicht mit der airflow_db-Verbindung verwendet werden.
  • Einige benutzerdefinierte PyPI-Pakete sind möglicherweise nicht mit der neuen Version von Airflow und den zugehörigen Abhängigkeiten kompatibel.

  • REST API (/api/v1) wurde durch /api/v2 ersetzt.

  • SubDAGs werden durch TaskGroups, Assets und Data Aware Scheduling ersetzt.

  • SLAs wurden eingestellt und entfernt. Sie werden durch Fristbenachrichtigungen ersetzt.

  • Das Argument „subdir“ in CLI-Befehlen wurde entfernt.

  • Einige Airflow-Kontextvariablen wurden entfernt. Weitere Informationen finden Sie in der Airflow-Dokumentation unter Breaking Changes.

  • Der DAG-Parameter catchup_by_default ist jetzt standardmäßig False.

  • Die create_cron_data_intervals-Konfiguration ist jetzt standardmäßig False. Das bedeutet, dass standardmäßig CronTriggerTimetable anstelle von CronDataIntervalTimetable verwendet wird.

  • Liste der Änderungen für Airflow 3.0.0

  • Liste der Änderungen für Airflow 3.1.0

Unterschiede zwischen Umgebungen mit Airflow 3 und Airflow 2

Die wichtigsten Unterschiede zwischen Managed Airflow-Umgebungen mit Airflow 2 und Umgebungen mit Airflow 3 sind:

  • Arbeitslastkonfiguration in Airflow 3-Umgebungen:

    • Der Mindestspeicher für alle Airflow-Komponenten beträgt 2 GB.
    • Die Konfigurationen für Airflow-Trigger und -Worker in Umgebungsvoreinstellungen haben sich im Vergleich zu Airflow 2-Umgebungen geändert.
    • Die Standardanzahl von CPUs für Trigger ist 1.
    • Die Standardmenge an Arbeitsspeicher für Trigger beträgt 2 GB.
  • Die automatische Berechnung der Konfigurationsoption [celery]worker_concurrency wurde geändert, um der unterschiedlichen Arbeitsspeichernutzung durch Airflow 3-Komponenten Rechnung zu tragen.

  • In Airflow 3 ist es nicht möglich, direkt über den Aufgabencode auf die Airflow-Datenbank zuzugreifen.

  • In Airflow 3 wird das Befehlszeilentool airflowctl verwendet, um Airflow-Befehlszeilenbefehle auszuführen.

  • Vorinstallierte PyPI-Pakete sind in Airflow 3-Umgebungen anders. Eine Liste der vorinstallierten PyPI-Pakete finden Sie im Changelog für vorinstallierte Pakete.

Parallele Migration zu Airflow 3

Die Side-by-Side-Migration umfasst die folgenden Schritte:

  1. Kompatibilität mit Airflow 3 prüfen
  2. Erstellen Sie eine Airflow 3-Umgebung und übertragen Sie Konfigurationsüberschreibungen und Umgebungsvariablen.
  3. Installieren Sie PyPI-Pakete in der Airflow 3-Umgebung.
  4. Übertragen Sie Variablen, Verbindungen und Pools zu Airflow 3.
  5. Übertragen Sie andere Daten aus Ihrem Airflow 2.*-Umgebungs-Bucket.
  6. Nutzer und Rollen übertragen
  7. Prüfen Sie, ob Ihre DAGs für Airflow 3 bereit sind.
  8. Übertragen Sie DAGs in die Airflow 3-Umgebung.
  9. Überwachen Sie Ihre Airflow 3-Umgebung.

Schritt 1: Kompatibilität mit Airflow 3 prüfen

So prüfen Sie die Kompatibilität mit Airflow 3:

  • Prüfen Sie, ob Ihre Umgebung Airflow-Version 2.7 oder höher verwendet. Wir empfehlen, zuerst ein Upgrade auf die aktuelle Airflow 2-Version durchzuführen und erst dann zu Airflow 3 zu migrieren.
  • Prüfen Sie, ob die Umgebung fehlerfrei ist und seit einiger Zeit ohne Probleme ausgeführt wird.
  • Achten Sie darauf, dass Ihre DAGs und Ihre Airflow-Konfiguration keine Funktionen verwenden, die in Airflow 3 entfernt wurden.
  • Lesen Sie die Anleitung zum Ändern Ihrer DAGs für die Kompatibilität mit Airflow 3, um zu sehen, ob während der Migration Änderungen an Ihren DAGs erforderlich sind.
  • Prüfen Sie die Kompatibilität Ihrer Airflow-DAGs mit dem ruff-Tool, das in der Community-Version von Airflow enthalten ist. Eine Anleitung dazu finden Sie in der Airflow-Dokumentation unter Airflow-DAGs auf Kompatibilität prüfen.

Schritt 2: Airflow 3-Umgebung erstellen, Konfigurationskonfigurationen überschreiben und Umgebungsvariablen

In diesem Schritt erstellen Sie eine neue Managed Airflow-Umgebung (Gen 3) mit Airflow 3 und beginnen mit der Übertragung der Konfigurationsparameter aus Ihrer Airflow 2-Umgebung:

Folgen Sie der Anleitung zum Erstellen einer Managed Airflow-Umgebung (Gen 3) und gehen Sie so vor:

  1. Wenn Sie einen Airflow-Build auswählen, wählen Sie einen Build mit Airflow 3 aus.
  2. Kopieren Sie alle kompatiblen Überschreibungen von Airflow-Konfigurationsoptionen aus Ihrer Airflow 2-Umgebung.

  3. Kopieren Sie alle Umgebungsvariablen aus Ihrer Airflow 2-Umgebung.

  4. Fahren Sie mit dem Erstellen einer Umgebung mit Airflow 3 fort.

In der folgenden Tabelle sind einige Änderungen an den Airflow-Konfigurationsoptionen aufgeführt. Die Liste ist nicht vollständig. Weitere Informationen zu Änderungen an den Airflow-Konfigurationsoptionen finden Sie in der Airflow-Dokumentation unter Airflow Configuration Reference und Airflow Release Notes.

Airflow 2-Option Airflow 3-Option
[scheduler]min_file_process_interval [dag_processor]min_file_process_interval
[webserver]rbac_user_registration_role [api]rbac_user_registration_role
[core]dag_file_processor_timeout [dag_processor]dag_file_processor_timeout
[scheduler]dag_dir_list_interval [dag_processor]refresh_interval
[scheduler]max_threads [dag_processor]parsing_processes
[scheduler]parsing_processes [dag_processor]parsing_processes
[webserver]instance_name [api]instance_name
[scheduler]scheduler_zombie_task_threshold [scheduler]task_instance_heartbeat_timeout
[webserver]rbac Verworfen
[api]auth_backend=airflow.api.auth.backend.deny_all Verworfen
[api]auth_backends=airflow.api.auth.backend.deny_all Verworfen
[api]composer_auth_user_registration_role Verworfen

Schritt 3: PyPI-Pakete in der Airflow 3-Umgebung installieren

Nachdem die Airflow 3-Umgebung erstellt wurde, installieren Sie PyPI-Pakete darin:

  1. Kopieren Sie die PyPI-Paketanforderungen aus Ihrer Airflow 2-Umgebung.
  2. Starten Sie den Vorgang zum Aktualisieren der PyPI-Pakete und warten Sie, bis die Umgebung aktualisiert wurde.

Da Airflow 3-Umgebungen eine andere Reihe von vorinstallierten Paketen verwenden, können während des Aktualisierungsvorgangs PyPI-Paketkonflikte auftreten. Weitere Informationen zur Fehlerbehebung bei PyPI-Paketkonflikten finden Sie unter Konflikte mit vorinstallierten PyPI-Paketen.

Schritt 4: Variablen, Verbindungen und Pools aus Airflow 2 exportieren

Wenn Sie keine Variablen oder Verbindungen haben, überspringen Sie die entsprechenden Export- und Importbefehle.

Sie müssen Pools nur übertragen, wenn Sie andere benutzerdefinierte Pools als default_pool haben. Andernfalls können Sie Befehle überspringen, mit denen Pools exportiert und importiert werden.

  1. Exportieren Sie Variablen aus Ihrer Airflow 2-Umgebung:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables -- export /home/airflow/gcs/data/variables.json
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  2. Verbindungen aus Ihrer Airflow 2-Umgebung exportieren:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections -- export /home/airflow/gcs/data/connections.json
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  3. Exportieren Sie Pools aus der Airflow 2-Umgebung:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools -- export /home/airflow/gcs/data/pools.json
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  4. Rufen Sie den Namen des Buckets Ihrer Airflow 2-Umgebung ab:

    gcloud composer environments describe AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        --format="value(storageConfig.bucket)"
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  5. Laden Sie die Dateien variables.json, connections.json und pools.json aus dem Verzeichnis /data des Buckets Ihrer Airflow 2-Umgebung in ein lokales Verzeichnis herunter:

    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/variables.json ./variables.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/connections.json ./connections.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/pools.json ./pools.json
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_BUCKET: Name des Buckets Ihrer Airflow 2-Umgebung, der im vorherigen Schritt abgerufen wurde.

Schritt 5: Variablen, Verbindungen und Pools in Airflow 3 importieren

Wenn Sie keine Variablen oder Verbindungen haben, überspringen Sie die entsprechenden Export- und Importbefehle.

Sie müssen Pools nur übertragen, wenn Sie andere benutzerdefinierte Pools als default_pool haben. Andernfalls können Sie Befehle überspringen, mit denen Pools exportiert und importiert werden.

  1. Konfigurieren Sie airflowctl, um Airflow-Befehlszeilenbefehle für die Airflow 3-Umgebung auszuführen.

  2. Importieren Sie Variablen, Verbindungen und Pools in die Airflow 3-Umgebung mit airflowctl:

    airflowctl variables import ./variables.json
    airflowctl connections import ./connections.json
    airflowctl pools import ./pools.json
    
  3. Prüfen Sie, ob Variablen, Verbindungen und Pools in die Airflow 3-Umgebung importiert wurden:

    airflowctl variables list
    airflowctl connections list
    airflowctl pools list
    
  4. JSON-Dateien bereinigen:

    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/variables.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/connections.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/pools.json
    rm ./variables.json
    rm ./connections.json
    rm ./pools.json
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_2_BUCKET: Name des Buckets Ihrer Airflow 2-Umgebung.

Schritt 6: Andere Daten aus dem Bucket Ihrer Airflow 2-Umgebung übertragen

In diesem Schritt übertragen Sie die verbleibenden Daten aus dem Bucket Ihrer Airflow 2-Umgebung.

  1. Rufen Sie den Namen des Buckets Ihrer Airflow 3-Umgebung ab:

    gcloud composer environments describe AIRFLOW_3_ENV \
        --location AIRFLOW_3_LOCATION \
        --format="value(storageConfig.bucket)"
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_3_ENV: Der Name Ihrer Airflow 3-Umgebung.
    • AIRFLOW_3_LOCATION: die Region, in der sich die Airflow 3-Umgebung befindet.
  2. Exportieren Sie Plug-ins aus dem Bucket Ihrer Airflow 2-Umgebung in das Verzeichnis /plugins im Bucket Ihrer Airflow 3-Umgebung:

    gcloud composer environments storage plugins export \
      --destination=AIRFLOW_3_BUCKET/plugins \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_3_BUCKET: Name des Buckets Ihrer Airflow 3-Umgebung, der im vorherigen Schritt abgerufen wurde.
    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  3. Prüfen Sie, ob das Verzeichnis /plugins erfolgreich importiert wurde:

    gcloud composer environments storage plugins list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_3_ENV: Der Name Ihrer Airflow 3-Umgebung.
    • AIRFLOW_3_LOCATION: die Region, in der sich die Airflow 3-Umgebung befindet.
  4. Exportieren Sie das Verzeichnis /data aus Ihrer Airflow 2-Umgebung in die Airflow 3-Umgebung:

    gcloud composer environments storage data export \
      --destination=AIRFLOW_3_BUCKET/data \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    Ersetzen Sie Folgendes:

    • AIRFLOW_3_BUCKET: Name des Buckets Ihrer Airflow 3-Umgebung, der im vorherigen Schritt abgerufen wurde.
    • AIRFLOW_2_ENV: Der Name Ihrer Airflow 2-Umgebung.
    • AIRFLOW_2_LOCATION: die Region, in der sich die Airflow 2-Umgebung befindet.
  5. Prüfen Sie, ob der Ordner /data erfolgreich importiert wurde:

    gcloud composer environments storage data list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

Schritt 7: Nutzer und Rollen übertragen

Nutzer und Rollen können nicht migriert werden, da airflowctl die Befehle users und roles noch nicht unterstützt.

Schritt 8: Prüfen, ob Ihre DAGs für Airflow 3 bereit sind

  1. Passen Sie Ihre Airflow-DAGs an, damit sie mit Airflow 3 kompatibel sind.

  2. Aufgaben mit direktem Airflow-Datenbankzugriff prüfen:

    In Airflow 3 können Operatoren nicht direkt über Datenbanksitzungen auf die Airflow-Metadatendatenbank zugreifen. Wenn Sie benutzerdefinierte Operatoren haben, überprüfen Sie Ihren Code, um sicherzustellen, dass keine direkten Datenbankzugriffsaufrufe vorhanden sind.

    Sie können einen der alternativen Ansätze verwenden, um die Migration vom direkten Airflow-Datenbankzugriff in den Aufgaben durchzuführen:

    • Zugriff auf die Airflow-Datenbank durch Exportieren von Airflow-Datenbankinhalten in eine Cloud SQL-Instanz.

    • Airflow-Python-Client verwenden Der von der Community-Version von Airflow bereitgestellte Python-Client hat APIs, die für die meisten Tabellen definiert sind, z. B. DagRuns, TaskInstances, Variables, Connections und XComs. Das Paket apache-airflow-client ist in den Managed Airflow-Builds für Airflow 3 bereits vorinstalliert.

    • Führen Sie airflowctl bis BashOperator über einen DAG aus.

    Wenn das Abfragen der exportierten Airflow-Datenbank für Ihren Anwendungsfall nicht infrage kommt und sowohl der Airflow-Python-Client als auch airflowctl nicht die erforderlichen Funktionen bieten, können Sie neue API-Endpunkte oder Task SDK-Funktionen in der Community-Version von Airflow anfordern.

  3. Wenn Sie KubernetesExecutor-Aufgaben haben, passen Sie die zugehörigen Operatordefinitionen an, indem Sie queue="kubernetes" durch executor="KubernetesExecutor" ersetzen.

    Beispiel für eine KubernetesExecutor-Aufgabe in Airflow 3:

    PythonOperator(
    task_id="airflow3_kubernetes_executor_task",
    dag=dag,
    python_callable=f,
    executor="KubernetesExecutor",
    )
    
  4. Wenn Sie die Umgebungsvariable AIRFLOW__WEBSERVER__BASE_URL im Aufgabencode verwenden, ersetzen Sie sie durch die Airflow-Konfigurationsoption [api]base_url.

    Beispiel für das Abrufen dieses Werts in Airflow 3:

    from airflow.configuration import conf
    
    webserver_base_url = conf.get("api", "base_url")
    

Schritt 9: DAGs in die Airflow 3-Umgebung übertragen

Die folgenden potenziellen Probleme können auftreten, wenn Sie DAGs zwischen Umgebungen übertragen:

  • Wenn ein DAG in beiden Umgebungen aktiviert (nicht pausiert) ist, führt jede Umgebung wie geplant eine eigene Kopie des DAG aus. Dies kann zu gleichzeitigen DAG-Ausführungen für dieselben Daten und zu den gleichen Ausführungszeiten führen.

  • Aufgrund des DAG-Catchups plant Airflow zusätzliche DAG-Ausführungen, beginnend ab dem in Ihren DAGs angegebenen Startdatum. Dies liegt daran, dass die neue Airflow-Instanz den Verlauf von DAG-Ausführungen aus der Airflow 2-Umgebung nicht berücksichtigt. Dies kann zu einer großen Anzahl von geplanten DAG-Ausführungen ab dem angegebenen Startdatum führen.

Gleichzeitige DAG-Ausführungen verhindern

In Ihrer Airflow 3-Umgebung überschreiben Sie die Airflow-Konfigurationsoption dags_are_paused_at_creation. Nachdem Sie diese Änderung vorgenommen haben, werden alle neuen DAGs standardmäßig pausiert.

Bereich Schlüssel Wert
core dags_are_paused_at_creation True

Zusätzliche oder fehlende DAG-Ausführungen verhindern

Geben Sie ein neues statisches Startdatum in DAGs an, das Sie in Ihre Airflow 3-Umgebung übertragen.

Zur Vermeidung von Lücken und Überschneidungen in logischen Datumsangaben muss die erste DAG-Ausführung in der Airflow 3-Umgebung beim nächsten Auftreten des Zeitplanintervalls erfolgen. Legen Sie dazu das neue Startdatum in Ihrem DAG vor dem Datum der letzten Ausführung in der Airflow 2-Umgebung fest.

Wenn Ihr DAG beispielsweise täglich um 15:00 Uhr, 17:00 Uhr und 21:00 Uhr in der Airflow 2-Umgebung ausgeführt wird, die letzte DAG-Ausführung um 15:00 Uhr ausgeführt wurde und Sie möchten den DAG um 15:15 Uhr übertragen, dann kann das Startdatum für die Airflow 3-Umgebung heute um 14:45 Uhr sein. Nachdem Sie den DAG in der Airflow 3-Umgebung aktiviert haben, plant Airflow eine DAG-Ausführung für 17:00 Uhr.

Ein weiteres Beispiel: Wenn Ihr DAG täglich in der Airflow 2-Umgebung um 00:00 Uhr ausgeführt wird, die letzte DAG-Ausführung am 26. März 2026 um 00:00 Uhr ausgeführt wurde und Sie möchten den DAG um 13:00 Uhr am 26. März 2026 übertragen, kann das Startdatum für die Airflow 3-Umgebung 23:45 Uhr am 25. März 2026 sein. Nachdem Sie den DAG in der Airflow 3-Umgebung aktiviert haben, plant Airflow eine DAG-Ausführung für 00:00 Uhr am 27. März 2026.

DAGs einzeln in die Airflow 3-Umgebung übertragen

Führen Sie für jeden DAG die folgenden Schritte aus, um ihn zu übertragen:

  1. Achten Sie darauf, dass das neue Startdatum im DAG wie im vorherigen Abschnitt beschrieben festgelegt ist.

  2. Laden Sie den aktualisierten DAG in die Airflow 3-Umgebung hoch. Dieser DAG wird aufgrund der Konfigurationsüberschreibung in der Airflow 3-Umgebung pausiert, sodass keine DAG-Ausführungen geplant sind.

  3. Wechseln Sie in der Airflow-Weboberfläche zu DAGs und suchen Sie nach gemeldeten DAG-Syntaxfehlern.

  4. Zu dem Zeitpunkt, an dem Sie den DAG übertragen möchten:

    1. Halten Sie den DAG in Ihrer Airflow 2-Umgebung an.

    2. Heben Sie die Pausierung des DAG in Ihrer Airflow 3-Umgebung auf.

    3. Prüfen Sie, ob die neue DAG-Ausführung zum richtigen Zeitpunkt geplant ist.

    4. Warten Sie, bis die DAG-Ausführung in der Airflow 3-Umgebung erfolgt ist, und prüfen Sie, ob die Ausführung erfolgreich ist.

  5. Je nachdem, ob die DAG-Ausführung erfolgreich ist:

    • Wenn die DAG-Ausführung erfolgreich ist, können Sie fortfahren und den DAG aus Ihrer Airflow 3-Umgebung verwenden. Erwägen Sie schließlich, die Airflow 2-Version des DAG zu löschen.

    • Wenn die DAG-Ausführung fehlgeschlagen ist, versuchen Sie, die Fehlerbehebung beim DAG durchzuführen, bis sie erfolgreich in Airflow 3 ausgeführt wird.

      Bei Bedarf können Sie jederzeit auf die Airflow 2-Version des DAG zurückgreifen:

      1. Pausieren Sie den DAG in Ihrer Airflow 3-Umgebung.

      2. Heben Sie die Pausierung des DAG in Ihrer Airflow 3-Umgebung auf. Dadurch wird eine neue DAG-Ausführung für das gleiche Datum und die gleiche Uhrzeit wie die fehlgeschlagene DAG-Ausführung geplant.

      3. Wenn Sie mit der Airflow 3-Version des DAG fortfahren möchten, passen Sie das Startdatum an, laden Sie die neue Version des DAG in Ihre Airflow 3-Umgebung hoch und wiederholen Sie den Vorgang.

Schritt 10: Airflow 3-Umgebung überwachen

Nachdem Sie alle DAGs und Konfigurationen in die Airflow 3-Umgebung übertragen haben, überwachen Sie diese auf potenzielle Probleme, fehlgeschlagene DAG-Ausführungen und den allgemeinen Umgebungsstatus. Wenn die Airflow 3-Umgebung für einen ausreichend langen Zeitraum fehlerfrei ausgeführt wird, können Sie die Airflow 2-Umgebung entfernen.

Nächste Schritte