Airflow スケジューラに関する問題のトラブルシューティング

Managed Airflow(Gen 3) | Managed Airflow(Gen 2) | Managed Airflow(レガシー Gen 1)

このページでは、Airflow スケジューラと DAG プロセッサに関する一般的な問題のトラブルシューティング手順と情報を提供します。

問題の原因を特定する

トラブルシューティングを開始するには、問題が発生するタイミングを特定します。

  • DAG 解析時間(Airflow DAG プロセッサが DAG を解析している間)
  • 実行時間(Airflow スケジューラが DAG を処理している間)

解析時間と実行時間について詳しくは、 DAG 解析時間と DAG 実行時間の違いをご覧ください。

DAG 処理の問題を調査する

  1. DAG プロセッサのログを調べます
  2. DAG の解析時間を調べます

実行中のタスクとキューに入れられたタスクのモニタリング

キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。

  1. コンソールで、[**環境**] ページに移動します。 Google Cloud

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。

  3. [Monitoring] タブに移動します。

  4. [モニタリング] タブで、[DAG 実行] セクションの [Airflow タスク] グラフを調べて、問題があるかどうか確認します。Airflow タスクは、Airflow でキューに格納された状態のタスクであり、Celery または Kubernetes Executor のブローカー キューに移動できます。Celery キューに入れられたタスクは、Celery ブローカーのキューに入れられたタスク インスタンスです。

DAG 解析時間の問題のトラブルシューティング

以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。

タスクの数と時間の分布

Airflow では、多数の DAG やタスクを同時にスケジュール設定すると問題が発生する可能性があります。スケジューリングに関する問題を回避するには、次の操作を行います。

  • 少数の統合されたタスクを使用するように DAG を調整します。
  • DAG のスケジュール間隔を調整して、DAG の実行を時間的に均等に分散します。

Airflow 構成のスケーリング

Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、 値をオーバーライドします。これらの値の一部は、DAG レベルまたはタスクレベルで設定することもできます。

  • ワーカーの同時実行

    [celery]worker_concurrency パラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Managed Airflow 環境の Airflow ワーカー数を掛けると、環境内の特定の時点で実行できるタスクの最大数が取得されます。この数は、[core]parallelism Airflow 構成オプションによって制限されます。詳細は以下で説明します。

    Managed Airflow(Gen 3)環境では、[celery]worker_concurrency のデフォルト値は、ワーカーが処理できる軽量の同時タスク インスタンスの数に基づいて自動的に計算されます。つまり、その値はワーカーのリソース上限に依存します。ワーカーの同時実行の値は、環境内のワーカー数に依存しません。

  • 最大の有効な DAG 実行数

    [core]max_active_runs_per_dag Airflow 構成オプションは、DAG あたりのアクティブな DAG 実行の最大数を制御します。この上限に達すると、スケジューラは DAG 実行をそれ以上作成しなくなります。

    このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。

    この値は、max_active_runs パラメータを使用して DAG レベルで設定することもできます。

  • DAG あたりの最大アクティブ タスク数

    [core]max_active_tasks_per_dag Airflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。

    このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅い問題が発生する可能性があります。これは、ある瞬間に実行できる DAG タスクの数には限りがあるからです。この場合は、この構成オプションの値を増やすことができます。

    この値は、max_active_tasks パラメータを使用して DAG レベルで設定することもできます。

    タスクレベルで max_active_tis_per_dag パラメータと max_active_tis_per_dagrun パラメータを使用すると、 特定のタスク ID を持つインスタンスが DAG ごとおよび DAG 実行ごとに実行できる数を制御できます。

  • 並列処理とプールサイズ

    [core]parallelism Airflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。

    これは、Airflow の設定全体に適用されるグローバル パラメータです。

    タスクはプール内でキューに保存され、実行されます。マネージド Airflow 環境では、1 つのプールのみが使用されます。このプールのサイズによって、特定の時間に実行するためにスケジューラによってキューに入れられるタスクの数が制御されます。プールサイズが小さすぎると、スケジューラは、[core]parallelism 構成オプションと Airflow ワーカーの数を掛けた値がまだ満たされていない [celery]worker_concurrency 構成オプションによって定義されたしきい値であっても、実行するタスクをキューに追加できません。

    プールサイズは Airflow UI ([管理] > [プール]) で構成できます。プールサイズを、環境で想定する並列処理のレベルに調整します。

    通常、[core]parallelism は、ワーカーの最大数 と [celery]worker_concurrency の積として設定されます。

