Risoluzione dei problemi dello scheduler Airflow

Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)

Questa pagina fornisce passaggi per la risoluzione dei problemi e informazioni sui problemi comuni relativi agli scheduler e ai processori DAG di Airflow.

Identificare l'origine del problema

Per iniziare la risoluzione dei problemi, identifica se il problema si verifica:

  • Al momento dell'analisi del DAG, mentre il DAG viene analizzato da un processore DAG di Airflow
  • Al momento dell'esecuzione, mentre il DAG viene elaborato da uno scheduler di Airflow

Per saperne di più sul tempo di analisi e sul tempo di esecuzione, consulta Differenza tra tempo di analisi del DAG e tempo di esecuzione del DAG.

Esaminare i problemi di elaborazione del DAG

  1. Esamina i log del processore DAG.
  2. Controlla i tempi di analisi del DAG.

Monitorare le attività in esecuzione e in coda

Per verificare se sono presenti attività bloccate in una coda, segui questi passaggi.

  1. Nella Google Cloud console, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

  3. Vai alla scheda Monitoraggio.

  4. Nella scheda Monitoraggio, esamina il grafico Attività Airflow nella sezione Esecuzioni di DAG e identifica i possibili problemi. Le attività Airflow sono attività in stato di coda in Airflow e possono essere inserite nella coda dell'intermediario Celery o Kubernetes Executor. Le attività nella coda Celery sono istanze di attività che vengono inserite nella coda dell'intermediario Celery.

Risolvere i problemi al momento dell'analisi del DAG

Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni al momento dell'analisi del DAG.

Analisi e pianificazione dei DAG in Managed Airflow (prima generazione legacy) e Airflow 1

L'efficienza dell'analisi dei DAG è stata notevolmente migliorata in Airflow 2. Se riscontri problemi di prestazioni relativi all'analisi e alla pianificazione dei DAG, valuta la possibilità di eseguire la migrazione ad Airflow 2.

In Managed Airflow (prima generazione legacy), lo scheduler viene eseguito sui nodi del cluster insieme ad altri componenti di Managed Airflow. Per questo motivo, il carico dei singoli nodi del cluster potrebbe essere superiore o inferiore rispetto ad altri nodi. Le prestazioni dello scheduler (analisi e pianificazione dei DAG) possono variare a seconda del nodo in cui viene eseguito lo scheduler. Inoltre, un singolo nodo in cui viene eseguito lo scheduler può cambiare in seguito a operazioni di upgrade o manutenzione. Questa limitazione è stata risolta in Managed Airflow (seconda generazione), dove puoi allocare risorse di CPU e memoria allo scheduler e le prestazioni dello scheduler non dipendono dal carico dei nodi del cluster.

Numero e distribuzione temporale delle attività

Airflow può avere problemi durante la pianificazione di un numero elevato di DAG o attività contemporaneamente. Per evitare problemi di pianificazione, puoi:

  • Modificare i DAG in modo da utilizzare un numero inferiore di attività più consolidate.
  • Modificare gli intervalli di pianificazione dei DAG per distribuire le esecuzioni dei DAG in modo più uniforme nel tempo.

Scalare la configurazione di Airflow

