Managed Airflow (3ª gen.) | Managed Airflow (2ª gen.) | Managed Airflow (1ª gen. heredada)
En esta página, se proporcionan información y pasos para solucionar problemas comunes con los programadores y procesadores de DAG de Airflow.
Identifica el origen del problema
Para comenzar a solucionar problemas, identifica si el problema ocurre en los siguientes casos:
- Durante el análisis de DAG, mientras un procesador de DAG de Airflow analiza el DAG
- Durante el tiempo de ejecución, mientras un programador de Airflow procesa el DAG
Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta la Diferencia entre el tiempo de análisis y de ejecución del DAG.
Inspecciona los problemas de procesamiento de DAG
Supervisa tareas en ejecución y en cola
Para verificar si hay tareas atascadas en una cola, sigue estos pasos.
En Google Cloud console, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Monitoring.
En la pestaña Monitoring, revisa el gráfico Tareas de Airflow en la sección Ejecuciones de DAG y, luego, identifica posibles problemas. Las tareas de Airflow son tareas que están en estado en cola en Airflow y pueden ir a la cola de agentes de Celery o Kubernetes Executor. Las tareas en cola de Celery son instancias de tareas que se ponen en la cola de agentes de Celery.
Soluciona problemas durante el análisis de DAG
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.
Análisis y programación de DAG en Managed Airflow (1ª gen. heredada) y Airflow 1
La eficiencia del análisis de DAG mejoró significativamente en Airflow 2. Si tienes problemas de rendimiento relacionados con el análisis y la programación de DAG, considera migrar a Airflow 2.
En Managed Airflow (1ª gen. heredada), el programador se ejecuta en los nodos del clúster junto con otros componentes de Managed Airflow. Por este motivo, la carga de los nodos individuales del clúster puede ser mayor o menor en comparación con otros nodos. El rendimiento del programador (análisis y programación de DAG) puede variar según el nodo en el que se ejecute el programador. Además, un nodo individual en el que se ejecuta el programador puede cambiar como resultado de operaciones de actualización o mantenimiento. Esta limitación se resolvió en Managed Airflow (2ª gen.), donde puedes asignar recursos de CPU y memoria al programador, y el rendimiento del programador no depende de la carga de los nodos del clúster.
Distribución de números y horarios de las tareas
Airflow puede tener problemas cuando programa una gran cantidad de DAG o tareas al mismo tiempo. Para evitar problemas con la programación, puedes hacer lo siguiente:
- Ajusta tus DAG para usar una cantidad menor de tareas más consolidadas.
- Ajusta los intervalos de programación de tus DAG para distribuir las ejecuciones de DAG de manera más uniforme con el tiempo.
Ajusta la configuración de Airflow
Airflow proporciona opciones de configuración que controlan cuántas tareas y DAG de Airflow pueden ejecutarse al mismo tiempo. Para configurar estas opciones de configuración, anula sus valores para tu entorno. También puedes establecer algunos de estos valores a nivel del DAG o de la tarea.
Simultaneidad de los trabajadores
El parámetro
[celery]worker_concurrencycontrola la cantidad máxima de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por la cantidad de trabajadores de Airflow en tu entorno de Managed Airflow, obtendrás la cantidad máxima de tareas que se pueden ejecutar en un momento determinado de tu entorno. Este número está limitado por la opción de configuración de Airflow[core]parallelism, que se describe con más detalle.Ejecuciones máximas de DAG activas
La opción de configuración de Airflow
[core]max_active_runs_per_dagcontrola la cantidad máxima de ejecuciones activas del DAG por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.Si este parámetro se establece de forma incorrecta, puedes encontrar un problema en el que el programador regula la ejecución del DAG porque no puede crear más instancias de ejecución de DAG en un momento determinado.
También puedes establecer este valor a nivel del DAG con el parámetro
max_active_runs.Cantidad máxima de tareas activas por DAG
La opción de configuración de Airflow
[core]max_active_tasks_per_dagcontrola la cantidad máxima de instancias de tareas que se pueden ejecutar de forma simultánea en cada DAG.Si este parámetro se establece de manera incorrecta, es posible que tengas un problema en el que la ejecución de una sola instancia de DAG sea lenta porque solo hay una cantidad limitada de tareas de DAG que se pueden ejecutar en un momento determinado. En este caso, puedes aumentar el valor de esta opción de configuración.
También puedes establecer este valor a nivel del DAG con el parámetro
max_active_tasks.Puedes usar
max_active_tis_per_dagymax_active_tis_per_dagrunparámetros a nivel de la tarea para controlar cuántas instancias con un ID de tarea específico pueden ejecutarse por DAG y por ejecución de DAG.Paralelismo y tamaño del grupo
La opción de configuración
[core]parallelismde Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplen todas las dependencias para estas tareas.Este es un parámetro global para toda la configuración de Airflow.
Las tareas se ponen en cola y se ejecutan dentro de un grupo. Los entornos de Managed Airflow usan solo un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para su ejecución en un momento determinado. Si el tamaño del grupo es demasiado pequeño, el programador no puede poner en cola las tareas para su ejecución a pesar de los límites, que se definen mediante la opción de configuración
[core]parallelismy[celery]worker_concurrencymultiplicada por la cantidad de trabajadores de Airflow, que todavía no se cumplen.Puedes configurar el tamaño del grupo en la IU de Airflow (Administrador > Grupos). Ajusta el tamaño del grupo al nivel del paralelismo que esperas en tu entorno.
Por lo general,
[core]parallelismse establece como un producto de la cantidad máxima de trabajadores y[celery]worker_concurrency.
Soluciona problemas de tareas en ejecución y en cola
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes con las tareas en cola y en ejecución.
No se ejecutan las ejecuciones de DAG
Síntoma:
Cuando se establece una fecha de programación para un DAG de forma dinámica, esto puede generar varios efectos secundarios inesperados. Por ejemplo:
Una ejecución de DAG siempre está en el futuro y el DAG nunca se ejecuta.
Las ejecuciones de DAG anteriores se marcan como ejecutadas y exitosas a pesar de que no se ejecutaron.
Hay más información disponible en la documentación de Apache Airflow.
Soluciones posibles:
Sigue las recomendaciones de la documentación de Apache Airflow.
Establece
start_dateestático para los DAG. Como opción, puedes usarcatchup=Falsepara inhabilitar la ejecución del DAG para fechas pasadas.Evita usar
datetime.now()odays_ago(<number of days>), a menos que seas consciente de los efectos secundarios de este enfoque.
Usa la función TimeTable del programador de Airflow
Las tablas de tiempo están disponibles a partir de Airflow 2.2.
Puedes definir una tabla de tiempo para un DAG con uno de los siguientes métodos:
- Con una función de Python
- (No disponible en Managed Airflow [1ª gen. heredada]) Con un complemento personalizado
También puedes usar tablas de tiempo integradas.
Recursos de clúster limitados
Es posible que experimentes problemas de rendimiento si el clúster de GKE de tu entorno es demasiado pequeño para todos tus DAG y tareas. En este caso, prueba una de las siguientes soluciones:
- Crea un entorno nuevo con un tipo de máquina que proporcione más rendimiento y migra tus DAG a él
- Crea más entornos de Managed Airflow y divide los DAG entre ellos
- Cambia el tipo de máquina de los nodos de GKE, como se describe en Actualización del tipo de máquina de los nodos de GKE. Dado que este procedimiento es propenso a errores, es la opción menos recomendada.
- Actualiza el tipo de máquina de la instancia de Cloud SQL que ejecuta la base de datos de Airflow
en tu entorno, por ejemplo, mediante los
gcloud composer environments updatecomandos. El rendimiento bajo de la base de datos de Airflow podría ser la razón por la que el programador es lento.
Evita la programación de tareas durante los períodos de mantenimiento
Puedes definir períodos de mantenimiento para tu entorno de modo que el mantenimiento del entorno se realice fuera de los horarios en los que ejecutas tus DAG. Aún puedes ejecutar tus DAG durante los períodos de mantenimiento, siempre que sea aceptable que se puedan interrumpir y reintentar algunas tareas. Para obtener más información sobre cómo afectan los períodos de mantenimiento a tu entorno, consulta Especifica períodos de mantenimiento.
Uso de “wait_for_downstream” en tus DAG
Si configuras el parámetro wait_for_downstream en True en tus DAG, para que una tarea tenga éxito, todas las tareas que se encuentran en una etapa posterior de esta tarea también deben tener éxito. Significa que la ejecución de las tareas que pertenecen a una ejecución de DAG determinada puede ralentizarse mediante la ejecución de tareas de la ejecución del DAG anterior. Obtén más información en
la documentación de Airflow.
Las tareas en cola durante demasiado tiempo se cancelarán y reprogramarán
Si una tarea de Airflow se mantiene en la cola durante demasiado tiempo, el programador la volverá a programar para su ejecución después de que haya transcurrido la cantidad de tiempo establecida en la opción de configuración de Airflow [scheduler]task_queued_timeout. El valor predeterminado es 2400.
En las versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como fallida y se reintenta si es apta para un reintento.
Una forma de observar los síntomas de esta situación es observar el gráfico con la cantidad de tareas en cola (pestaña “Monitoring” en la IU de Managed Airflow). Si los picos de este gráfico no disminuyen en aproximadamente dos horas, es probable que las tareas se reprogramen (sin registros) y, luego, se muestren las entradas de registro “Adopted tasks were still pending…” en los registros del programador. En esos casos, es posible que veas el mensaje “Log file is not found…” en los registros de tareas de Airflow porque la tarea no se ejecutó.
En general, este comportamiento es el esperado, y la siguiente instancia de la tarea programada debe ejecutarse según la programación. Si observas muchos casos de este tipo en tus entornos de Managed Airflow, es posible que no haya suficientes trabajadores de Airflow en tu entorno para procesar todas las tareas programadas.
Resolución: Para resolver este problema, debes asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar tareas en cola. Por ejemplo, puedes aumentar la cantidad de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para evitar poner en cola tareas más allá de la capacidad que tienes.
Enfoque de Managed Airflow para el parámetro min_file_process_interval
Managed Airflow cambia la forma en que
[scheduler]min_file_process_interval
es usado por el programador de Airflow.
Airflow 1
En el caso de Managed Airflow con Airflow 1, los usuarios pueden establecer el valor de [scheduler]min_file_process_interval entre 0 y 600 segundos. Los valores superiores a 600 segundos producen los mismos resultados que si [scheduler]min_file_process_interval se establece en 600 segundos.
Airflow 2
En las versiones de Managed Airflow anteriores a la 1.19.9, se ignora [scheduler]min_file_process_interval.
Versiones de Managed Airflow posteriores a la 1.19.9:
El programador de Airflow se reinicia después de una cierta cantidad de veces que se programan todos los DAG
, y el [scheduler]num_runs parámetro
controla cuántas veces lo hace el programador. Cuando el programador alcanza los bucles de programación [scheduler]num_runs, se reinicia. El programador es un componente sin estado, y este reinicio es un mecanismo de reparación automática para cualquier problema que pueda experimentar el programador. El valor predeterminado de [scheduler]num_runs es 5,000.
Se puede usar [scheduler]min_file_process_interval para configurar la frecuencia con la que se realiza el análisis de DAG, pero este parámetro no puede ser más largo que el tiempo que necesita un programador para realizar bucles [scheduler]num_runs cuando programa tus DAG.
Marcar tareas como fallidas después de alcanzar dagrun_timeout
El programador marca como fallidas las tareas que no se terminan (en ejecución, programadas y en cola)
si una ejecución de DAG no finaliza dentro de
dagrun_timeout (un parámetro de DAG).
Solución:
Extiende
dagrun_timeoutpara cumplir con el tiempo de espera.
Síntomas de que la base de datos de Airflow está bajo una carga pesada
A veces, en los registros del programador de Airflow, es posible que veas la siguiente entrada de registro de advertencia:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
También se pueden observar síntomas similares en los registros de trabajadores de Airflow:
Para MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Para PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Estos errores o advertencias pueden ser un síntoma de que la base de datos de Airflow está sobrecargada por la cantidad de conexiones abiertas o la cantidad de consultas ejecutadas al mismo tiempo, ya sea por programadores o por otros componentes de Airflow, como trabajadores, activadores y servidores web.
Soluciones posibles:
Quita los datos innecesarios de la base de datos de Airflow.
Escala verticalmente la base de datos de Airflow. Para ello, cambia el tipo de máquina de la instancia de Cloud SQL que almacena la base de datos de Airflow de tu entorno.
Evita usar variables globales en los DAG de Airflow. En su lugar, usa variables de entorno y variables de Airflow.
Establece
[scheduler]scheduler_heartbeat_secen un valor más alto, por ejemplo, en 15 segundos o más.Establece
[scheduler]job_heartbeat_secen un valor más alto, por ejemplo, en 30 segundos o más.Establece
[scheduler]scheduler_health_check_thresholden un valor igual a[scheduler]job_heartbeat_secmultiplicado por4.
El servidor web muestra la advertencia “The scheduler does not appear to be running”
El programador informa su señal de monitoreo de funcionamiento de forma periódica a la base de datos de Airflow. Según esta información, el servidor web de Airflow determina si el programador está activo.
A veces, si el programador está bajo una carga pesada, es posible que no pueda
informar su señal de monitoreo de funcionamiento cada
[scheduler]scheduler_heartbeat_sec.
En esa situación, el servidor web de Airflow podría mostrar la siguiente advertencia:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Soluciones posibles:
Aumenta la CPU y la memoria de los recursos del programador.
Optimiza tus DAG para que su análisis y programación sean más rápidos y no consuman demasiados recursos del programador.
Evita usar variables globales en los DAG de Airflow. En su lugar, usa variables de entorno y variables de Airflow.
Aumenta el valor de la opción de configuración de Airflow
[scheduler]scheduler_health_check_thresholdpara que el servidor web espere más tiempo antes de informar la falta de disponibilidad del programador.
Soluciones alternativas para problemas que se producen durante el reabastecimiento de DAG
A veces, es posible que desees volver a ejecutar los DAG que ya se ejecutaron. Puedes hacerlo con un comando de la CLI de Airflow de la siguiente manera:
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para volver a ejecutar solo las tareas fallidas de un DAG específico, también usa el argumento --rerun-failed-tasks.
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para volver a ejecutar solo las tareas fallidas de un DAG específico, también usa el argumento --rerun_failed_tasks.
Reemplaza lo siguiente:
ENVIRONMENT_NAMEpor el nombre del entorno.LOCATIONpor la región en la que se encuentra el entorno.START_DATEpor un valor para el parámetrostart_datedel DAG, en el formatoYYYY-MM-DD.END_DATEpor un valor para el parámetroend_datedel DAG, en el formatoYYYY-MM-DD.DAG_NAMEpor el nombre del DAG.
A veces, la operación de reabastecimiento puede generar una situación de interbloqueo en la que no es posible un reabastecimiento porque hay un bloqueo en una tarea. Por ejemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
En algunos casos, puedes usar las siguientes soluciones alternativas para superar los interbloqueos:
Inhabilita el miniprogramador anulando el
[core]schedule_after_task_executionaFalse.Ejecuta reabastecimientos para períodos más cortos. Por ejemplo, establece
START_DATEyEND_DATEpara especificar un período de solo 1 día.