Airflow 3 に環境を移行する(サイドバイサイド)

Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(以前の第 1 世代)

このページでは、Airflow 2 を使用する既存の Managed Airflow(第 3 世代)環境から、Airflow 3 を使用する Managed Airflow(第 3 世代)環境に DAG、データ、構成を移行する方法について説明します。

移行元 移行先 メソッド ガイド
Managed Airflow(Gen 3)、Airflow 2 Managed Airflow(第 3 世代)、Airflow 3 並列処理、手動転送 このガイド
Managed Airflow(第 2 世代) Managed Airflow(第 3 世代) 並列処理、移行スクリプトを使用 スクリプト移行ガイド
Managed Airflow(第 2 世代) Managed Airflow(第 3 世代) 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 2 Managed Airflow(第 3 世代) 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 2 Managed Airflow(第 2 世代) 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 2 Managed Airflow(第 2 世代) 並列処理、手動転送 手動移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(第 2 世代)、Airflow 2 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(第 2 世代)、Airflow 2 並列処理、手動転送 手動移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(以前の Gen 1)、Airflow 2 並列処理、手動転送 手動移行ガイド

Airflow 3 で導入された変更点

Airflow 3 で Managed Airflow 環境の使用を開始する前に、Airflow 3 が Managed Airflow(第 3 世代)環境にもたらす変化について検討してください。

Airflow 3 のコミュニティ バージョンで導入された変更の概要については、Apache Airflow 3 が一般提供開始をご覧ください。

DAG のバージョニング

  • Airflow 3 では、DAG 実行中に新しいバージョンがアップロードされた場合でも、開始時のバージョンに基づいて DAG が完了まで実行されます。

    • Airflow UI のすべての DAG 実行が、対応する DAG バージョン(実行時のバージョン)に関連付けられるようになりました。これには、タスク構造と DAG コードが含まれます。

バックフィルの改善

Airflow 3 では、バックフィル(過去のデータのパイプラインの再実行)の処理方法が大幅に変更されています。バックフィルは、手動プロセスから、コア Airflow エンジンに統合された完全にオブザーバブルな機能に移行しています。

  • バックフィルは、個別の手動プロセスとして扱われるのではなく、Airflow スケジューラ内で直接管理されるようになりました。これにより、スケーラビリティが向上し、より正確な制御が可能になります。
  • Airflow CLI に加えて、Airflow UI または API 呼び出しを介して、バックフィルを直接トリガー、停止、モニタリングできるようになりました。
  • Airflow スケジューラは、過去のバックフィル実行のステータスと健全性をより詳細に把握できます。
  • ML コミュニティから(古いデータでモデルを再トレーニングするために)強く要望されていましたが、バックフィル改善はすべての ETL/ELT ワークフローに適用されます。

セキュリティと信頼性の向上

  • Airflow 3 では、タスクは Task SDK を介して中央 API サーバーとのみ通信します(Airflow 2 では、タスクがデータベースに直接アクセスできました)。API サーバーはこれらの接続を効率的にプールします。データベースは接続スパイクから保護され、環境全体が負荷の高い状況でも安定します。

  • 新しいタスク実行インターフェースを利用することで、Airflow 3 はタスク間の分離を強化し、あるタスクが別のタスクのデータに干渉したりアクセスしたりする可能性を回避します。

  • Airflow 3 の CLI は、データベースへの直接アクセスから移行しています。新しい airflowctl コマンドライン インターフェースは、API を介したリモート アクセス専用に設計された別のパッケージです。データベースに直接アクセスするのではなく、API を介して Airflow とやり取りするため、セキュリティが強化されます。

イベント ドリブン スケジューリングとデータ アセット

  • データセットデータアセットに進化しました。データ アセットを使用すると、Airflow は Airflow 以外のシステムで作成または更新されたデータをより適切に追跡し、対応できます。

  • Airflow 3 では、Watchers という新しいコンセプトが導入されました。これらは、データアセットの変更を監視するコンポーネントであり、データが到着した瞬間に Airflow がワークフローをトリガーできるようにします。ポーリング(ファイルが存在するかどうかを 1 分ごとに確認する)の代わりに、メッセージがメッセージキューに到達した瞬間に DAG を即座にトリガーできるようになりました。

  • Airflow 3 では、Python デコレータを使用する新しいアセット中心の構文が導入され、コードがよりクリーンになり、デベロッパーにとってより直感的になりました。