実行中のタスクとキューに入れられたタスクに関する問題のトラブルシューティング

以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。

DAG 実行が実行されない

症状:

DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。

  • DAG 実行が常に未来で、いつまでも実行されません。

  • 過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。

詳細については、Apache Airflow のドキュメントをご覧ください。

解決策の提示

  • Apache Airflow のドキュメントの推奨事項に従ってください。

  • DAG に静的 start_date を設定します。必要に応じて、catchup=False を使用して過去の日付の DAG の実行を無効にできます。

  • このアプローチの副作用を認識していない限り、datetime.now() または days_ago(<number of days>) の使用は避けてください。

Airflow スケジューラの TimeTable 機能の使用

時刻表は Airflow 2.2 以降で利用できます。

次のいずれかの方法で DAG の時刻表を定義できます。

組み込みの時刻表を使用することもできます

メンテナンスの時間枠でタスクのスケジューリングを回避する

環境のメンテナンスが DAG を実行する時間外に行われるように、環境のメンテナンスの時間枠を定義できます。一部のタスクの中断と再試行が許可されていれば、メンテナンスの時間枠で DAG を引き続き実行できます。メンテナンスの時間枠が環境に与える影響について詳しくは、 メンテナンスの時間枠を指定するをご覧ください。

DAG での「wait_for_downstream」の使用

DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功するには、このタスクのすぐにダウンストリーム にあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行が、前の DAG 実行のタスクの実行によって遅延させられる可能性があります。詳しくは、 Airflow のドキュメントをご覧ください。

キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます

Airflow タスクがキューに長時間保持されると、スケジューラは [scheduler]task_queued_timeout Airflow 構成オプションで設定された時間が経過した後、実行するためにもう一度再スケジュールします。デフォルト値は 2400 です。 2.3.1 より前の Airflow バージョンでも、タスクは失敗としてマークされ、適格な場合は再試行されます。

この状況の症状を確認する 1 つの方法は、キューに入れられたタスクの数を含むグラフ(Managed Airflow UI の [Monitoring] タブ)を確認し、このグラフのスパイクが約 2 時間以内に低下しない場合は、タスクが再スケジュールされる可能性が高く(ログなし)、スケジューラのログに「採用されたタスクは依然として保留中です」というログエントリが続きます。このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。

通常、この動作は想定されており、スケジュールされたタスクの次のインスタンスはスケジュールに従って実行されます。Managed Airflow 環境でこのようなケースが多数発生している場合は、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーが環境内にないことを意味している可能性があります。

解決策: この問題を解決するには、キューに入っているタスクを実行するための容量が Airflow ワーカーに常に確保されるようにします。たとえば、ワーカー数または worker_concurrency を増やすことができます。並列処理またはプールを調整して、容量を超えるタスクがキューに追加されないようにすることもできます。

キュー内で滞留しているタスクが特定の DAG の実行をブロックする可能性がある

この問題を解決するには、環境を Managed Airflow バージョン 2.1.12 以降にアップグレードします。

多くの場合、Airflow スケジューラは、キューにタスクがあり、なんらかの理由でこれらのタスクを正常に実行できない状況(これらのタスクが属する DAG が削除された場合など)に対応できます。

これらのタスクがスケジューラによって消去されない場合は、手動で削除する必要があります。これを行うには、Airflow UI で [メニュー] [>] [ブラウザ] [>] [タスク インスタンス] の順に移動し、キューに入れられたタスクを削除します。