Airflow fornisce opzioni di configurazione di Airflow che controllano il numero di attività e DAG che Airflow può eseguire contemporaneamente. Per impostare queste opzioni di configurazione, esegui l'override dei relativi valori per il tuo ambiente. Puoi anche impostare alcuni di questi valori a livello di DAG o attività.

  • Concorrenza dei worker

    Il parametro [celery]worker_concurrency controlla il numero massimo di attività che un worker di Airflow può eseguire contemporaneamente. Se moltiplichi il valore di questo parametro per il numero di worker di Airflow nel tuo ambiente Managed Airflow, ottieni il numero massimo di attività che possono essere eseguite in un determinato momento nel tuo ambiente. Questo numero è limitato dall'opzione di configurazione di Airflow [core]parallelism, descritta di seguito.

  • Numero massimo di esecuzioni di DAG attive

    L'opzione di configurazione di Airflow [core]max_active_runs_per_dag controlla il numero massimo di esecuzioni di DAG attive per DAG. Se raggiunge questo limite, lo scheduler non crea altre esecuzioni di DAG.

    Se questo parametro è impostato in modo errato, potresti riscontrare un problema per cui lo scheduler limita l'esecuzione del DAG perché non può creare altre istanze di esecuzione del DAG in un determinato momento.

    Puoi anche impostare questo valore a livello di DAG con il parametro max_active_runs.

  • Numero massimo di attività attive per DAG

    L'opzione di configurazione di Airflow [core]max_active_tasks_per_dag controlla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.

    Se questo parametro è impostato in modo errato, potresti riscontrare un problema per cui l'esecuzione di una singola istanza DAG è lenta perché in un determinato momento può essere eseguito solo un numero limitato di attività DAG. In questo caso, puoi aumentare il valore di questa opzione di configurazione.

    Puoi anche impostare questo valore a livello di DAG con il parametro max_active_tasks.

    Puoi utilizzare max_active_tis_per_dag e max_active_tis_per_dagrun parametri a livello di attività per controllare il numero di istanze con un ID attività specifico che possono essere eseguite per DAG e per esecuzione di DAG.

  • Parallelismo e dimensioni del pool

    L'opzione di configurazione di Airflow [core]parallelism controlla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'Executor dopo che tutte le dipendenze di queste attività sono state soddisfatte.

    Si tratta di un parametro globale per l'intera configurazione di Airflow.

    Le attività vengono messe in coda ed eseguite all'interno di un pool. Gli ambienti Managed Airflow utilizzano un solo pool. Le dimensioni di questo pool controllano il numero di attività che lo scheduler può mettere in coda per l'esecuzione in un determinato momento. Se le dimensioni del pool sono troppo piccole, lo scheduler non può mettere in coda le attività per l'esecuzione anche se le soglie, definite dall'opzione di configurazione [core]parallelism e dall'opzione di configurazione [celery]worker_concurrency moltiplicata per il numero di worker di Airflow, non sono ancora state raggiunte.

    Puoi configurare le dimensioni del pool nella UI di Airflow (Amministrazione > Pool). Regola le dimensioni del pool in base al livello di parallelismo previsto nel tuo ambiente.

    In genere, [core]parallelism è impostato come prodotto del numero massimo di worker e [celery]worker_concurrency.

Risolvere i problemi relativi alle attività in esecuzione e in coda

Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni relativi alle attività in esecuzione e in coda.

Le esecuzioni di DAG non vengono eseguite

Sintomo:

Quando una data di pianificazione per un DAG viene impostata dinamicamente, possono verificarsi vari effetti collaterali imprevisti. Ad esempio:

  • L'esecuzione di un DAG è sempre futura e il DAG non viene mai eseguito.

  • Le esecuzioni di DAG precedenti vengono contrassegnate come eseguite e riuscite, anche se non sono state eseguite.

Per saperne di più, consulta la documentazione di Apache Airflow.

Possibili soluzioni:

  • Segui le indicazioni riportate nella documentazione di Apache Airflow.

  • Imposta start_date statico per i DAG. In alternativa, puoi utilizzare catchup=False per disattivare l'esecuzione del DAG per le date precedenti.

  • Evita di utilizzare datetime.now() o days_ago(<number of days>) a meno che tu non sia a conoscenza degli effetti collaterali di questo approccio.

Utilizzare la funzionalità TimeTable dello scheduler di Airflow

Le tabelle temporali sono disponibili a partire da Airflow 2.2.

Puoi definire una tabella temporale per un DAG con uno dei seguenti metodi:

Puoi anche utilizzare le tabelle temporali integrate.

Risorse cluster limitate

