スナップショットを使用して環境を Managed Airflow(第 2 世代)(Airflow 2 から)に移行する

Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)

このページでは、DAG、データ、構成を既存の Managed Airflow(以前の Gen 1)、Airflow 2 環境から Managed Airflow(Gen 2)、Airflow 2 に移行する方法について説明します。

この移行ガイドでは、スナップショット機能を使用します。

その他の移行ガイド

移行元 移行先 メソッド ガイド
Managed Airflow(Gen 3)、Airflow 2 マネージド Airflow(Gen 3)、Airflow 3 並列処理、手動転送 手動移行ガイド
マネージド Airflow(Gen 2) マネージド Airflow(Gen 3) 並列処理、移行スクリプトを使用 スクリプト移行ガイド
マネージド Airflow(Gen 2) マネージド Airflow(Gen 3) 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 2 マネージド Airflow(Gen 3) 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 2 マネージド Airflow(Gen 2) 並列処理、スナップショットを使用 このガイド
Managed Airflow(以前の Gen 1)、Airflow 2 マネージド Airflow(Gen 2) 並列処理、手動転送 手動移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(Gen 2)、Airflow 2 並列処理、スナップショットを使用 スナップショット移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(Gen 2)、Airflow 2 並列処理、手動転送 手動移行ガイド
Managed Airflow(以前の Gen 1)、Airflow 1 Managed Airflow(以前の Gen 1)、Airflow 2 並列処理、手動転送 手動移行ガイド

始める前に

  • スナップショットは、Managed Airflow(Gen 2)バージョン 2.0.9 以降でサポートされています。Managed Airflow(レガシー Gen 1)では、1.18.5 で環境スナップショットの保存がサポートされています。

  • Managed Airflow は、Managed Airflow(レガシー Gen 1)から Managed Airflow(Gen 2)への並列移行をサポートしています。マネージド Airflow(レガシー Gen 1)からマネージド Airflow(Gen 2)へのインプレース アップグレードはできません。

  • マネージド Airflow(レガシー Gen 1)と マネージド Airflow(Gen 2)の違いのリストを確認してください。

  • スナップショットをサポートする Airflow データベースの最大サイズは 20 GB です。環境のデータベースが 20 GB を超える場合は、Airflow データベースのサイズを縮小します。

  • スナップショットを作成するには、環境のバケット内の /dags/plugins/data フォルダ内のオブジェクトの合計数が 100,000 未満にする必要があります。

  • XCom メカニズムを使用してファイルを転送する場合は、Airflow のガイドラインに従って使用するようにしてください。XCom を使用して大きなファイルや大量のファイルを転送すると、Airflow データベースのパフォーマンスに影響し、スナップショットの読み込みや環境のアップグレード時に障害が発生する可能性があります。大量のデータを転送するには、Cloud Storage などの代替手段の使用を検討してください。

ステップ 1: マネージド Airflow(レガシー Gen 1)環境で DAG を一時停止する

DAG の重複実行を回避するには、スナップショットを保存する前に Managed Airflow(レガシー Gen 1)環境内のすべての DAG を一時停止します。

次のいずれかのオプションを使用できます。

  • Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG を手動で一時停止します。

  • composer_dags スクリプトを使用して、すべての DAG を一時停止します。

    python3 composer_dags.py --environment COMPOSER_1_ENV \
      --project PROJECT_ID \
      --location COMPOSER_1_LOCATION \
      --operation pause
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
    • COMPOSER_1_LOCATION は、環境が配置されているリージョン。
  • (Airflow バージョン 2.9.1 以降)多数の DAG を一時停止している間に割り当てエラーが発生した場合は、次の Airflow CLI コマンドを使用して、すべての DAG を一時停止できます。

    gcloud composer environments run COMPOSER_1_ENV dags pause \
      --project PROJECT_ID \
      --location COMPOSER_1_LOCATION \
      -- -y --treat-dag-id-as-regex ".*"
    
  • (Airflow バージョン 2.9.1 より前)多数の DAG を一時停止している間に割り当てエラーが発生した場合は、Airflow REST API を使用して DAG を一時停止できます。Airflow ドキュメントの API を試すもご覧ください。

ステップ 2: Managed Airflow(レガシー Gen 1)環境のスナップショットを保存する

コンソール

環境のスナップショットを作成します。

  1. コンソールで、[**環境**] ページに移動します。 Google Cloud

    [環境] に移動

  2. 環境のリストで、Managed Airflow(レガシー Gen 1)環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [スナップショットを作成] をクリックします。

  4. [スナップショットを作成] ダイアログで、[送信] をクリックします。このガイドでは、スナップショットを Managed Airflow(レガシー Gen 1)環境のバケットに保存しますが、必要に応じて別のロケーションを選択することもできます。

  5. マネージド Airflow がスナップショットを作成するまで待ちます。

