Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
本页面介绍 Airflow 调度器和 DAG 处理器常见问题和问题排查步骤和信息。
确定问题的来源
如需开始问题排查,请确定问题是否发生在以下时间:
- DAG 解析时,Airflow DAG 处理器解析 DAG 时
- 执行时,Airflow 调度器处理 DAG 时
如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异。
检查 DAG 处理问题
监控正在运行和已加入队列的任务
如需检查是否有任务卡在队列中,请执行以下步骤。
在 Google Cloud 控制台中,前往环境 页面。
在环境列表中,点击您的环境名称。 环境详情 页面会打开。
转到监控 标签页。
在监控 标签页中,查看 Airflow 任务 图表 在 DAG 运行 部分中,以确定潜在问题。Airflow 任务 是指在 Airflow 中处于排队状态的任务,它们可以进入 Celery 或 Kubernetes 执行程序代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务 实例数。
DAG 解析时排查问题
以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。
任务的数量和时间分布
如果 Airflow 同时调度大量 DAG 或任务,可能会出现问题。为避免调度问题,您可以:
- 调整 DAG,以使用较少的整合任务。
- 调整 DAG 的调度间隔,以便更均匀地分布 DAG 运行。
扩缩 Airflow 配置
Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项, 请替换环境的值。您还可以在 DAG 或任务级层设置其中一些值。
-
[celery]worker_concurrency参数控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Managed Airflow 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受[core]parallelismAirflow 配置选项的限制,之后将进一步说明此内容。在 Managed Airflow(第 3 代)环境中,
[celery]worker_concurrency的默认值会自动计算,具体取决于工作器可以容纳的轻量级并发任务实例数。这意味着其值取决于工作器资源限制。工作器并发值取决于环境中的工作器数量。 -
[core]max_active_runs_per_dagAirflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。
您还可以使用
max_active_runs参数在 DAG 级层设置此值。 -
[core]max_active_tasks_per_dagAirflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。在这种情况下,您可以增加此配置选项的值。
您还可以使用
max_active_tasks参数在 DAG 级层设置此值。您可以在任务级层使用
max_active_tis_per_dag和max_active_tis_per_dagrun参数来控制每个 DAG 和每个 DAG 运行中允许运行的具有 特定任务 ID 的实例数。 最大并行数量和池大小
[core]parallelismAirflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。这是整个 Airflow 设置的全局参数。
任务会排入队列并在池中执行。Managed Airflow 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值(
[core]parallelism配置选项和[celery]worker_concurrency配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。您可以在 Airflow 界面中配置池大小 (管理员 > 池)。将池大小调整为您的环境中预期的最大并行数量。
通常,
[core]parallelism设置为工作器数量上限 与[celery]worker_concurrency的乘积。
在正在运行和已加入队列的任务中排查问题
以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。
DAG 运行未执行
具体情况:
当 DAG 的调度日期是动态设置时,可能会导致各种意外的副作用。例如:
DAG 执行始终在未来,并且 DAG 从未执行。
过去的 DAG 运行被标记为已执行且成功,但实际上并未执行。
如需了解详情,请参阅 Apache Airflow 文档。
可能提供的解决方案:
遵循 Apache Airflow 文档中的建议。
为 DAG 设置静态
start_date。或者,您可以使用catchup=False停用针对过去日期的 DAG 运行。除非您了解此方法的副作用,否则请避免使用
datetime.now()或days_ago(<number of days>)。
使用 Airflow 调度器的 TimeTable 功能
Managed Airflow(第 3 代)不支持 Airflow 调度器的自定义插件,包括在 DAG 中实现的时间表。插件不会同步到环境中的调度器。
您仍然可以在内置时间表中 Managed Airflow(第 3 代)。
避免在维护窗口内进行任务调度
您可以为环境定义维护窗口,以便在您运行 DAG 之外的时间进行环境维护。您仍然可以在维护窗口期间运行 DAG,只要可以接受某些任务可能会中断并重试即可。如需详细了解 维护窗口对环境的影响,请参阅 指定维护窗口。
在 DAG 中使用“wait_for_downstream”
如果您将 DAG 中的 wait_for_downstream 参数设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行 任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅
Airflow 文档。
排队时间过长的任务将被取消并重新安排
如果 Airflow 任务在队列中保留的时间过长,则调度器会在 [scheduler]task_queued_timeout Airflow 配置选项中设置的时间过后,再次重新安排它以执行。默认值为 2400。
观察此情况的问题的一种方法是查看包含已加入队列的任务数的图表(Managed Airflow 界面中的“监控”标签页),如果此图表中的高峰在大约两小时内未下降,则任务很可能会被重新安排(无日志),后跟调度器日志中“Adopted tasks were still pending ...”日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。
一般来说,预期此行为,并且计划任务的下一个实例应该根据时间表执行。如果您在 Managed Airflow 环境中观察到许多此类情况,则可能表示您的环境中没有足够的 Airflow 工作器来处理所有计划任务。
解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或提高工作器并发 (worker_concurrency) 值。您还可以调整并行性或池,以防止队列任务超出您具有的容量。
Managed Airflow 方法 min_file_process_interval 参数
Managed Airflow 更改了 Airflow 调度器使用
[scheduler]min_file_process_interval
的方式。
在所有 DAG 都被调度一定次数后,Airflow 调度器会重启,并且 [scheduler]num_runs 参数控制调度器执行此操作的次数。当调度器达到 [scheduler]num_runs 调度循环时,它会重启。调度器是一个无状态组件,此类重启是调度器可能遇到的任何问题的自动修复机制。[scheduler]num_runs 的默认值为 5000。
[scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不能长于调度器在调度 DAG 时执行 [scheduler]num_runs 循环所需的时间。
在达到 dagrun_timeout 后将任务标记为失败
如果 DAG 运行未在
dagrun_timeout(DAG 参数)内完成,则调度器会将未完成(正在运行、已调度和已加入队列)
的任务标记为失败。
解决方案:
延长
dagrun_timeout以满足超时。
Airflow 数据库负载过重的症状
有时,您可能会在 Airflow 工作器日志中看到以下警告日志条目:
psycopg2.OperationalError: connection to server at ... failed
此类错误或警告可能是 Airflow 数据库因打开的连接数或在同一时间执行的查询数过多而过载的症状,这些连接或查询可能是由调度器或其他 Airflow 组件(如工作器、触发器和网络服务器)执行的。
可能提供的解决方案:
减少调度器和 DAG 处理器的数量。从一个调度器和一个 DAG 处理器开始,然后如果您发现这些组件接近其资源限制,则增加调度器或 DAG 处理器的数量。
避免在 Airflow DAG 中使用全局变量。请改用 环境变量和 Airflow 变量。
将
[scheduler]scheduler_heartbeat_sec设置为更高的值,例如 15 秒或更长时间。将
[scheduler]job_heartbeat_sec设置为更高的值,例如 30 秒或更长时间。将
[scheduler]scheduler_health_check_threshold设置为等于[scheduler]job_heartbeat_sec乘以4的值。
网络服务器显示“调度器似乎未运行”警告
调度器会定期向 Airflow 数据库报告其检测信号。根据此信息,Airflow Web 服务器会确定调度器是否处于活跃状态。
有时,如果调度器负载过重,则可能无法
每隔
[scheduler]scheduler_heartbeat_sec报告其检测信号。
在这种情况下,Airflow Web 服务器可能会显示以下警告:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
可能提供的解决方案:
增加调度器的 CPU 和内存资源。
优化 DAG,使其解析和调度速度更快,并且不会消耗过多的调度器资源。
避免在 Airflow DAG 中使用全局变量。请改用 环境变量和 Airflow 变量。
增加
[scheduler]scheduler_health_check_thresholdAirflow 配置选项的值,以便 Web 服务器等待更长时间,然后再 报告调度器不可用。
针对回填 DAG 期间遇到的问题的解决方法
有时,您可能需要重新运行已执行的 DAG。 您可以使用 Airflow CLI 命令执行此操作,如下所示:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如需仅重新运行特定 DAG 的失败任务,请同时使用 --rerun-failed-tasks 实参。
替换:
ENVIRONMENT_NAME替换为环境的名称。LOCATION替换为环境所在的区域。START_DATE替换为start_dateDAG 参数的值,格式为YYYY-MM-DD。END_DATE替换为end_dateDAG 参数的值,格式为YYYY-MM-DD。DAG_NAME替换为 DAG 名称。
回填操作有时可能会产生死锁情况,即由于任务被锁定而无法进行回填。例如:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
在某些情况下,您可以使用以下解决方法来克服死锁:
通过 将
[core]schedule_after_task_execution替换为False来停用迷你调度器。针对较窄的日期范围运行回填。例如,设置
START_DATE和END_DATE以指定仅 1 天的时间段。