最新の Airflow UI

  • Airflow UI は、React(フロントエンド)と FastAPI(バックエンド)を使用してゼロから書き直されました。
  • 新しい Airflow UI は、標準化された REST API と UI オペレーション用の専用 API を介してオペレーションを実行します。
  • Flask の実装を FastAPI に置き換えることで、Airflow UI のレスポンスが大幅に向上します。
  • グリッドビューとグラフビューが統合され、ワークフローがスムーズになり、DAG の高レベルの構造と特定のタスクログを簡単に切り替えられるようになりました。

Airflow 3 の重要な変更

Airflow 3 では多くの大きな変更が導入され、その一部は最新機能です。

  • Airflow 2 の既存の DAG が Airflow 3 でそのまま機能するとは限りません。インポート、DAG パラメータ、その他の実装の詳細を変更して、テストを行い、必要に応じて調整する必要があります。
  • Airflow 2 の一部の構成オプションは、Airflow 3 で名前が変更されるか削除されます。パラメータの詳細については、Airflow 構成リファレンスをご覧ください。

  • タスクコードから Airflow データベースに直接アクセスできない:

    • タスクコードで Airflow データベース セッションやモデルを直接インポートして使用することはできなくなります。
    • airflow_db 接続で PostgresHookPostgresOperator を使用することはできません。
  • 一部のカスタム PyPI パッケージには、Airflow の新しいバージョンとその依存関係との互換性がない可能性があります。

  • REST API(/api/v1)が /api/v2 に置き換えられました。

  • SubDAG は、TaskGroup、アセット、データ認識スケジューリングに置き換えられます。

  • SLA は非推奨となり、削除されました。代わりに期限アラートが導入されました。

  • CLI コマンドの subdir 引数が削除されました。

  • 一部の Airflow コンテキスト変数が削除されました。詳細については、Airflow ドキュメントの互換性を破る変更をご覧ください。

  • catchup_by_default DAG パラメータがデフォルトで False になりました。

  • create_cron_data_intervals 構成がデフォルトで False になりました。つまり、CronDataIntervalTimetable ではなく CronTriggerTimetable がデフォルトで使用されます。

  • Airflow 3.0.0 の変更リスト

  • Airflow 3.1.0 の変更リスト

Airflow 3 と Airflow 2 の環境の違い

Airflow 2 を使用する Managed Airflow 環境と Airflow 3 を使用する環境の主な違いは次のとおりです。

  • Airflow 3 環境のワークロード構成:

    • すべての Airflow コンポーネントの最小メモリ量は 2 GB です。
    • 環境プリセットの Airflow トリガーとワーカーの構成は、Airflow 2 環境と比較して変更されています。
    • トリガーのデフォルトの CPU 数は 1 です。
    • triggerer のデフォルトのメモリ容量は 2 GB です。
  • Airflow 3 コンポーネントによるさまざまなメモリ使用量に対応するため、[celery]worker_concurrency 構成オプションの自動計算が変更されました。

  • Airflow 3 では、タスクコードから Airflow データベースに直接アクセスする方法はありません。

  • Airflow 3 では、airflowctl コマンドライン ユーティリティを使用して Airflow CLI コマンドを実行します。

  • プリインストールされた PyPI パッケージは、Airflow 3 環境で異なります。プリインストールされた PyPI パッケージの一覧については、プリインストールされたパッケージの変更履歴をご覧ください。

Airflow 3 に並行して移行する

並行移行プロセスには次の手順があります。

  1. Airflow 3 との互換性を確認します。
  2. Airflow 3 環境を作成し、構成のオーバーライドと環境変数を転送します。
  3. Airflow 3 環境に PyPI パッケージをインストールします。
  4. 変数、接続、プールを Airflow 3 に転送します。
  5. Airflow 2.* 環境バケットから他のデータを転送します。
  6. ユーザーとロールを移行します。
  7. DAG が Airflow 3 に対応する準備ができていることを確認します。
  8. DAG を Airflow 3 環境に転送します。
  9. Airflow 3 環境をモニタリングします。

ステップ 1: Airflow 3 との互換性を確認する