min_file_process_interval パラメータに対する Managed Airflow のアプローチ

Managed Airflow では、Airflow スケジューラによる [scheduler]min_file_process_interval の使用方法が変更されます。

2.0.26 より前の Managed Airflow バージョンでは、[scheduler]min_file_process_interval は無視されます。

2.0.26 より後の Managed Airflow バージョンでは、次のようになります。

すべての DAG が特定の回数スケジュールされた後、Airflow スケジューラが再起動さます。[scheduler]num_runs パラメータは、スケジューラによって実行される回数を制御します。スケジューラが [scheduler]num_runs スケジューリング ループに達すると、再起動します。スケジューラはステートレス コンポーネントであり、このような再起動は、スケジューラで発生する可能性がある問題に対する自動修復メカニズムです。[scheduler]num_runs のデフォルト値は 5000 です。

[scheduler]min_file_process_interval を使用して、DAG 解析の実行頻度を構成できますが、このパラメータは、DAG のスケジューリング時にスケジューラが [scheduler]num_runs ループを実行するのに必要な時間より長く設定することはできません。

dagrun_timeout に達した後にタスクを失敗としてマークする

DAG の実行が dagrun_timeout(DAG パラメータ)内で完了しない場合、スケジューラは完了していない(実行中、スケジュールされた、キュー内の)タスクを 「失敗」としてマークします。

解決方法:

Airflow データベースの負荷が大きい場合の兆候

Airflow スケジューラのログに次のような警告ログエントリが表示される場合があります。

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow ワーカーのログでも同様の症状が確認される場合があります。

MySQL の場合:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

PostgreSQL の場合:

psycopg2.OperationalError: connection to server at ... failed

このようなエラーや警告は、Airflow データベースが、オープン接続の数や、スケジューラやワーカー、トリガー、ウェブサーバーなどの他の Airflow コンポーネントによって同時に実行されるクエリの数に圧倒されている症状である場合があります。

解決策の提示

ウェブサーバーに「スケジューラが実行されていないようです」という警告が表示される

スケジューラは、ハートビートを定期的に Airflow データベースに報告します。この情報に基づいて、Airflow ウェブサーバーはスケジューラがアクティブかどうかを判断します。

スケジューラに負荷がかかっている場合、 ハートビートを [scheduler]scheduler_heartbeat_secごとに報告できなくなることがあります。

このような場合、Airflow ウェブサーバーに次のような警告が表示されることがあります。

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

解決策の提示

  • スケジューラのリソースの CPU とメモリを増やします。

  • DAG の解析とスケジュール設定が高速になってスケジューラのリソースを消費しすぎないように DAG を最適化します。

  • Airflow DAG でグローバル変数を使用しないようにします。代わりに、 環境変数Airflow 変数を使用します。

  • [scheduler]scheduler_health_check_threshold Airflow 構成オプションの値を大きくして、ウェブサーバーがスケジューラの使用不能を報告するまでの待機時間を長く取るようにします。

DAG のバックフィル中に発生した問題の回避策

すでに実行されている DAG の再実行が必要になることもあります。 これは、Airflow CLI コマンドを使用して次の方法で行うことができます。

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

特定の DAG で失敗したタスクのみを再実行するには、--rerun-failed-tasks 引数も使用します。

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前に置き換えます。
  • LOCATION は、環境が配置されているリージョン。
  • START_DATE は、start_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • END_DATE は、end_date DAG パラメータの値(YYYY-MM-DD 形式)に置き換えます。
  • DAG_NAME は、DAG の名前に置き換えます。

バックフィルを実行すると、タスクがロックされているためにバックフィルができないデッドロックが発生することがあります。次に例を示します。

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

場合によっては、次の回避策を使用するとデッドロックを克服できます。

  • オーバーライドしてミニスケジューラを無効にします。[core]schedule_after_task_executionFalse

  • 期間を絞ってバックフィルを実行します。たとえば、START_DATEEND_DATE を設定して 1 日だけの期間を指定します。

次のステップ