Airflow スケジューラに関する問題のトラブルシューティング

Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(レガシー第 1 世代)

このページでは、Airflow スケジューラと DAG プロセッサに関する一般的な問題のトラブルシューティング手順と情報を提供します。

問題の原因を特定する

トラブルシューティングを開始するには、問題が発生するタイミングを特定します。

  • DAG 解析時間(Airflow DAG プロセッサが DAG を解析している間)
  • 実行時間(Airflow スケジューラが DAG を処理している間)

解析時間と実行時間について詳しくは、 DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG 処理の問題を調査する

  1. DAG プロセッサのログを調べます
  2. DAG の解析時間を確認します

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. コンソールで、[**環境**] ページに移動します。 Google Cloud

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。

  3. [Monitoring] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [Airflow タスク] グラフを調べて、問題があるかどうか確認します。Airflow タスクは、Airflow でキューに格納された状態のタスクであり、Celery または Kubernetes Executor のブローカー キューに移動できます。Celery キューに入れられたタスクは、Celery ブローカーのキューに入れられたタスク インスタンスです。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

タスクの数と時間の分布

Airflow では、多数の DAG またはタスクを同時にスケジュール設定すると問題が発生する可能性があります。スケジューリングの問題を回避するには、次の操作を行います。

  • より統合された少数のタスクを使用するように DAG を調整します。
  • DAG のスケジュール間隔を調整して、DAG 実行を時間的に均等に分散します。

Airflow 構成のスケーリング

Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、 オーバーライド環境の値をオーバーライドします。これらの値の一部は、DAG レベルまたはタスクレベルで設定することもできます。

  • ワーカーの同時実行

    [celery]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値を Managed Airflow 環境内の Airflow ワーカーの数で乗算すると、環境内で特定の時点で実行できるタスクの最大数が得られます。この数は、[core]parallelism Airflow 構成オプションによって制限されます。詳細は以下で説明します。

    Managed Airflow(第 3 世代)環境では、[celery]worker_concurrency のデフォルト値は、ワーカーが処理できる軽量の同時タスク インスタンスの数に基づいて自動的に計算されます。つまり、その値はワーカーのリソース上限に依存します。ワーカーの同時実行の値は、環境内のワーカー数に依存しません。

  • 最大の有効な DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG あたりのアクティブな DAG 実行の最大数を制御します。この上限に達すると、スケジューラは DAG 実行をそれ以上作成しなくなります。

    このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。

    この値は、max_active_runs パラメータを使用して DAG レベルで設定することもできます。

  • DAG あたりの最大アクティブ タスク数

    [core]max_active_tasks_per_dag Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。

    このパラメータが正しく設定されていない場合、特定の時点で実行できる DAG タスクの数が限られているため、単一の DAG インスタンスの実行が遅くなるという問題が発生する可能性があります。この場合は、この構成オプションの値を大きくします。

    この値は、max_active_tasks パラメータを使用して DAG レベルで設定することもできます。

    タスクレベルで max_active_tis_per_dag パラメータと max_active_tis_per_dagrun パラメータを使用して、 特定のタスク ID を持つインスタンスが DAG ごとに実行できる数と DAG 実行ごとに実行できる数を制御できます。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。

    これは、Airflow の設定全体に適用されるグローバル パラメータです。

    タスクはプール内でキューに保存され、実行されます。マネージド Airflow 環境では、1 つのプールのみが使用されます。このプールのサイズによって、特定の時間に実行するためにスケジューラによってキューに入れられるタスクの数が制御されます。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと Airflow ワーカーの数を掛けた値がまだ満たされていない [celery]worker_concurrency 構成オプションによって定義されたしきい値であっても、実行するタスクをキューに追加できません。

    プールサイズは Airflow UI ([管理] > [プール]) で構成できます。プールサイズを、環境で想定する並列処理のレベルに調整します。

    通常、[core]parallelism は、ワーカーの最大数と [celery]worker_concurrencyの積に設定されます。

実行中のタスクとキューに入れられたタスクに関する問題のトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

DAG 実行が実行されない

症状:

DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。

  • DAG 実行が常に未来で、いつまでも実行されません。

  • 過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。

詳細については、Apache Airflow のドキュメントをご覧ください。

解決策の提示

  • Apache Airflow のドキュメントの推奨事項に従ってください。

  • DAG に静的 start_date を設定します。必要に応じて、catchup=False を使用して過去の日付の DAG の実行を無効にできます。

  • このアプローチの副作用を認識していない限り、datetime.now() または days_ago(<number of days>) の使用は避けてください。

Airflow スケジューラの TimeTable 機能の使用

Managed Airflow(第 3 世代)では、DAG に実装されたタイムテーブルなど、Airflow スケジューラのカスタム プラグインはサポートされていません。プラグインは環境内のスケジューラと同期されません。

マネージド Airflow(第 3 世代)では、組み込みのタイムテーブルを引き続き使用できます。

メンテナンスの時間枠でタスクのスケジューリングを回避する