Airflow 3 との互換性を確認するには:

  • 環境で Airflow バージョン 2.7 以降が使用されていることを確認します。最初に最新の Airflow 2 バージョンにアップグレードしてから、Airflow 3 に移行することをおすすめします。
  • 環境が正常で、しばらくの間問題なく実行されていることを確認します。
  • DAG と Airflow 構成で、Airflow 3 で削除された機能や関数が使用されていないことを確認します。
  • Airflow 3 と互換性があるように DAG を変更するの手順を読んで、移行プロセス中に DAG の変更が必要かどうかを確認します。
  • Airflow のコミュニティ バージョンで提供されている ruff ツールを使用して、Airflow DAG の互換性を確認します。手順については、Airflow ドキュメントの Airflow DAG の互換性を確認するをご覧ください。

ステップ 2: Airflow 3 環境を作成し、構成のオーバーライドと環境変数を転送する

このステップでは、Airflow 3 を使用して新しい Managed Airflow(第 3 世代)環境を作成し、Airflow 2 環境からの構成パラメータの転送を開始します。

Managed Airflow(第 3 世代)環境を作成する手順に沿って、次の操作を行います。

  1. Airflow ビルドを選択するときは、Airflow 3 を使用するビルドを選択します。
  2. Airflow 2 環境から、互換性のある Airflow 構成オプションのオーバーライドをすべてコピーします。

  3. Airflow 2 環境からすべての環境変数をコピーします。

  4. Airflow 3 を使用して環境の作成に進みます。

次の表に、Airflow 構成オプションの変更の一部を示します。このリストはすべてを網羅したものではありません。Airflow 構成オプションの変更の詳細については、Airflow ドキュメントの Airflow 構成リファレンスAirflow リリースノートをご覧ください。

Airflow 2 オプション Airflow 3 オプション
[scheduler]min_file_process_interval [dag_processor]min_file_process_interval
[webserver]rbac_user_registration_role [api]rbac_user_registration_role
[core]dag_file_processor_timeout [dag_processor]dag_file_processor_timeout
[scheduler]dag_dir_list_interval [dag_processor]refresh_interval
[scheduler]max_threads [dag_processor]parsing_processes
[scheduler]parsing_processes [dag_processor]parsing_processes
[webserver]instance_name [api]instance_name
[scheduler]scheduler_zombie_task_threshold [scheduler]task_instance_heartbeat_timeout
[webserver]rbac 非推奨
[api]auth_backend=airflow.api.auth.backend.deny_all 非推奨
[api]auth_backends=airflow.api.auth.backend.deny_all 非推奨
[api]composer_auth_user_registration_role 非推奨

ステップ 3: Airflow 3 環境に PyPI パッケージをインストールする

Airflow 3 環境を作成したら、PyPI パッケージをインストールします。

  1. Airflow 2 環境から PyPI パッケージの要件をコピーします。
  2. PyPI パッケージの更新オペレーションを開始し、環境が更新されるまで待ちます。

Airflow 3 環境では、プリインストールされたパッケージの別のセットを使用するため、更新オペレーション中に PyPI パッケージの競合が発生することがあります。PyPI パッケージの競合のトラブルシューティングの詳細については、プリインストールされた PyPI パッケージとの競合をご覧ください。

ステップ 4: Airflow 2 から変数、接続、プールをエクスポートする

変数または接続がない場合は、それぞれのエクスポート コマンドとインポート コマンドをスキップします。

default_pool 以外のカスタムプールがある場合にのみ、プールを移行する必要があります。それ以外の場合は、プールのエクスポートとインポートを行うコマンドをスキップします。

  1. Airflow 2 環境から変数をエクスポートします。

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables -- export /home/airflow/gcs/data/variables.json
    

    次のように置き換えます。

    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  2. Airflow 2 環境から接続をエクスポートします。

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections -- export /home/airflow/gcs/data/connections.json
    

    次のように置き換えます。

    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  3. Airflow 2 環境からプールをエクスポートします。

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools -- export /home/airflow/gcs/data/pools.json
    

    次のように置き換えます。

    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  4. Airflow 2 環境のバケットの名前を取得します。

    gcloud composer environments describe AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        --format="value(storageConfig.bucket)"
    

    次のように置き換えます。

    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  5. Airflow 2 環境のバケットの /data ディレクトリからローカル ディレクトリに variables.jsonconnections.jsonpools.json ファイルをダウンロードします。

    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/variables.json ./variables.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/connections.json ./connections.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/pools.json ./pools.json
    

    次のように置き換えます。

    • AIRFLOW_2_BUCKET: 前の手順で取得した Airflow 2 環境のバケットの名前。

ステップ 5: 変数、接続、プールを Airflow 3 にインポートする

変数または接続がない場合は、それぞれのエクスポート コマンドとインポート コマンドをスキップします。