gcloud

  1. Managed Airflow(レガシー Gen 1)環境のバケット URI を取得します。

    1. 次のコマンドを実行します。

      gcloud composer environments describe COMPOSER_1_ENV \
          --location COMPOSER_1_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      次のように置き換えます。

      • COMPOSER_1_ENV は、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。
      • COMPOSER_1_LOCATION は、環境が配置されているリージョン。
    2. 出力で、/dags フォルダを削除します。結果は、Managed Airflow(以前の Gen 1)環境のバケットの URI です。

      たとえば、gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucketに変更します。

  2. Managed Airflow(レガシー Gen 1)環境のスナップショットを作成します。

    gcloud composer environments snapshots save \
      COMPOSER_1_ENV \
      --location COMPOSER_1_LOCATION \
      --snapshot-location "COMPOSER_1_SNAPSHOTS_FOLDER"
    

    次のように置き換えます。

    • COMPOSER_1_ENV は、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。
    • COMPOSER_1_LOCATION は、Managed Airflow(レガシー Gen 1)環境が配置されているリージョン。
    • COMPOSER_1_SNAPSHOTS_FOLDER は、Managed Airflow(レガシー Gen 1)環境のバケットの URI。このガイドでは、スナップショットを Managed Airflow(レガシー Gen 1)環境のバケットに保存しますが、必要に応じて別のロケーションを選択することもできます。カスタム ロケーションを指定する場合は、 両方の環境のサービス アカウントに、指定したロケーションに対する読み取りと書き込みの権限が必要です。

ステップ 3: Managed Airflow(Gen 2)環境を作成する

Managed Airflow(Gen 2)環境を作成します想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。

構成のオーバーライドと環境変数は、Managed Airflow(レガシー Gen 1)環境のスナップショットを読み込むときに後で置き換えるため、指定する必要はありません。

ステップ 4: スナップショットを Managed Airflow(Gen 2)環境に読み込む

コンソール

スナップショットを Managed Airflow(Gen 2)環境に読み込むには:

  1. コンソールで、[**環境**] ページに移動します。 Google Cloud

    [環境] に移動

  2. 環境のリストで、Managed Airflow(Gen 2)環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [スナップショットを読み込む] をクリックします。

  4. [スナップショットを読み込む] ダイアログで、[参照] をクリックします。

  5. スナップショットのあるフォルダを選択します。このガイドのデフォルトの場所を使用する場合、このフォルダは /snapshots フォルダの Managed Airflow(レガシー Gen 1)環境バケットにあり、名前はスナップショット保存オペレーションのタイムスタンプになります。例: us-central1-example-916807e1-bucket/snapshots_example-project_us-central1_example-environment/2022-01-05T18-59-00

  6. [読み込む] をクリックし、Managed Airflow によってスナップショットが読み込まれるまで待ちます。

gcloud

マネージド Airflow(レガシー Gen 1)環境のスナップショットをマネージド Airflow(Gen 2)環境に読み込みます。

gcloud composer environments snapshots load \
  COMPOSER_2_ENV \
  --location COMPOSER_2_LOCATION \
  --snapshot-path "SNAPSHOT_PATH"

次のように置き換えます。

  • COMPOSER_2_ENV は、Managed Airflow(Gen 2)環境の名前に置き換えます。
  • COMPOSER_2_LOCATION は、Managed Airflow(Gen 2)環境が配置されているリージョン。
  • SNAPSHOT_PATH を、Managed Airflow(以前の Gen 1)環境のバケットの URI の後にスナップショットのパスを付加したものに置き換えます。例: gs://us-central1-example-916807e1-bucket/snapshots/example-project_us-central1_example-environment_2022-01-05T18-59-00

ステップ 5: Managed Airflow(第 2 世代)環境で DAG の一時停止を解除する

次のいずれかのオプションを使用できます。

  • Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG の一時停止を手動で 1 つずつ解除します。

  • composer_dags スクリプトを使用して、すべての DAG の一時停止を解除します。

    python3 composer_dags.py --environment COMPOSER_2_ENV \
      --project PROJECT_ID \
      --location COMPOSER_2_LOCATION \
      --operation unpause
    

    次のように置き換えます。

    • COMPOSER_2_ENV は、Managed Airflow(Gen 2)環境の名前に置き換えます。
    • PROJECT_ID は、プロジェクト ID に置き換えます。
    • COMPOSER_2_LOCATION は、環境が配置されているリージョン。
  • (Airflow バージョン 2.9.1 以降)多数の DAG の一時停止を解除するときに割り当てエラーが発生した場合は、次の Airflow CLI コマンドを使用して、すべての DAG を一括で一時停止を解除できます。

    gcloud composer environments run COMPOSER_2_ENV dags unpause \
      --project PROJECT_ID \
      --location COMPOSER_2_LOCATION \
      -- -y --treat-dag-id-as-regex ".*"
    
  • (Airflow バージョン 2.9.1 より前)多数の DAG を一時停止解除する際に割り当てエラーが発生した場合は、Airflow REST API を使用して DAG を一時停止解除できます。Airflow ドキュメントの API を試すもご覧ください。

ステップ 6: DAG エラーを確認する

  1. Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。

  2. DAG 実行が正しい時間にスケジュール設定されていることを確認します。

  3. マネージド Airflow(Gen 2)環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Managed Airflow(レガシー Gen 1)環境で一時停止を解除しないでください。解除すると、DAG が Managed Airflow(レガシー Gen 1)の環境で同じ日時に実行されます。

  4. 特定の DAG の実行が失敗した場合は、Managed Airflow(Gen 2)で正常に実行されるまで DAG の トラブルシューティングを行います 。

ステップ 7: Managed Airflow(第 2 世代)環境をモニタリングする

すべての DAG と構成をマネージド Airflow(Gen 2)環境に転送した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。

Managed Airflow(Gen 2)環境が、十分な時間、問題なく動作している場合は、Managed Airflow(以前の Gen 1)環境の削除を検討してください。

次のステップ