Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Airflow スケジューラと DAG プロセッサに関する一般的な問題に関する情報とトラブルシューティングの手順について説明します。
問題の原因を特定する
トラブルシューティングを開始するには、問題の発生が次のいずれであるかを特定します。
- DAG 解析時(Airflow DAG プロセッサによって DAG が解析されている間)
- 実行時(Airflow スケジューラで DAG が処理されている間)
解析時間と実行時間について詳しくは、DAG 解析時間と DAG 実行時間の違いをご覧ください。
DAG 処理の問題を調べる
実行中のタスクとキューに入れられたタスクのモニタリング
キュー内でタスクが滞留しているかどうかを確認するには、次の手順に従います。
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。 [環境の詳細] ページが開きます。
[Monitoring] タブに移動します。
[モニタリング] タブで、[DAG 実行] セクションの [Airflow タスク] グラフを調べて、問題があるかどうか確認します。Airflow タスクは、Airflow でキューに格納された状態のタスクであり、Celery または Kubernetes Executor のブローカー キューに移動できます。Celery キューに入れられたタスクは、Celery ブローカーのキューに入れられたタスク インスタンスです。
DAG 解析時間の問題のトラブルシューティング
以下のセクションでは、DAG 解析時間の一般的な問題の症状と想定される修正方法について説明します。
タスクの数と時間の分布
Airflow では、多数の DAG やタスクを同時にスケジュール設定すると問題が発生する可能性があります。スケジューリングの問題を回避するには、次の操作を行います。
- より少数の統合されたタスクを使用するように DAG を調整します。
- DAG のスケジュール間隔を調整して、DAG 実行を時間経過とともに均等に分散します。
Airflow 構成のスケーリング
Airflow には、Airflow が同時に実行できるタスクと DAG の数を制御する Airflow 構成オプションが用意されています。これらの構成オプションを設定するには、環境に合わせて値をオーバーライドします。これらの値の一部は、DAG レベルまたはタスクレベルで設定することもできます。
-
[celery]worker_concurrencyパラメータは、Airflow ワーカーが同時に実行できるタスクの最大数を制御します。このパラメータの値に Cloud Composer 環境の Airflow ワーカー数を掛けると、環境内の特定の時点で実行できるタスクの最大数が取得されます。この数は、[core]parallelismAirflow 構成オプションによって制限されます。詳細は以下で説明します。Cloud Composer 3 環境では、ワーカーが対応できる軽量な同時実行タスク インスタンスの数に基づいて、
[celery]worker_concurrencyのデフォルト値が自動的に計算されます。つまり、この値はワーカー リソースの上限に依存します。ワーカーの同時実行の値は、環境内のワーカー数に依存しません。 -
[core]max_active_runs_per_dagAirflow 構成オプションは、DAG あたりのアクティブな DAG 実行の最大数を制御します。この上限に達すると、スケジューラは DAG 実行をそれ以上作成しなくなります。このパラメータが正しく設定されていない場合、スケジューラが DAG 実行を抑制するという問題が発生する可能性があります。これはスケジューラが特定の時点で DAG 実行インスタンスを作成できなくなるためです。
この値は、
max_active_runsパラメータを使用して DAG レベルで設定することもできます。 -
[core]max_active_tasks_per_dagAirflow 構成オプションは、各 DAG で同時に実行できるタスク インスタンスの最大数を制御します。このパラメータが正しく設定されていない場合、1 つの DAG インスタンスの実行速度が遅い問題が発生する可能性があります。これは、ある瞬間に実行できる DAG タスクの数には限りがあるからです。この場合は、この構成オプションの値を増やすことができます。
この値は、
max_active_tasksパラメータを使用して DAG レベルで設定することもできます。タスクレベルで
max_active_tis_per_dagパラメータとmax_active_tis_per_dagrunパラメータを使用すると、特定のタスク ID を持つインスタンスが DAG ごと、DAG 実行ごとに実行できる数を制御できます。 並列処理とプールサイズ
[core]parallelismAirflow 構成オプションは、これらのタスクのすべての依存関係が満たされた後に、Airflow スケジューラが Executor のキュー内にキューに入れるタスクの数を制御します。これは、Airflow の設定全体に適用されるグローバル パラメータです。
タスクはプール内でキューに保存され、実行されます。Cloud Composer 環境では、1 つのプールのみが使用されます。このプールのサイズによって、特定の時間に実行するためにスケジューラによってキューに入れられるタスクの数が制御されます。プールサイズが小さすぎると、スケジューラは、
[core]parallelism構成オプションと Airflow ワーカーの数を掛けた値がまだ満たされていない[celery]worker_concurrency構成オプションによって定義されたしきい値であっても、実行するタスクをキューに追加できません。プールサイズは Airflow UI([メニュー] > [管理] > [プール])で構成できます。プールサイズを、環境で想定する並列処理のレベルに調整します。
通常、
[core]parallelismは、ワーカーの最大数と[celery]worker_concurrencyの積が設定されます。
実行中のタスクとキューに入れられたタスクに関する問題のトラブルシューティング
以降のセクションでは、実行中のタスクとキューに入れられたタスクの一般的な問題の症状と想定される修正方法について説明します。
DAG 実行が実行されない
症状:
DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。
DAG 実行が常に未来で、いつまでも実行されません。
過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。
詳細については、Apache Airflow のドキュメントをご覧ください。
解決策の提示
Apache Airflow のドキュメントの推奨事項に従ってください。
DAG に静的
start_dateを設定します。必要に応じて、catchup=Falseを使用して過去の日付の DAG の実行を無効にできます。このアプローチの副作用を認識していない限り、
datetime.now()またはdays_ago(<number of days>)の使用は避けてください。
Airflow スケジューラの TimeTable 機能の使用
Cloud Composer 3 は、DAG に実装されたタイムテーブルなど、Airflow スケジューラ用のカスタム プラグインをサポートしていません。プラグインは、環境内のスケジューラと同期されません。
Cloud Composer 3 では、組み込みのタイムテーブルを引き続き使用できます。
メンテナンスの時間枠でタスクのスケジューリングを回避する
環境のメンテナンスの時間枠を定義して、DAG の実行時間外に環境のメンテナンスが行われるようにすることができます。一部のタスクの中断と再試行が許可されていれば、メンテナンスの時間枠で DAG を引き続き実行できます。メンテナンス時間枠が環境に与える影響について詳しくは、メンテナンス時間枠を指定するをご覧ください。
DAG での「wait_for_downstream」の使用
DAG で wait_for_downstream パラメータを True に設定した場合、タスクが成功するには、このタスクのすぐにダウンストリームにあるすべてのタスクも成功する必要があります。つまり、特定の DAG 実行に属するタスクの実行が、前の DAG 実行のタスクの実行によって遅延させられる可能性があります。詳しくは、Airflow のドキュメントをご覧ください。
キューに長く入りすぎていたタスクがキャンセルされ、再スケジュールされます
Airflow タスクがキューに長時間保持されると、スケジューラは [scheduler]task_queued_timeout Airflow 構成オプションで設定された時間が経過した後に、実行のためにもう一度再スケジュールします。デフォルト値は 2400 です。
この状況の症状を監視する 1 つの方法は、キューに入れられたタスクの数をグラフで確認することです(Cloud Composer UI の [モニタリング] タブ)。このグラフのスパイクが約 2 時間以内に低下しない場合は、タスクが再スケジュールされ(ログなし)、スケジューラ ログに「Adopted tasks were still pending ...」というログエントリが記録される可能性が高くなります。このような場合、タスクが実行されなかったことから、Airflow タスクログに「ログファイルが見つかりません...」というメッセージが表示されることがあります。
通常、この動作は想定されており、スケジュールされたタスクの次のインスタンスはスケジュールに従って実行されます。Cloud Composer 環境でこのようなケースが多数発生している場合は、スケジュールされたすべてのタスクを処理するのに十分な Airflow ワーカーが環境内にないことを意味している可能性があります。
解決策: この問題を解決するには、キューに入っているタスクを実行するための容量が Airflow ワーカーに常に確保されるようにします。たとえば、ワーカー数または worker_concurrency を増やすことができます。並列処理またはプールを調整して、容量を超えるタスクがキューに追加されないようにすることもできます。
min_file_process_interval パラメータに対する Cloud Composer のアプローチ
Cloud Composer では、Airflow スケジューラによる [scheduler]min_file_process_interval の使用方法が変更されます。
すべての DAG が特定の回数スケジュールされた後、Airflow スケジューラが再起動されます。[scheduler]num_runs パラメータは、スケジューラによって実行される回数を制御します。スケジューラが [scheduler]num_runs スケジューリング ループに達すると、再起動します。スケジューラはステートレス コンポーネントであり、このような再起動は、スケジューラで発生する可能性がある問題に対する自動修復メカニズムです。[scheduler]num_runs のデフォルト値は 5000 です。
[scheduler]min_file_process_interval を使用して、DAG 解析の実行頻度を構成できますが、このパラメータは、DAG のスケジューリング時にスケジューラが [scheduler]num_runs ループを実行するのに必要な時間より長く設定することはできません。
dagrun_timeout に達した後にタスクを失敗としてマークする
DAG の実行が dagrun_timeout(DAG パラメータ)内で完了しない場合、スケジューラは完了していない(実行中、スケジュールされた、キュー内の)タスクを「失敗」としてマークします。
解決方法:
タイムアウトを満たすように
dagrun_timeoutを延長します。ワーカー数を増やすかワーカー パフォーマンス パラメータを増加させて、DAG が迅速に実行されるようにします。
Airflow データベースの負荷が大きい場合の兆候
Airflow ワーカーのログに次のような警告ログエントリが表示されることがあります。
psycopg2.OperationalError: connection to server at ... failed
このようなエラーや警告は、Airflow データベースが、オープン接続の数や、スケジューラやワーカー、トリガー、ウェブサーバーなどの他の Airflow コンポーネントによって同時に実行されるクエリの数に圧倒されている症状である場合があります。
解決策の提示
環境のサイズを調整して、Airflow データベースをスケールアップします。
スケジューラと DAG プロセッサの数を減らします。1 つのスケジューラと 1 つの DAG プロセッサから始め、これらのコンポーネントがリソース上限に近づいていることがわかったら、スケジューラまたは DAG プロセッサの数を増やします。
Airflow DAG でグローバル変数を使用しないようにします。代わりに、環境変数と Airflow 変数を使用します。
[scheduler]scheduler_heartbeat_secを高い値(15 秒以上など)に設定します。[scheduler]job_heartbeat_secを高い値(30 秒以上など)に設定します。[scheduler]scheduler_health_check_thresholdを、[scheduler]job_heartbeat_secに4を掛けた値に設定します。
ウェブサーバーに「スケジューラが実行されていないようです」という警告が表示される
スケジューラは、ハートビートを定期的に Airflow データベースに報告します。この情報に基づいて、Airflow ウェブサーバーはスケジューラがアクティブかどうかを判断します。
スケジューラに負荷がかかっている場合、[scheduler]scheduler_heartbeat_sec ごとにハートビートを報告できなくなることがあります。
このような場合、Airflow ウェブサーバーに次のような警告が表示されることがあります。
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
解決策の提示
スケジューラのリソースの CPU とメモリを増やします。
DAG の解析とスケジュール設定が高速になってスケジューラのリソースを消費しすぎないように DAG を最適化します。
Airflow DAG でグローバル変数を使用しないようにします。代わりに、環境変数と Airflow 変数を使用します。
[scheduler]scheduler_health_check_thresholdAirflow 構成オプションの値を大きくして、ウェブサーバーがスケジューラの使用不能を報告するまでの待機時間を長く取るようにします。
DAG のバックフィル中に発生した問題の回避策
すでに実行されている DAG の再実行が必要になることもあります。これは、Airflow CLI コマンドを使用して次の方法で行うことができます。
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
特定の DAG で失敗したタスクのみを再実行するには、--rerun-failed-tasks 引数も使用します。
以下のように置き換えます。
ENVIRONMENT_NAMEを環境の名前に置き換えます。LOCATIONは、環境が配置されているリージョン。START_DATEは、start_dateDAG パラメータの値(YYYY-MM-DD形式)に置き換えます。END_DATEは、end_dateDAG パラメータの値(YYYY-MM-DD形式)に置き換えます。DAG_NAMEは、DAG の名前に置き換えます。
バックフィルを実行すると、タスクがロックされているためにバックフィルができないデッドロックが発生することがあります。次に例を示します。
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
場合によっては、次の回避策を使用するとデッドロックを克服できます。
[core]schedule_after_task_executionをFalseにオーバーライドして、ミニスケジューラを無効にします。期間を絞ってバックフィルを実行します。たとえば、
START_DATEとEND_DATEを設定して 1 日だけの期間を指定します。