環境のメンテナンスの時間枠を定義して、DAG を実行する時間帯以外に環境のメンテナンスが行われるようにすることができます。一部のタスクの中断と再試行が許可されていれば、メンテナンスの時間枠で DAG を引き続き実行できます。メンテナンスの時間枠が環境に与える影響について詳しくは、 メンテナンスの時間枠を指定するをご覧ください。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功するには、このタスクのすぐにダウンストリーム にあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行が、前の DAG 実行のタスクの実行によって遅くなる可能性があります。詳しくは、 Airflow のドキュメントをご覧ください。

キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます

Airflow タスクがキューに長時間保持されると、スケジューラは、[scheduler]task_queued_timeout Airflow 構成オプションで設定された時間が経過した後に、実行のために再度スケジュールします。デフォルト値は 2400 です。

この状況の症状を確認する 1 つの方法は、キューに入れられたタスクの数を含むグラフ(Managed Airflow UI の [Monitoring] タブ)を確認し、このグラフのスパイクが約 2 時間以内に低下しない場合は、タスクが再スケジュールされる可能性が高く(ログなし)、スケジューラのログに「採用されたタスクは依然として保留中です」というログエントリが続きます。このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。

通常、この動作は想定されており、スケジュールされたタスクの次のインスタンスはスケジュールに従って実行されます。Managed Airflow 環境でこのようなケースが多数発生する場合は、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーが環境にない可能性があります。

解決策: この問題を解決するには、キューに入っているタスクを実行するための容量が Airflow ワーカーに常に確保されるようにします。たとえば、ワーカー数または worker_concurrency を増やすことができます。並列処理またはプールを調整して、容量を超えるタスクがキューに追加されないようにすることもできます。

min_file_process_interval パラメータに対する Managed Airflow のアプローチ

Managed Airflow では、Airflow スケジューラによる [scheduler]min_file_process_interval の使用方法が変更されます。

すべての DAG がスケジュールされると、Airflow スケジューラが再起動されます。[scheduler]num_runs パラメータ は、スケジューラが実行する回数を制御します。スケジューラが [scheduler]num_runs スケジューリング ループに達すると、再起動されます。スケジューラはステートレス コンポーネントであり、このような再起動は、スケジューラで発生する可能性のある問題に対する自動修復メカニズムです。[scheduler]num_runs のデフォルト値は 5000 です。

[scheduler]min_file_process_interval を使用して DAG 解析の頻度を構成できますが、このパラメータは、DAG のスケジュール時にスケジューラが [scheduler]num_runs ループを実行するのに必要な時間よりも長くすることはできません。

dagrun_timeout に達した後にタスクを失敗としてマークする

DAG の実行が dagrun_timeout(DAG パラメータ)内で完了しない場合、スケジューラは完了していない(実行中、スケジュールされた、キュー内の)タスクを 「失敗」としてマークします。

解決方法:

Airflow データベースの負荷が大きい場合の兆候

Airflow ワーカーのログに、次のような警告ログエントリが表示されることがあります。

psycopg2.OperationalError: connection to server at ... failed

このようなエラーや警告は、Airflow データベースが、オープン接続の数や、スケジューラやワーカー、トリガー、ウェブサーバーなどの他の Airflow コンポーネントによって同時に実行されるクエリの数に圧倒されている症状である場合があります。

解決策の提示

ウェブサーバーに「スケジューラが実行されていないようです」という警告が表示される

スケジューラは、ハートビートを定期的に Airflow データベースに報告します。この情報に基づいて、Airflow ウェブサーバーはスケジューラがアクティブかどうかを判断します。

スケジューラに負荷がかかっている場合、 ハートビートを 報告できなくなることがあります[scheduler]scheduler_heartbeat_sec

このような場合、Airflow ウェブサーバーに次のような警告が表示されることがあります。

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

解決策の提示

  • スケジューラのリソースの CPU とメモリを増やします。

  • DAG の解析とスケジューリングが高速になり、スケジューラ リソースを過剰に消費しないように DAG を最適化します。

  • Airflow DAG でグローバル変数を使用しないでください。代わりに、 環境変数Airflow 変数を使用します。

  • ウェブサーバーがスケジューラの利用不可を報告するまでの待機時間が長くなるように、 [scheduler]scheduler_health_check_threshold Airflow 構成オプションの値を大きくします。

DAG のバックフィル中に発生した問題の回避策

すでに実行されている DAG の再実行が必要になることもあります。 これは、Airflow CLI コマンドを使用して次の方法で行うことができます。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun-failed-tasks 引数も使用します。

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。
  • START_DATE は、start_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • END_DATE は、end_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • DAG_NAME は、DAG の名前に置き換えます。

バックフィルを実行すると、タスクがロックされているためにバックフィルができないデッドロックが発生することがあります。次に例を示します。

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

場合によっては、次の回避策を使用するとデッドロックを克服できます。

  • オーバーライドしてミニスケジューラを無効にします。[core]schedule_after_task_executionFalse

  • 期間を絞ってバックフィルを実行します。たとえば、START_DATEEND_DATE を設定して 1 日だけの期間を指定します。

次のステップ