Potresti riscontrare problemi di prestazioni se il cluster GKE del tuo ambiente è troppo piccolo per gestire tutti i DAG e le attività. In questo caso, prova una delle seguenti soluzioni:

  • Crea un nuovo ambiente con un tipo di macchina che offre prestazioni migliori ed esegui la migrazione dei DAG.
  • Crea altri ambienti Managed Airflow e dividi i DAG tra di essi.
  • Modifica il tipo di macchina per i nodi GKE, come descritto in Eseguire l'upgrade del tipo di macchina per i nodi GKE. Poiché questa procedura è soggetta a errori, è l'opzione meno consigliata.
  • Esegui l'upgrade del tipo di macchina dell'istanza Cloud SQL che esegue il database Airflow nel tuo ambiente, ad esempio utilizzando i gcloud composer environments update comandi. Le prestazioni scarse del database Airflow potrebbero essere il motivo per cui lo scheduler è lento.

Evitare la pianificazione delle attività durante i periodi di manutenzione

Puoi definire i periodi di manutenzione per il tuo ambiente in modo che la manutenzione dell'ambiente avvenga al di fuori degli orari in cui esegui i DAG. Puoi comunque eseguire i DAG durante i periodi di manutenzione, a condizione che sia accettabile che alcune attività possano essere interrotte e riprovate. Per saperne di più su come i periodi di manutenzione influiscono sul tuo ambiente, consulta Specificare i periodi di manutenzione.

Utilizzo di "wait_for_downstream" nei DAG

Se imposti il parametro wait_for_downstream su True nei DAG, affinché un'attività abbia esito positivo, anche tutte le attività immediatamente a valle di questa attività devono avere esito positivo. Ciò significa che l'esecuzione delle attività appartenenti a una determinata esecuzione di DAG potrebbe essere rallentata dall'esecuzione delle attività dell'esecuzione di DAG precedente. Per saperne di più, consulta la documentazione di Airflow in Airflow.

Le attività in coda per troppo tempo verranno annullate e riprogrammate

Se un'attività Airflow viene mantenuta in coda per troppo tempo, lo scheduler la riprogramma per l'esecuzione dopo che è trascorso il periodo di tempo impostato nell'opzione di configurazione di Airflow [scheduler]task_queued_timeout. Il valore predefinito è 2400. Nelle versioni di Airflow precedenti alla 2.3.1, l'attività viene anche contrassegnata come non riuscita e riprovata se idonea per un nuovo tentativo.

Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda (scheda "Monitoraggio" nella UI di Managed Airflow). Se i picchi in questo grafico non diminuiscono entro circa due ore, le attività verranno molto probabilmente riprogrammate (senza log), seguite dalle voci di log "Adopted tasks were still pending ..." nei log dello scheduler. In questi casi, potresti visualizzare il messaggio "Log file is not found..." nei log delle attività Airflow perché l'attività non è stata eseguita.

In generale, questo comportamento è previsto e l'istanza successiva dell'attività pianificata deve essere eseguita in base alla pianificazione. Se osservi molti casi di questo tipo nei tuoi ambienti Managed Airflow, potrebbe significare che non ci sono abbastanza worker di Airflow nel tuo ambiente per elaborare tutte le attività pianificate.

Risoluzione: per risolvere il problema, devi assicurarti che i worker di Airflow abbiano sempre la capacità di eseguire le attività in coda. Ad esempio, puoi aumentare il numero di worker o worker_concurrency. Puoi anche ottimizzare il parallelismo o i pool per evitare di mettere in coda più attività di quelle che hai a disposizione.

Approccio di Managed Airflow al parametro min_file_process_interval

Managed Airflow modifica il modo in cui [scheduler]min_file_process_interval viene utilizzato dallo scheduler di Airflow.

Airflow 1

Nel caso di Managed Airflow che utilizza Airflow 1, gli utenti possono impostare il valore di [scheduler]min_file_process_interval tra 0 e 600 secondi. I valori superiori a 600 secondi producono gli stessi risultati di se [scheduler]min_file_process_interval è impostato su 600 secondi.

Airflow 2

Nelle versioni di Managed Airflow precedenti alla 1.19.9, [scheduler]min_file_process_interval viene ignorato.

