Airflow DAG のテスト

Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)

DAG を本番環境にデプロイする前に、 Airflow CLI サブコマンドを実行して 、DAG を実行するコンテキストと同じコンテキストで DAG コードを解析できます。

Composer ローカル開発 CLI ツールを使用して DAG をローカルでテストする

Composer ローカル開発 CLI ツールは、Airflow 環境をローカルで実行することにより、マネージド Airflow(Gen 2)向けの Apache Airflow DAG 開発を合理化します。このローカルの Airflow 環境では、Managed Airflow(Gen 2)の特定のバージョンのイメージが使用されます。

このローカル Airflow 環境を使用して DAG を開発してテストし、DAG をテスト用 Managed Airflow 環境に移行できます。このガイドの残りの部分では、テスト用 Managed Airflow 環境で DAG をテストします。

DAG 作成中にテストする

タスク インスタンスを個別にローカルで実行してログ出力を表示できます。 出力を表示できるので、構文エラーとタスクエラーを確認できます。 ローカルでのテストでは、データベースとの依存関係や通信状況はチェックされません。

DAG は、テスト環境の data/test フォルダに配置することをおすすめします。

テスト ディレクトリを作成する

環境のバケット内にテスト ディレクトリを作成し、そのディレクトリに DAG をコピーします。

gcloud storage cp BUCKET_NAME/dags \
  BUCKET_NAME/data/test --recursive

以下を置き換えます。

  • BUCKET_NAME: Managed Airflow 環境に関連付けられているバケットの名前。

例:

gcloud storage cp gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test --recursive

DAG のアップロードの詳細については、DAG を追加および更新するをご覧ください。

構文エラーを確認する

/data/test フォルダにアップロードした DAG の構文エラーを確認するには、次の gcloud コマンドを入力します。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • ENVIRONMENT_LOCATION: 環境が配置されているリージョン。

タスクエラーを確認する

/data/test フォルダにアップロードした DAG でタスク固有のエラーを確認するには、次の gcloud コマンドを実行します。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

以下を置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • ENVIRONMENT_LOCATION: 環境が配置されているリージョン。
  • DAG_ID: DAG の ID。
  • TASK_ID: タスクの ID。
  • DAG_EXECUTION_DATE: DAG の実行日。この日付はテンプレートとして使用されます。ここで指定した日付に関係なく、DAG はすぐに実行されます。

例:

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

デプロイされた DAG の更新とテスト

テスト環境で DAG の更新をテストするには、次の手順を行います。

  1. 更新する対象のデプロイ済み DAG を data/test にコピーします。
  2. DAG を更新します。
  3. DAG をテストします。
    1. 構文エラーを確認します
    2. タスク固有のエラーを確認します
  4. DAG が正常に実行されることを確認してください。
  5. テスト環境で DAG をオフにします。
    1. [Airflow UI] > [DAG] ページに移動します。
    2. 変更中の DAG が常に実行されている場合は、DAG をオフにします。
    3. 未処理のタスクをすばやく実行するには、タスクをクリックしてから、Mark Success をクリックします。
  6. DAG を本番環境にデプロイします。
    1. 本番環境で DAG をオフにします。
    2. 本番環境の dags/ フォルダに、更新された DAG をアップロードします

DAG のテストに関するよくある質問

本番環境とテスト環境で DAG 実行を分離するにはどうすればよいですか?

たとえば、DAG のすべての実行で Airflow が共通して使用する dags/ フォルダに、ソースコードのグローバル リポジトリがあるとします。DAG の実行を妨げずに、本番環境またはテスト環境のソースコードを更新する必要があります。

Airflow では強力な DAG 分離は提供されていません。テスト環境 DAG が本番環境 DAG と干渉しないように、本番用とテスト用の Managed Airflow 環境を別々に保持することをおすすめします。

異なる GitHub ブランチから統合テストを実行するときに DAG の干渉を回避するにはどうすればよいですか?

干渉を防ぐには一意のタスク名を使用します。たとえば、タスク ID の前にブランチ名を付けます。

Airflow との統合テストのベスト プラクティスは何ですか?

Airflow との統合テストには専用の環境を使用することをおすすめします。DAG の実行成功を保証するために、Cloud Storage フォルダ内のファイルに書き込みを行った後、独自の統合テストケースに沿ってその内容を確認することができます。

他の DAG コントリビューターと効率的なコラボレーションをするにはどうすればよいですか?

開発のために、コントリビューターごとに data/ フォルダにサブディレクトリを用意できます。

data/ フォルダに追加された DAG は、Airflow スケジューラまたはウェブサーバーによって自動的に取得されません

手動の DAG 実行を作成するには、gcloud composer environments run コマンドと test サブコマンドを使用します。その際、コントリビューターの開発ディレクトリは --subdir フラグで指定します。

次に例を示します。

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

デプロイ環境と本番環境を同期するにはどうすればよいですか?

アクセスを管理するには、次のようにします。

開発環境から本番環境にデプロイするには次のようにします。

  • 環境変数や PyPI パッケージなどの構成の整合性を確保します。

  • DAG 引数の整合性を確保します。ハードコードを避けるために、Airflow のマクロと変数を使用することをおすすめします。

    次に例を示します。

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

次のステップ