default_pool 以外のカスタムプールがある場合にのみ、プールを移行する必要があります。それ以外の場合は、プールのエクスポートとインポートを行うコマンドをスキップします。

  1. Airflow 3 環境の Airflow CLI コマンドを実行するように airflowctl を構成します

  2. airflowctl を使用して、変数、接続、プールを Airflow 3 環境にインポートします。

    airflowctl variables import ./variables.json
    airflowctl connections import ./connections.json
    airflowctl pools import ./pools.json
    
  3. 変数、接続、プールが Airflow 3 環境にインポートされていることを確認します。

    airflowctl variables list
    airflowctl connections list
    airflowctl pools list
    
  4. JSON ファイルをクリーンアップします。

    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/variables.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/connections.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/pools.json
    rm ./variables.json
    rm ./connections.json
    rm ./pools.json
    

    次のように置き換えます。

    • AIRFLOW_2_BUCKET: Airflow 2 環境のバケットの名前。

ステップ 6: Airflow 2 環境のバケットから他のデータを転送する

このステップでは、Airflow 2 環境のバケットから残りのデータを転送します。

  1. Airflow 3 環境のバケットの名前を取得します。

    gcloud composer environments describe AIRFLOW_3_ENV \
        --location AIRFLOW_3_LOCATION \
        --format="value(storageConfig.bucket)"
    

    次のように置き換えます。

    • AIRFLOW_3_ENV: Airflow 3 環境の名前。
    • AIRFLOW_3_LOCATION: Airflow 3 環境が配置されているリージョン。
  2. Airflow 2 環境のバケットから Airflow 3 環境のバケットにある /plugins ディレクトリにプラグインをエクスポートします。

    gcloud composer environments storage plugins export \
      --destination=AIRFLOW_3_BUCKET/plugins \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    次のように置き換えます。

    • AIRFLOW_3_BUCKET: 前の手順で取得した Airflow 3 環境のバケットの名前。
    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  3. /plugins ディレクトリが正常にインポートされたことを確認します。

    gcloud composer environments storage plugins list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

    次のように置き換えます。

    • AIRFLOW_3_ENV: Airflow 3 環境の名前。
    • AIRFLOW_3_LOCATION: Airflow 3 環境が配置されているリージョン。
  4. Airflow 2 環境から Airflow 3 環境に /data ディレクトリをエクスポートします。

    gcloud composer environments storage data export \
      --destination=AIRFLOW_3_BUCKET/data \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    次のように置き換えます。

    • AIRFLOW_3_BUCKET: 前の手順で取得した Airflow 3 環境のバケットの名前。
    • AIRFLOW_2_ENV: Airflow 2 環境の名前。
    • AIRFLOW_2_LOCATION: Airflow 2 環境が配置されているリージョン。
  5. /data フォルダが正常にインポートされたことを確認します。

    gcloud composer environments storage data list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

ステップ 7: ユーザーとロールを転送する

airflowctlusers コマンドと roles コマンドをまだサポートしていないため、ユーザーとロールを移行できません。

ステップ 8: DAG が Airflow 3 に対応する準備ができていることを確認する

  1. Airflow DAG を調整して、Airflow 3 と互換性を持たせます

  2. Airflow データベースに直接アクセスするカスタム作成タスクを確認します。

    Airflow 3 では、オペレーターはデータベース セッションを使用して Airflow メタデータ データベースに直接アクセスできません。カスタム オペレーターがある場合は、コードを確認して、データベースへの直接アクセス呼び出しがないことを確認します。

    次のいずれかの方法を使用して、タスクでの Airflow データベースへの直接アクセスから移行できます。

    エクスポートされた Airflow データベースのクエリがユースケースのオプションではなく、Airflow Python クライアントと airflowctl の両方で必要な機能が提供されていない場合は、Airflow のコミュニティ バージョンで新しい API エンドポイントまたは Task SDK 機能のリクエストを検討してください。

  3. KubernetesExecutor タスクがある場合は、queue="kubernetes"executor="KubernetesExecutor" に置き換えて、演算子の定義を調整します。

    Airflow 3 の KubernetesExecutor タスクの例:

    PythonOperator(
    task_id="airflow3_kubernetes_executor_task",
    dag=dag,
    python_callable=f,
    executor="KubernetesExecutor",
    )
    
  4. タスクコードで AIRFLOW__WEBSERVER__BASE_URL 環境変数を使用している場合は、[api]base_url Airflow 構成オプションに置き換えます。

    Airflow 3 でこの値を取得する例:

    from airflow.configuration import conf
    
    webserver_base_url = conf.get("api", "base_url")
    

