Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)
このページでは、DAG、データ、構成を既存の Managed Airflow(以前の Gen 1)、Airflow 2 環境から Managed Airflow(Gen 2)、Airflow 2 に移行する方法について説明します。
この移行ガイドでは、スナップショット機能を使用します。
その他の移行ガイド
| 移行元 | 移行先 | メソッド | ガイド |
|---|---|---|---|
| Managed Airflow(Gen 3)、Airflow 2 | マネージド Airflow(Gen 3)、Airflow 3 | 並列処理、手動転送 | 手動移行ガイド |
| マネージド Airflow(Gen 2) | マネージド Airflow(Gen 3) | 並列処理、移行スクリプトを使用 | スクリプト移行ガイド |
| マネージド Airflow(Gen 2) | マネージド Airflow(Gen 3) | 並列処理、スナップショットを使用 | スナップショット移行ガイド |
| Managed Airflow(以前の Gen 1)、Airflow 2 | マネージド Airflow(Gen 3) | 並列処理、スナップショットを使用 | スナップショット移行ガイド |
| Managed Airflow(以前の Gen 1)、Airflow 2 | マネージド Airflow(Gen 2) | 並列処理、スナップショットを使用 | このガイド |
| Managed Airflow(以前の Gen 1)、Airflow 2 | マネージド Airflow(Gen 2) | 並列処理、手動転送 | 手動移行ガイド |
| Managed Airflow(以前の Gen 1)、Airflow 1 | Managed Airflow(Gen 2)、Airflow 2 | 並列処理、スナップショットを使用 | スナップショット移行ガイド |
| Managed Airflow(以前の Gen 1)、Airflow 1 | Managed Airflow(Gen 2)、Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
| Managed Airflow(以前の Gen 1)、Airflow 1 | Managed Airflow(以前の Gen 1)、Airflow 2 | 並列処理、手動転送 | 手動移行ガイド |
始める前に
スナップショットは、Managed Airflow(Gen 2)バージョン 2.0.9 以降でサポートされています。Managed Airflow(レガシー Gen 1)では、1.18.5 で環境スナップショットの保存がサポートされています。
Managed Airflow は、Managed Airflow(レガシー Gen 1)から Managed Airflow(Gen 2)への並列移行をサポートしています。マネージド Airflow(レガシー Gen 1)からマネージド Airflow(Gen 2)へのインプレース アップグレードはできません。
マネージド Airflow(レガシー Gen 1)と マネージド Airflow(Gen 2)の違いのリストを確認してください。
スナップショットをサポートする Airflow データベースの最大サイズは 20 GB です。環境のデータベースが 20 GB を超える場合は、Airflow データベースのサイズを縮小します。
スナップショットを作成するには、環境のバケット内の
/dags、/plugins、/dataフォルダ内のオブジェクトの合計数が 100,000 未満にする必要があります。XCom メカニズムを使用してファイルを転送する場合は、Airflow のガイドラインに従って使用するようにしてください。XCom を使用して大きなファイルや大量のファイルを転送すると、Airflow データベースのパフォーマンスに影響し、スナップショットの読み込みや環境のアップグレード時に障害が発生する可能性があります。大量のデータを転送するには、Cloud Storage などの代替手段の使用を検討してください。
ステップ 1: マネージド Airflow(レガシー Gen 1)環境で DAG を一時停止する
DAG の重複実行を回避するには、スナップショットを保存する前に Managed Airflow(レガシー Gen 1)環境内のすべての DAG を一時停止します。
次のいずれかのオプションを使用できます。
Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG を手動で一時停止します。
composer_dags スクリプトを使用して、すべての DAG を一時停止します。
python3 composer_dags.py --environment COMPOSER_1_ENV \ --project PROJECT_ID \ --location COMPOSER_1_LOCATION \ --operation pause次のように置き換えます。
COMPOSER_1_ENVは、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。PROJECT_IDは、プロジェクト ID に置き換えます。COMPOSER_1_LOCATIONは、環境が配置されているリージョン。
(Airflow バージョン 2.9.1 以降)多数の DAG を一時停止している間に割り当てエラーが発生した場合は、次の Airflow CLI コマンドを使用して、すべての DAG を一時停止できます。
gcloud composer environments run COMPOSER_1_ENV dags pause \ --project PROJECT_ID \ --location COMPOSER_1_LOCATION \ -- -y --treat-dag-id-as-regex ".*"(Airflow バージョン 2.9.1 より前)多数の DAG を一時停止している間に割り当てエラーが発生した場合は、Airflow REST API を使用して DAG を一時停止できます。Airflow ドキュメントの API を試すもご覧ください。
ステップ 2: Managed Airflow(レガシー Gen 1)環境のスナップショットを保存する
コンソール
環境のスナップショットを作成します。
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、Managed Airflow(レガシー Gen 1)環境の名前をクリックします。[環境の詳細] ページが開きます。
[スナップショットを作成] をクリックします。
[スナップショットを作成] ダイアログで、[送信] をクリックします。このガイドでは、スナップショットを Managed Airflow(レガシー Gen 1)環境のバケットに保存しますが、必要に応じて別のロケーションを選択することもできます。
マネージド Airflow がスナップショットを作成するまで待ちます。
gcloud
Managed Airflow(レガシー Gen 1)環境のバケット URI を取得します。
次のコマンドを実行します。
gcloud composer environments describe COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ --format="value(config.dagGcsPrefix)"次のように置き換えます。
COMPOSER_1_ENVは、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。COMPOSER_1_LOCATIONは、環境が配置されているリージョン。
出力で、
/dagsフォルダを削除します。結果は、Managed Airflow(以前の Gen 1)環境のバケットの URI です。たとえば、
gs://us-central1-example-916807e1-bucket/dagsをgs://us-central1-example-916807e1-bucketに変更します。
Managed Airflow(レガシー Gen 1)環境のスナップショットを作成します。
gcloud composer environments snapshots save \ COMPOSER_1_ENV \ --location COMPOSER_1_LOCATION \ --snapshot-location "COMPOSER_1_SNAPSHOTS_FOLDER"次のように置き換えます。
COMPOSER_1_ENVは、Managed Airflow(レガシー Gen 1)環境の名前に置き換えます。COMPOSER_1_LOCATIONは、Managed Airflow(レガシー Gen 1)環境が配置されているリージョン。COMPOSER_1_SNAPSHOTS_FOLDERは、Managed Airflow(レガシー Gen 1)環境のバケットの URI。このガイドでは、スナップショットを Managed Airflow(レガシー Gen 1)環境のバケットに保存しますが、必要に応じて別のロケーションを選択することもできます。カスタム ロケーションを指定する場合は、 両方の環境のサービス アカウントに、指定したロケーションに対する読み取りと書き込みの権限が必要です。
ステップ 3: Managed Airflow(Gen 2)環境を作成する
Managed Airflow(Gen 2)環境を作成します。想定されるリソース需要を満たす環境プリセットから始めて、後で環境をさらにスケーリングして最適化できます。
構成のオーバーライドと環境変数は、Managed Airflow(レガシー Gen 1)環境のスナップショットを読み込むときに後で置き換えるため、指定する必要はありません。
ステップ 4: スナップショットを Managed Airflow(Gen 2)環境に読み込む
コンソール
スナップショットを Managed Airflow(Gen 2)環境に読み込むには:
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、Managed Airflow(Gen 2)環境の名前をクリックします。[環境の詳細] ページが開きます。
[スナップショットを読み込む] をクリックします。
[スナップショットを読み込む] ダイアログで、[参照] をクリックします。
スナップショットのあるフォルダを選択します。このガイドのデフォルトの場所を使用する場合、このフォルダは
/snapshotsフォルダの Managed Airflow(レガシー Gen 1)環境バケットにあり、名前はスナップショット保存オペレーションのタイムスタンプになります。例:us-central1-example-916807e1-bucket/snapshots_example-project_us-central1_example-environment/2022-01-05T18-59-00[読み込む] をクリックし、Managed Airflow によってスナップショットが読み込まれるまで待ちます。
gcloud
マネージド Airflow(レガシー Gen 1)環境のスナップショットをマネージド Airflow(Gen 2)環境に読み込みます。
gcloud composer environments snapshots load \
COMPOSER_2_ENV \
--location COMPOSER_2_LOCATION \
--snapshot-path "SNAPSHOT_PATH"
次のように置き換えます。
COMPOSER_2_ENVは、Managed Airflow(Gen 2)環境の名前に置き換えます。COMPOSER_2_LOCATIONは、Managed Airflow(Gen 2)環境が配置されているリージョン。SNAPSHOT_PATHを、Managed Airflow(以前の Gen 1)環境のバケットの URI の後にスナップショットのパスを付加したものに置き換えます。例:gs://us-central1-example-916807e1-bucket/snapshots/example-project_us-central1_example-environment_2022-01-05T18-59-00
ステップ 5: Managed Airflow(第 2 世代)環境で DAG の一時停止を解除する
次のいずれかのオプションを使用できます。
Airflow ウェブ インターフェースで、[DAG] に移動し、すべての DAG の一時停止を手動で 1 つずつ解除します。
composer_dags スクリプトを使用して、すべての DAG の一時停止を解除します。
python3 composer_dags.py --environment COMPOSER_2_ENV \ --project PROJECT_ID \ --location COMPOSER_2_LOCATION \ --operation unpause次のように置き換えます。
COMPOSER_2_ENVは、Managed Airflow(Gen 2)環境の名前に置き換えます。PROJECT_IDは、プロジェクト ID に置き換えます。COMPOSER_2_LOCATIONは、環境が配置されているリージョン。
(Airflow バージョン 2.9.1 以降)多数の DAG の一時停止を解除するときに割り当てエラーが発生した場合は、次の Airflow CLI コマンドを使用して、すべての DAG を一括で一時停止を解除できます。
gcloud composer environments run COMPOSER_2_ENV dags unpause \ --project PROJECT_ID \ --location COMPOSER_2_LOCATION \ -- -y --treat-dag-id-as-regex ".*"(Airflow バージョン 2.9.1 より前)多数の DAG を一時停止解除する際に割り当てエラーが発生した場合は、Airflow REST API を使用して DAG を一時停止解除できます。Airflow ドキュメントの API を試すもご覧ください。
ステップ 6: DAG エラーを確認する
Airflow ウェブ インターフェースで、[DAG] に移動し、報告された DAG 構文エラーを確認します。
DAG 実行が正しい時間にスケジュール設定されていることを確認します。
マネージド Airflow(Gen 2)環境で DAG の実行が発生するのを待ち、成功したかどうかを確認します。DAG の実行が成功した場合は、Managed Airflow(レガシー Gen 1)環境で一時停止を解除しないでください。解除すると、DAG が Managed Airflow(レガシー Gen 1)の環境で同じ日時に実行されます。
特定の DAG の実行が失敗した場合は、Managed Airflow(Gen 2)で正常に実行されるまで DAG の トラブルシューティングを行います 。
ステップ 7: Managed Airflow(第 2 世代)環境をモニタリングする
すべての DAG と構成をマネージド Airflow(Gen 2)環境に転送した後、潜在的な問題、失敗した DAG 実行、環境全体の健全性をモニタリングします。
Managed Airflow(Gen 2)環境が、十分な時間、問題なく動作している場合は、Managed Airflow(以前の Gen 1)環境の削除を検討してください。