Managed Airflow (3e génération) | Managed Airflow (2e génération) | Managed Airflow (1re génération, version héritée)
Cette page fournit des étapes de dépannage et des informations sur les problèmes courants liés aux programmeurs Airflow et aux processeurs DAG.
Identifier la source du problème
Pour commencer le dépannage, identifiez si le problème se produit :
- au moment de l'analyse du DAG, lorsque le DAG est analysé par un processeur DAG Airflow ;
- au moment de l'exécution, lorsque le DAG est traité par un programmeur Airflow.
Pour en savoir plus sur le temps d'analyse et la durée d'exécution du DAG, consultez la section Différence entre le temps d'analyse du DAG et la durée d'exécution du DAG.
Inspecter les problèmes de traitement du DAG
Surveiller les tâches en cours et en file d'attente
Pour vérifier si des tâches sont bloquées dans une file d'attente, procédez comme suit :
Dans la Google Cloud console, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Accédez à l'onglet Surveillance.
Dans l'onglet Surveillance, consultez le graphique Tâches Airflow dans la section Exécutions DAG et identifiez les problèmes potentiels. Les tâches Airflow sont des tâches qui sont en file d'attente dans Airflow. Elles peuvent être envoyées à la file d'attente de l'agent Celery ou de l'exécuteur Kubernetes. Les tâches en file d'attente Celery sont des instances de tâches placées dans la file d'attente de l'agent Celery.
Résoudre les problèmes rencontrés au moment de l'analyse du DAG
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'analyse du DAG.
Analyse et planification des DAG dans Managed Airflow (1re génération, version héritée) et Airflow 1
L'efficacité de l'analyse des DAG a été considérablement améliorée dans Airflow 2. Si vous rencontrez des problèmes de performances liés à l'analyse et à la planification des DAG, envisagez de migrer vers Airflow 2.
Dans Managed Airflow (1re génération, version héritée), le programmeur s'exécute sur les nœuds de cluster avec d'autres composants Managed Airflow. De ce fait, la charge des nœuds de cluster individuels peut être supérieure ou inférieure à celle des autres nœuds. Les performances du programmeur (analyse et planification des DAG) peuvent varier en fonction du nœud sur lequel il s'exécute. De plus, un nœud individuel sur lequel le programmeur s'exécute peut changer à la suite d'opérations de mise à niveau ou de maintenance. Cette limitation a été résolue dans Managed Airflow (2e génération), où vous pouvez allouer des ressources de processeur et de mémoire au programmeur, et où ses performances ne dépendent pas de la charge des nœuds de cluster.
Répartition du nombre et de la durée des tâches
Airflow peut rencontrer des problèmes lors de la planification simultanée d'un grand nombre de DAG ou de tâches. Pour éviter les problèmes de planification, vous pouvez :
- ajuster vos DAG pour utiliser un nombre plus faible de tâches mieux consolidées ;
- ajuster les intervalles de planification de vos DAG pour répartir les exécutions de DAG de manière plus uniforme dans le temps.
Scaling de la configuration Airflow
Airflow fournit des options de configuration Airflow qui contrôlent le nombre de tâches et de DAG Airflow pouvant être exécutés simultanément. Pour définir ces options de configuration, remplacez leurs valeurs pour votre environnement. Vous pouvez également définir certaines de ces valeurs au niveau du DAG ou de la tâche.
Simultanéité des nœuds de calcul
Le paramètre
[celery]worker_concurrencycontrôle le nombre maximal de tâches qu'un nœud de calcul Airflow peut exécuter en même temps. Si vous multipliez la valeur de ce paramètre par le nombre de nœuds de calcul Airflow dans votre environnement Managed Airflow, vous obtenez le nombre maximal de tâches pouvant être exécutées simultanément dans votre environnement. Ce nombre est limité par l'option de configuration Airflow[core]parallelism, décrite en détail plus bas.Nombre maximal d'exécutions de DAG actives
L'option de configuration Airflow
[core]max_active_runs_per_dagcontrôle le nombre maximal d'exécutions de DAG actives par DAG. Si cette limite est atteinte, le programmeur ne crée pas d'autres exécutions de DAG.Une définition incorrecte de ce paramètre peut créer un problème où le programmeur limite l'exécution du DAG, car il ne peut pas créer d'autres instances d'exécution de DAG simultanées.
Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre
max_active_runs.Nombre maximal de tâches actives par DAG
L'option de configuration Airflow
[core]max_active_tasks_per_dagcontrôle le nombre maximal d'instances de tâche pouvant s'exécuter simultanément dans chaque DAG.Si ce paramètre est défini de manière incorrecte, vous risquez de rencontrer un problème dans lequel l'exécution d'une instance de DAG est ralentie car le nombre de tâches de DAG pouvant être exécutées simultanément est limité. Dans ce cas, vous pouvez augmenter la valeur de cette option de configuration.
Vous pouvez également définir cette valeur au niveau du DAG avec le paramètre
max_active_tasks.Vous pouvez utiliser les paramètres
max_active_tis_per_dagetmax_active_tis_per_dagrunau niveau de la tâche pour contrôler le nombre d'instances avec un ID de tâche spécifique autorisé à s'exécuter par DAG et par exécution de DAG.Parallélisme et taille du pool
L'option de configuration Airflow
[core]parallelismcontrôle le nombre de tâches que le programmeur Airflow peut mettre en file d'attente dans la file d'attente de l'exécuteur une fois que toutes les dépendances pour ces tâches sont satisfaites.Il s'agit d'un paramètre global pour l'ensemble de la configuration Airflow.
Les tâches sont mises en file d'attente et exécutées dans un pool. Les environnements Managed Airflow n'utilisent qu'un seul pool. La taille de ce pool contrôle le nombre de tâches pouvant être mises en file d'attente simultanément par le programmeur. Si la taille du pool est trop faible, le programmeur ne pourra plus mettre de tâches en file d'attente avant même que les seuils (définis par l'option de configuration
[core]parallelismet par l'option de configuration[celery]worker_concurrencymultipliée par le nombre de nœuds de calcul Airflow) ne soient dépassés.Vous pouvez configurer la taille du pool dans l'interface utilisateur Airflow (Admin > Pools). Ajustez la taille du pool en fonction du niveau de parallélisme attendu dans votre environnement.
En règle générale,
[core]parallelismest défini comme le produit du nombre maximal de nœuds de calcul et[celery]worker_concurrency.
Résoudre les problèmes liés aux tâches en cours d'exécution et en file d'attente
Les sections suivantes décrivent les symptômes et les correctifs potentiels pour certains problèmes courants qui surviennent au moment de l'exécution ou de la mise en file d'attente des tâches.
Les exécutions DAG ne sont pas exécutées
Symptôme :
Lorsqu'une date de planification d'un DAG est définie de manière dynamique, cela peut entraîner divers effets secondaires inattendus. Exemple :
Une exécution de DAG est toujours future et le DAG n'est jamais exécuté.
Les exécutions DAG passées sont marquées comme exécutées et réussies alors qu'elles ne l'ont pas été.
Pour en savoir plus, consultez la documentation Apache Airflow.
Solutions possibles :
Suivez les recommandations de la documentation Apache Airflow.
Définissez une
start_datestatique pour les DAG. Vous pouvez également utilisercatchup=Falsepour désactiver l'exécution du DAG pour les dates passées.Évitez d'utiliser
datetime.now()oudays_ago(<number of days>), sauf si vous êtes conscient des effets secondaires de cette approche.
Utiliser la fonctionnalité TimeTable du programmeur Airflow
Les tables de temps sont disponibles à partir d'Airflow 2.2.
Vous pouvez définir une table de temps pour un DAG à l'aide de l'une des méthodes suivantes :
- Avec une fonction Python
- (Non disponible dans Managed Airflow (1re génération, version héritée)) Avec un plug-in personnalisé
Vous pouvez également utiliser des tables de temps intégrées.
Ressources de cluster limitées
Vous risquez de rencontrer des problèmes de performances si le cluster GKE de votre environnement est trop petit pour gérer tous vos DAG et tâches. Dans ce cas, essayez l'une des solutions suivantes :
- Créez un environnement avec un type de machine offrant plus de performances et migrez vos DAG vers cet environnement.
- Créez d'autres environnements Managed Airflow et répartissez les DAG sur ces environnements.
- Modifiez le type de machine pour les nœuds GKE, comme décrit dans Mettre à niveau le type de machine pour les nœuds GKE. Cette procédure étant sujette aux erreurs, il s'agit de l'option la moins recommandée.
- Mettez à niveau le type de machine de l'instance Cloud SQL qui exécute la base de données Airflow
dans votre environnement, par exemple à l'aide des
gcloud composer environments updatecommandes. Une base de données Airflow peu performante peut ralentir le programmeur.
Éviter la planification des tâches pendant les intervalles de maintenance
Vous pouvez définir des intervalles de maintenance pour votre environnement afin que la maintenance de l'environnement s'effectue en dehors des périodes d'exécution de vos DAG. Vous pouvez toujours exécuter vos DAG pendant les intervalles de maintenance, à condition qu'il soit acceptable que certaines tâches soient interrompues et relancées. Pour en savoir plus sur l'impact des intervalles de maintenance sur votre environnement, consultez la section Spécifier des intervalles de maintenance.
Utilisation de "wait_for_downstream" dans vos DAG
Si vous définissez le paramètre wait_for_downstream sur True dans vos DAG, pour qu'une tâche réussisse, toutes les tâches immédiatement en aval de cette tâche doivent également réussir. Cela signifie que l'exécution des tâches d'un DAG donné peut être ralentie par l'exécution des tâches du DAG précédent. Pour en savoir plus, consultez
la documentation Airflow.
Les tâches mises en file d'attente trop longtemps seront annulées et reprogrammées
Si une tâche Airflow est conservée trop longtemps dans la file d'attente, le programmeur la reprogrammera pour exécution une fois le délai défini dans l'option de configuration Airflow [scheduler]task_queued_timeout écoulé. La valeur par défaut est 2400.
Dans les versions d'Airflow antérieures à la version 2.3.1, la tâche est également marquée comme ayant échoué et relancée si elle est éligible à une nouvelle tentative.
Pour observer les symptômes de cette situation, consultez le graphique indiquant le nombre de tâches en file d'attente (onglet "Surveillance" de l'interface utilisateur Managed Airflow). Si les pics de ce graphique ne diminuent pas au bout d'environ deux heures, les tâches seront probablement reprogrammées (sans journaux), suivies des entrées de journal "Adopted tasks were still pending ..." dans les journaux du programmeur. Dans ce cas, le message "Log file is not found..." peut s'afficher dans les journaux des tâches Airflow, car la tâche n'a pas été exécutée.
En général, ce comportement est attendu et l'instance suivante de la tâche planifiée est censée être exécutée conformément à la planification. Si vous constatez de nombreux cas de ce type dans vos environnements Managed Airflow, cela peut signifier qu'il n'y a pas assez de nœuds de calcul Airflow dans votre environnement pour traiter toutes les tâches planifiées.
Résolution : Pour résoudre ce problème, vous devez vous assurer que les nœuds de calcul Airflow disposent toujours de la capacité nécessaire pour exécuter les tâches en file d'attente. Par exemple, vous pouvez augmenter le nombre de nœuds de calcul ou la simultanéité des nœuds de calcul. Vous pouvez également ajuster le parallélisme ou les pools pour éviter de mettre en file d'attente plus de tâches que votre capacité.
Approche de Managed Airflow concernant le paramètre min_file_process_interval
Managed Airflow modifie la façon dont
[scheduler]min_file_process_interval
est utilisé par le programmeur Airflow.
Airflow 1
Dans le cas de Managed Airflow utilisant Airflow 1, les utilisateurs peuvent définir la valeur de [scheduler]min_file_process_interval entre 0 et 600 secondes. Les valeurs supérieures à 600 secondes donnent les mêmes résultats que si [scheduler]min_file_process_interval est défini sur 600 secondes.
Airflow 2
Dans les versions de Managed Airflow antérieures à la version 1.19.9, [scheduler]min_file_process_interval est ignoré.
Versions de Managed Airflow ultérieures à la version 1.19.9 :
Le programmeur Airflow est redémarré après un certain nombre de planifications de tous les DAG
. Le paramètre [scheduler]num_runs
contrôle le nombre de fois où le programmeur effectue cette opération. Lorsque le programmeur atteint les boucles de planification [scheduler]num_runs, il est redémarré. Le programmeur est un composant sans état, et un tel redémarrage est un mécanisme d'autoréparation pour tout problème qu'il pourrait rencontrer. La valeur par défaut de [scheduler]num_runs est 5 000.
[scheduler]min_file_process_interval peut être utilisé pour configurer la fréquence d'analyse des DAG, mais ce paramètre ne peut pas être plus long que le temps nécessaire à un programmeur pour effectuer les boucles [scheduler]num_runs lors de la planification de vos DAG.
Marquer les tâches comme ayant échoué après avoir atteint dagrun_timeout
Le programmeur marque les tâches qui ne sont pas terminées (en cours d'exécution, planifiées et en file d'attente)
comme ayant échoué si une exécution de DAG ne se termine pas dans le délai
dagrun_timeout (paramètre DAG).
Solution:
Prolongez
dagrun_timeoutpour respecter le délai.
Symptômes d'une charge importante sur la base de données Airflow
Il arrive que l'entrée de journal d'avertissement suivante s'affiche dans les journaux du programmeur Airflow :
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Des symptômes similaires peuvent également être observés dans les journaux des nœuds de calcul Airflow :
Pour MySQL :
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Pour PostgreSQL :
psycopg2.OperationalError: connection to server at ... failed
De telles erreurs ou avertissements peuvent être le signe que la base de données Airflow est surchargée par le nombre de connexions ouvertes ou le nombre de requêtes exécutées en même temps, soit par les programmeurs, soit par d'autres composants Airflow tels que les nœuds de calcul, les déclencheurs et les serveurs Web.
Solutions possibles :
Supprimez les données inutiles de la base de données Airflow.
Faites évoluer la base de données Airflow en modifiant le type de machine de l'instance Cloud SQL qui stocke la base de données Airflow de votre environnement.
Évitez d'utiliser des variables globales dans les DAG Airflow. Utilisez plutôt des variables d'environnement et des variables Airflow.
Définissez
[scheduler]scheduler_heartbeat_secsur une valeur plus élevée, par exemple 15 secondes ou plus.Définissez
[scheduler]job_heartbeat_secsur une valeur plus élevée, par exemple 30 secondes ou plus.Définissez
[scheduler]scheduler_health_check_thresholdsur une valeur égale à[scheduler]job_heartbeat_secmultipliée par4.
Le serveur Web affiche l'avertissement "Le programmeur ne semble pas être en cours d'exécution"
Le programmeur signale régulièrement sa pulsation à la base de données Airflow. En fonction de ces informations, le serveur Web Airflow détermine si le programmeur est actif.
Parfois, si le programmeur est soumis à une charge importante, il peut ne pas être en mesure de
signaler sa pulsation toutes les
[scheduler]scheduler_heartbeat_sec.
Dans ce cas, le serveur Web Airflow peut afficher l'avertissement suivant :
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Solutions possibles :
Augmentez les ressources de processeur et de mémoire du programmeur.
Optimisez vos DAG afin que leur analyse et leur planification soient plus rapides et ne consomment pas trop de ressources du programmeur.
Évitez d'utiliser des variables globales dans les DAG Airflow. Utilisez plutôt des variables d'environnement et des variables Airflow.
Augmentez la valeur de l'option de configuration Airflow
[scheduler]scheduler_health_check_threshold, afin que le serveur Web attende plus longtemps avant de signaler l'indisponibilité du programmeur.
Solutions pour les problèmes rencontrés lors du remplissage des DAG
Parfois, vous pouvez vouloir réexécuter des DAG qui ont déjà été exécutés. Vous pouvez le faire avec une commande de CLI Airflow de la manière suivante :
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Pour réexécuter uniquement les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun-failed-tasks.
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Pour réexécuter uniquement les tâches ayant échoué pour un DAG spécifique, utilisez également l'argument --rerun_failed_tasks.
Remplacez :
ENVIRONMENT_NAMEpar le nom de l'environnement.LOCATIONpar la région dans laquelle se trouve l'environnement.START_DATEpar une valeur pour le paramètrestart_datedu DAG, au formatYYYY-MM-DD.END_DATEpar une valeur pour le paramètreend_datedu DAG, au formatYYYY-MM-DD.DAG_NAMEpar le nom du DAG.
L'opération de remplissage peut parfois générer une situation d'interblocage dans laquelle un remplissage n'est pas possible, car une tâche est verrouillée. Exemple :
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Dans certains cas, vous pouvez utiliser les solutions suivantes pour surmonter les blocages :
Désactivez le mini-programmeur en remplaçant le
[core]schedule_after_task_executionparFalse.Exécutez des remplissages pour des plages de dates plus étroites. Par exemple, définissez
START_DATEetEND_DATEpour spécifier une période d'un seul jour.