Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Questa pagina fornisce passaggi per la risoluzione dei problemi e informazioni sui problemi comuni relativi agli scheduler e ai processori DAG di Airflow.
Identificare l'origine del problema
Per iniziare la risoluzione dei problemi, identifica se il problema si verifica:
- Al momento dell'analisi del DAG, mentre il DAG viene analizzato da un processore DAG di Airflow
- Al momento dell'esecuzione, mentre il DAG viene elaborato da uno scheduler di Airflow
Per saperne di più sul tempo di analisi e sul tempo di esecuzione, consulta Differenza tra tempo di analisi del DAG e tempo di esecuzione del DAG.
Esaminare i problemi di elaborazione dei DAG
Monitorare le attività in esecuzione e in coda
Per verificare se sono presenti attività bloccate in una coda, segui questi passaggi.
In Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda Monitoraggio.
Nella scheda Monitoraggio, esamina il grafico Attività Airflow nella sezione Esecuzioni di DAG e identifica i possibili problemi. Le attività Airflow sono attività in stato di coda in Airflow e possono essere inserite nella coda dell'intermediario Celery o Kubernetes Executor. Le attività nella coda Celery sono istanze di attività che vengono inserite nella coda dell'intermediario Celery.
Risolvere i problemi al momento dell'analisi dei DAG
Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni al momento dell'analisi dei DAG.
Numero e distribuzione temporale delle attività
Airflow può avere problemi durante la pianificazione di un numero elevato di DAG o attività contemporaneamente. Per evitare problemi di pianificazione, puoi:
- Modificare i DAG in modo che utilizzino un numero inferiore di attività più consolidate.
- Modificare gli intervalli di pianificazione dei DAG per distribuire le esecuzioni dei DAG in modo più uniforme nel tempo.
Scalare la configurazione di Airflow
Airflow fornisce opzioni di configurazione di Airflow che controllano il numero di attività e DAG che Airflow può eseguire contemporaneamente. Per impostare queste opzioni di configurazione, esegui l'override dei relativi valori per il tuo ambiente. Puoi anche impostare alcuni di questi valori a livello di DAG o attività.
-
Il parametro
[celery]worker_concurrencycontrolla il numero massimo di attività che un worker di Airflow può eseguire contemporaneamente. Se moltiplichi il valore di questo parametro per il numero di worker di Airflow nel tuo ambiente Managed Airflow, ottieni il numero massimo di attività che possono essere eseguite in un determinato momento nel tuo ambiente. Questo numero è limitato dall'opzione di configurazione di Airflow[core]parallelism, descritta di seguito.Negli ambienti Managed Airflow (Gen 3), il valore predefinito di
[celery]worker_concurrencyviene calcolato automaticamente in base al numero di istanze di attività simultanee leggere che un worker può gestire. Ciò significa che il suo valore dipende dai limiti delle risorse dei worker. Il valore di contemporaneità dei worker non dipende dal numero di worker nel tuo ambiente. Numero massimo di esecuzioni di DAG attive
L'opzione di configurazione di Airflow
[core]max_active_runs_per_dagcontrolla il numero massimo di esecuzioni di DAG attive per DAG. Se raggiunge questo limite, lo scheduler non crea altre esecuzioni di DAG.Se questo parametro è impostato in modo errato, potresti riscontrare un problema per cui lo scheduler limita l'esecuzione dei DAG perché non può creare altre istanze di esecuzione dei DAG in un determinato momento.
Puoi anche impostare questo valore a livello di DAG con il parametro
max_active_runs.Numero massimo di attività attive per DAG
L'opzione di configurazione di Airflow
[core]max_active_tasks_per_dagcontrolla il numero massimo di istanze di attività che possono essere eseguite contemporaneamente in ogni DAG.Se questo parametro è impostato in modo errato, potresti riscontrare un problema per cui l'esecuzione di una singola istanza di DAG è lenta perché è possibile eseguire solo un numero limitato di attività DAG in un determinato momento. In questo caso, puoi aumentare il valore di questa opzione di configurazione.
Puoi anche impostare questo valore a livello di DAG con il parametro
max_active_tasks.Puoi utilizzare
max_active_tis_per_dagemax_active_tis_per_dagrunparametri a livello di attività per controllare il numero di istanze con un ID attività specifico che possono essere eseguite per DAG e per esecuzione di DAG.Parallelismo e dimensioni del pool
L'opzione di configurazione di Airflow
[core]parallelismcontrolla il numero di attività che lo scheduler di Airflow può mettere in coda nella coda dell'Executor dopo che tutte le dipendenze di queste attività sono state soddisfatte.Si tratta di un parametro globale per l'intera configurazione di Airflow.
Le attività vengono messe in coda ed eseguite all'interno di un pool. Gli ambienti Managed Airflow utilizzano un solo pool. Le dimensioni di questo pool controllano il numero di attività che lo scheduler può mettere in coda per l'esecuzione in un determinato momento. Se le dimensioni del pool sono troppo piccole, lo scheduler non può mettere in coda le attività per l'esecuzione anche se le soglie, definite dall'opzione di configurazione
[core]parallelisme dall'opzione di configurazione[celery]worker_concurrencymoltiplicata per il numero di worker di Airflow, non sono ancora state raggiunte.Puoi configurare le dimensioni del pool nella UI di Airflow (Admin > Pools). Regola le dimensioni del pool in base al livello di parallelismo previsto nel tuo ambiente.
In genere,
[core]parallelismè impostato come prodotto del numero massimo di worker e[celery]worker_concurrency.
Risolvere i problemi relativi alle attività in esecuzione e in coda
Le sezioni seguenti descrivono i sintomi e le potenziali correzioni per alcuni problemi comuni relativi alle attività in esecuzione e in coda.
Le esecuzioni di DAG non vengono eseguite
Sintomo:
Quando una data di pianificazione per un DAG viene impostata dinamicamente, possono verificarsi vari effetti collaterali imprevisti. Ad esempio:
L'esecuzione di un DAG è sempre futura e il DAG non viene mai eseguito.
Le esecuzioni di DAG precedenti sono contrassegnate come eseguite e riuscite, anche se non sono state eseguite.
Per saperne di più, consulta la documentazione di Apache Airflow.
Possibili soluzioni:
Segui le indicazioni riportate nella documentazione di Apache Airflow.
Imposta
start_datestatico per i DAG. In alternativa, puoi utilizzarecatchup=Falseper disabilitare l'esecuzione del DAG per le date precedenti.Evita di utilizzare
datetime.now()odays_ago(<number of days>)a meno che tu non sia a conoscenza degli effetti collaterali di questo approccio.
Utilizzare la funzionalità TimeTable dello scheduler di Airflow
Managed Airflow (Gen 3) non supporta i plug-in personalizzati per lo scheduler di Airflow, incluse le tabelle temporali implementate nei DAG. I plug-in non vengono sincronizzati con gli scheduler nel tuo ambiente.
Puoi comunque utilizzare le tabelle temporali integrate in Managed Airflow (Gen 3).
Evitare la pianificazione delle attività durante i periodi di manutenzione
Puoi definire i periodi di manutenzione per il tuo ambiente in modo che la manutenzione dell'ambiente avvenga al di fuori degli orari in cui esegui i DAG. Puoi comunque eseguire i DAG durante i periodi di manutenzione, a condizione che sia accettabile che alcune attività possano essere interrotte e riprovate. Per saperne di più su come i periodi di manutenzione influiscono sul tuo ambiente, consulta Specificare i periodi di manutenzione.
Utilizzo di "wait_for_downstream" nei DAG
Se imposti il parametro wait_for_downstream su True nei DAG, per il successo di un'attività devono avere esito positivo anche tutte le attività immediatamente a valle di questa attività. Ciò significa che l'esecuzione delle attività appartenenti a una determinata esecuzione di DAG potrebbe essere rallentata dall'esecuzione delle attività dell'esecuzione di DAG precedente. Per saperne di più, consulta la documentazione di Airflow in
Airflow.
Le attività in coda per troppo tempo verranno annullate e riprogrammate
Se un'attività Airflow viene mantenuta in coda per troppo tempo, lo scheduler la riprogramma per l'esecuzione dopo che è trascorso il periodo di tempo impostato nell'opzione di configurazione di Airflow [scheduler]task_queued_timeout. Il valore predefinito è 2400.
Un modo per osservare i sintomi di questa situazione è esaminare il grafico con il numero di attività in coda (scheda "Monitoraggio" nella UI di Managed Airflow). Se i picchi in questo grafico non diminuiscono entro circa due ore, le attività verranno molto probabilmente riprogrammate (senza log), seguite dalle voci di log "Adopted tasks were still pending ..." nei log dello scheduler. In questi casi, potresti visualizzare il messaggio "Log file is not found..." nei log delle attività Airflow perché l'attività non è stata eseguita.
In generale, questo comportamento è previsto e l'istanza successiva dell'attività pianificata deve essere eseguita in base alla pianificazione. Se osservi molti casi di questo tipo nei tuoi ambienti Managed Airflow, potrebbe significare che non ci sono worker di Airflow sufficienti nel tuo ambiente per elaborare tutte le attività pianificate.
Risoluzione: per risolvere il problema, devi assicurarti che i worker di Airflow abbiano sempre la capacità di eseguire le attività in coda. Ad esempio, puoi aumentare il numero di worker o worker_concurrency. Puoi anche ottimizzare il parallelismo o i pool per evitare di mettere in coda più attività di quelle che hai a disposizione.
Approccio di Managed Airflow al parametro min_file_process_interval
Managed Airflow modifica il modo in cui
[scheduler]min_file_process_interval
viene utilizzato dallo scheduler di Airflow.
Lo scheduler di Airflow viene riavviato dopo un certo numero di volte in cui tutti i DAG
vengono pianificati e il [scheduler]num_runs parametro
controlla il numero di volte in cui viene eseguito dallo scheduler. Quando lo scheduler raggiunge i loop di pianificazione [scheduler]num_runs, viene riavviato. Lo scheduler è un componente stateless e questo riavvio è un meccanismo di riparazione automatica per eventuali problemi che lo scheduler potrebbe riscontrare. Il valore predefinito di [scheduler]num_runs è 5000.
[scheduler]min_file_process_interval può essere utilizzato per configurare la frequenza con cui viene eseguita l'analisi dei DAG, ma questo parametro non può essere più lungo del tempo necessario a uno scheduler per eseguire i loop [scheduler]num_runs durante la pianificazione dei DAG.
Contrassegnare le attività come non riuscite dopo aver raggiunto dagrun_timeout
Lo scheduler contrassegna come non riuscite le attività non completate (in esecuzione, pianificate e in coda)
se un'esecuzione di DAG non viene completata entro
dagrun_timeout (un parametro DAG).
Soluzione:
Estendi
dagrun_timeoutper soddisfare il timeout.Aumenta il numero di worker o i parametri di rendimento dei worker, in modo che il DAG venga eseguito più rapidamente.
Sintomi del database Airflow sotto carico elevato
A volte, nei log dei worker di Airflow potresti visualizzare la seguente voce di log di avviso:
psycopg2.OperationalError: connection to server at ... failed
Questi errori o avvisi potrebbero essere un sintomo del sovraccarico del database Airflow a causa del numero di connessioni aperte o del numero di query eseguite contemporaneamente, dagli scheduler o da altri componenti di Airflow come worker, triggerer e server web.
Possibili soluzioni:
Esegui lo scale up del database Airflow regolando le dimensioni dell'ambiente.
Riduci il numero di scheduler e processori DAG. Inizia con uno scheduler e un processore DAG, quindi aumenta il numero di scheduler o processori DAG se noti che questi componenti si avvicinano ai limiti delle risorse.
Evita di utilizzare variabili globali nei DAG di Airflow. Utilizza invece le variabili di ambiente e le variabili di Airflow.
Imposta
[scheduler]scheduler_heartbeat_secsu un valore più alto, ad esempio 15 secondi o più.Imposta
[scheduler]job_heartbeat_secsu un valore più alto, ad esempio 30 secondi o più.Imposta
[scheduler]scheduler_health_check_thresholdsu un valore uguale a[scheduler]job_heartbeat_secmoltiplicato per4.
Il server web mostra l'avviso "Lo scheduler non sembra essere in esecuzione"
Lo scheduler segnala regolarmente il suo heartbeat al database Airflow. In base a queste informazioni, il server web di Airflow determina se lo scheduler è attivo.
A volte, se lo scheduler è sotto carico elevato, potrebbe non essere in grado di
segnalare il suo heartbeat ogni
[scheduler]scheduler_heartbeat_sec.
In questa situazione, il server web di Airflow potrebbe mostrare il seguente avviso:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Possibili soluzioni:
Aumenta le risorse di CPU e memoria per lo scheduler.
Ottimizza i DAG in modo che l'analisi e la pianificazione siano più veloci e non consumino troppe risorse dello scheduler.
Evita di utilizzare variabili globali nei DAG di Airflow. Utilizza invece le variabili di ambiente e le variabili di Airflow.
Aumenta il valore dell'opzione di configurazione di Airflow
[scheduler]scheduler_health_check_threshold, in modo che il server web attenda più a lungo prima di segnalare l'indisponibilità dello scheduler.
Soluzioni alternative per i problemi riscontrati durante il backfill dei DAG
A volte, potresti voler eseguire di nuovo i DAG già eseguiti. Puoi farlo con un comando della CLI di Airflow nel seguente modo:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Per eseguire di nuovo solo le attività non riuscite per un DAG specifico, utilizza anche l'argomento --rerun-failed-tasks.
Sostituisci:
ENVIRONMENT_NAMEcon il nome dell'ambiente.LOCATIONcon la regione in cui si trova l'ambiente.START_DATEcon un valore per il parametro DAGstart_date, nel formatoYYYY-MM-DD.END_DATEcon un valore per il parametro DAGend_date, nel formatoYYYY-MM-DD.DAG_NAMEcon il nome del DAG.
L'operazione di backfill a volte può generare una situazione di deadlock in cui il backfill non è possibile perché è presente un blocco su un'attività. Ad esempio:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
In alcuni casi, puoi utilizzare le seguenti soluzioni alternative per superare i deadlock:
Disattiva il mini-scheduler eseguendo l'override di
[core]schedule_after_task_executionsuFalse.Esegui i backfill per intervalli di date più ristretti. Ad esempio, imposta
START_DATEeEND_DATEper specificare un periodo di un solo giorno.