Versioni di Managed Airflow successive alla 1.19.9:

Lo scheduler di Airflow viene riavviato dopo un certo numero di volte in cui tutti i DAG vengono pianificati e il [scheduler]num_runs parametro controlla il numero di volte in cui viene eseguito dallo scheduler. Quando lo scheduler raggiunge i loop di pianificazione [scheduler]num_runs, viene riavviato. Lo scheduler è un componente stateless e questo riavvio è un meccanismo di riparazione automatica per eventuali problemi che lo scheduler potrebbe riscontrare. Il valore predefinito di [scheduler]num_runs è 5000.

[scheduler]min_file_process_interval può essere utilizzato per configurare la frequenza con cui viene eseguita l'analisi dei DAG, ma questo parametro non può essere più lungo del tempo necessario a uno scheduler per eseguire i loop [scheduler]num_runs durante la pianificazione dei DAG.

Contrassegnare le attività come non riuscite dopo aver raggiunto dagrun_timeout

Lo scheduler contrassegna come non riuscite le attività non completate (in esecuzione, pianificate e in coda) se un'esecuzione di DAG non viene completata entro dagrun_timeout (un parametro DAG).

Soluzione:

Sintomi del database Airflow sotto carico elevato

A volte, nei log dello scheduler di Airflow potresti visualizzare la seguente voce di log di avviso:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Sintomi simili potrebbero essere osservati anche nei log dei worker di Airflow:

Per MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Per PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Questi errori o avvisi potrebbero essere un sintomo del sovraccarico del database Airflow dovuto al numero di connessioni aperte o al numero di query eseguite nello stesso periodo di tempo, sia dagli scheduler sia da altri componenti di Airflow come worker, triggerer e server web.

Possibili soluzioni:

Il server web mostra l'avviso "Lo scheduler non sembra essere in esecuzione"

Lo scheduler segnala regolarmente il suo heartbeat al database Airflow. In base a queste informazioni, il server web Airflow determina se lo scheduler è attivo.

A volte, se lo scheduler è sotto carico elevato, potrebbe non essere in grado di segnalare il suo heartbeat ogni [scheduler]scheduler_heartbeat_sec.

In questa situazione, il server web Airflow potrebbe mostrare il seguente avviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Possibili soluzioni:

  • Aumenta le risorse di CPU e memoria per lo scheduler.

  • Ottimizza i DAG in modo che l'analisi e la pianificazione siano più veloci e non consumino troppe risorse dello scheduler.

  • Evita di utilizzare variabili globali nei DAG di Airflow. Utilizza invece le variabili di ambiente e le variabili di Airflow.

  • Aumenta il valore dell'opzione di configurazione di Airflow [scheduler]scheduler_health_check_threshold, in modo che il server web attenda più a lungo prima di segnalare l'indisponibilità dello scheduler.

Soluzioni alternative per i problemi riscontrati durante il backfill dei DAG

A volte, potresti voler eseguire di nuovo i DAG già eseguiti. Puoi farlo con un comando della CLI di Airflow nel seguente modo:

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Per eseguire di nuovo solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun-failed-tasks.

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Per eseguire di nuovo solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun_failed_tasks.

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • START_DATE con un valore per il parametro DAG start_date, nel formato YYYY-MM-DD.
  • END_DATE con un valore per il parametro DAG end_date, nel formato YYYY-MM-DD.
  • DAG_NAME con il nome del DAG.

A volte, l'operazione di backfill potrebbe generare una situazione di deadlock in cui il backfill non è possibile perché è presente un blocco su un'attività. Ad esempio:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

In alcuni casi, puoi utilizzare le seguenti soluzioni alternative per superare i deadlock:

  • Disattiva il mini-scheduler eseguendo l'override di [core]schedule_after_task_execution su False.

  • Esegui i backfill per intervalli di date più ristretti. Ad esempio, imposta START_DATE e END_DATE per specificare un periodo di un solo giorno.

Passaggi successivi