ステップ 9: DAG を Airflow 3 環境に転送する

DAG を環境間で転送すると、次の問題が発生する場合があります。

  • 両方の環境で DAG が有効である(一時停止されていない)場合、各環境はスケジュールされたとおりに DAG の独自のコピーを実行します。これにより、同じデータと実行時間に対して同時 DAG 実行が発生する可能性があります。

  • DAG のキャッチアップのために、Airflow は DAG で指定された開始日に始まる追加の DAG 実行をスケジュールします。これは、新しい Airflow インスタンスで Airflow 2 環境からの DAG 実行の履歴が考慮されていないためです。このため、指定した開始日に始まるように大量の DAG 実行がスケジュールされる可能性があります。

同時 DAG 実行を防止する

Airflow 3 環境では、Airflow 構成オプション dags_are_paused_at_creation をオーバーライドします。この変更を行うと、デフォルトで新しいすべての DAG が一時停止されます。

セクション キー
core dags_are_paused_at_creation True

DAG 実行の追加や欠落を防ぐ

Airflow 3 環境に転送する DAG で新しい静的開始日を指定します。

論理日付のギャップと重複を避けるため、最初の DAG 実行はスケジュール間隔の次の発生時に Airflow 3 環境で行う必要があります。そのためには、DAG 内の新しい開始日を、Airflow 2 環境での最後の実行日よりも前の日付に設定します。

たとえば、Airflow 2 環境で DAG が毎日 15:00、17:00、21:00 に実行され、最後の DAG 実行が 15:00 に発生し、DAG を 15:15 に転送するように計画する場合、Airflow 3 環境の開始日を今日の 14:45 に設定できます。Airflow 3 環境で DAG を有効にすると、DAG の実行が 17:00 にスケジュール設定されます。

別の例として、Airflow 2 環境で DAG が毎日 00:00 に実行され、最後の DAG 実行が 2026 年 3 月 26 日の 00:00 に発生し、DAG を 2026 年 3 月 26 日の 13:00 に転送するように計画する場合、Airflow 3 環境の開始日を 2026 年 3 月 25 日の 23:45 に設定できます。Airflow 3 環境で DAG を有効にすると、DAG の実行が 2026 年 3 月 27 日の 00:00 にスケジュール設定されます。

DAG を 1 つずつ Airflow 3 環境に転送する

DAG ごとに、次の手順に従って転送します。

  1. DAG の新しい開始日が前のセクションの説明に従って設定されていることを確認します。

  2. Airflow 3 環境に更新された DAG をアップロードします。この DAG は、構成のオーバーライドのために Airflow 3 環境で一時停止されているため、DAG 実行はまだスケジュールされていません。

  3. Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。

  4. DAG の転送を計画する時刻:

    1. Airflow 2 環境で DAG を一時停止します。

    2. Airflow 3 環境で DAG の一時停止を解除します。

    3. 新しい DAG 実行が正しい時間にスケジュール設定されていることを確認します。

    4. Airflow 3 環境で DAG の実行が発生するまで待機し、実行が成功したかどうかを確認します。

  5. DAG の実行が成功するかどうかによって、以下にようになります。

    • DAG の実行が成功した場合、続行して、Airflow 3 環境から DAG を使用できます。最終的には、Airflow 2 バージョンの DAG を削除することを検討してください。

    • DAG 実行が失敗した場合は、Airflow 3 で正常に実行されるまで DAG のトラブルシューティングを試行してください。

      必要に応じて、いつでも DAG の Airflow 2 バージョンにフォールバックできます。

      1. Airflow 3 環境で DAG を一時停止します。

      2. Airflow 3 環境で DAG の一時停止を解除します。これにより、新しい DAG 実行が、失敗した DAG 実行と同じ日時にスケジュール設定されます。

      3. Airflow 3 バージョンの DAG の使用を続行する準備ができたら、開始日を調整し、新しいバージョンの DAG を Airflow 3 環境にアップロードして、手順を繰り返します。

ステップ 10: Airflow 3 環境をモニタリングする

すべての DAG と構成を Airflow 3 環境に転送した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。Airflow 3 環境が一定期間問題のなく動作している場合は、Airflow 2 環境を削除できます。

次のステップ