Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)
Questa pagina spiega come funzionano la pianificazione e l'attivazione dei DAG in Airflow, come definire una pianificazione per un DAG e come attivare o mettere in pausa un DAG manualmente.
Informazioni sui DAG Airflow in Managed Airflow
I DAG Airflow in Managed Airflow vengono eseguiti in uno o più ambienti Managed Airflow nel tuo progetto. Carichi i file di origine dei DAG Airflow in un bucket Cloud Storage associato a un ambiente. L'istanza di Airflow dell'ambiente analizza questi file e pianifica le esecuzioni dei DAG, come definito dalla pianificazione di ogni DAG. Durante l'esecuzione di un DAG, Airflow pianifica ed esegue le singole attività che compongono un DAG in una sequenza definita dal DAG.
Per saperne di più sui concetti di base di Airflow, come i DAG Airflow, le esecuzioni di DAG, attività o operatori, consulta la pagina Concetti di base nella documentazione di Airflow.
Informazioni sulla pianificazione dei DAG in Airflow
Airflow fornisce i seguenti concetti per il meccanismo di pianificazione:
- Data logica
Rappresenta una data per la quale viene eseguita una determinata esecuzione di DAG.
Non è la data effettiva in cui Airflow esegue un DAG, ma un periodo di tempo che una determinata esecuzione di DAG deve elaborare. Ad esempio, per un DAG pianificato per l'esecuzione ogni giorno alle 12:00, la data logica sarebbe anche le 12:00 di un giorno specifico. Poiché viene eseguito due volte al giorno, il periodo di tempo che deve elaborare è di 12 ore. Allo stesso tempo, la logica definita nel DAG stesso potrebbe non utilizzare affatto la data logica o l'intervallo di tempo. Ad esempio, un DAG potrebbe eseguire lo stesso script una volta al giorno senza utilizzare il valore della data logica.
Nelle versioni di Airflow precedenti alla 2.2, questa data è chiamata data di esecuzione.
- Data esecuzione
Rappresenta una data in cui viene eseguita una determinata esecuzione di DAG.
Ad esempio, per un DAG pianificato per l'esecuzione ogni giorno alle 12:00, l'esecuzione effettiva del DAG potrebbe avvenire alle 12:05, qualche tempo dopo il passaggio della data logica.
- Intervallo pianificazione
Rappresenta quando e con quale frequenza un DAG deve essere eseguito, in termini di date logiche.
Ad esempio, una pianificazione giornaliera significa che un DAG viene eseguito una volta al giorno e le date logiche per le esecuzioni dei DAG hanno intervalli di 24 ore.
- Data di inizio
Specifica quando vuoi che Airflow inizi a pianificare il DAG.
Le attività nel DAG possono avere date di inizio individuali oppure puoi specificare una singola data di inizio per tutte le attività. In base alla data di inizio minima per le attività nel DAG e all'intervallo di pianificazione, Airflow pianifica le esecuzioni dei DAG.
- Catchup, backfill e tentativi
Meccanismi per l'esecuzione di esecuzioni di DAG per date passate.
Catchup esegue le esecuzioni di DAG che non sono ancora state eseguite, ad esempio se il DAG è stato messo in pausa per un lungo periodo di tempo e poi riattivato. Puoi utilizzare il backfill per eseguire le esecuzioni di DAG per un determinato intervallo di date. I tentativi specificano il numero di tentativi che Airflow deve effettuare durante l'esecuzione delle attività di un DAG.
La pianificazione funziona nel seguente modo:
Dopo il passaggio della data di inizio, Airflow attende la successiva occorrenza dell'intervallo di pianificazione.
Airflow pianifica la prima esecuzione di DAG alla fine di questo intervallo di pianificazione.
Ad esempio, se un DAG è pianificato per l'esecuzione ogni ora e la data di inizio è alle 12:00 di oggi, la prima esecuzione di DAG avviene alle 13:00 di oggi.
La sezione Pianificare un DAG Airflow in questo documento descrive come configurare la pianificazione per i DAG utilizzando questi concetti. Per saperne di più sulle esecuzioni e sulla pianificazione dei DAG, consulta Esecuzioni di DAG nella documentazione di Airflow.
Informazioni sui modi per attivare un DAG
Airflow offre i seguenti modi per attivare un DAG:
Trigger in base a una pianificazione. Airflow attiva automaticamente il DAG in base alla pianificazione specificata nel file DAG.
Trigger manuale. Puoi attivare un DAG manualmente dalla Google Cloud console, dalla UI di Airflow o eseguendo un comando dell'interfaccia a riga di comando di Airflow da Google Cloud CLI.
Trigger in risposta agli eventi. Il modo standard per attivare un DAG in risposta agli eventi è utilizzare un sensore.
Altri modi per attivare i DAG:
Trigger programmatico. Puoi attivare un DAG utilizzando l'API REST di Airflow. Ad esempio, da uno script Python.
Trigger programmatico in risposta agli eventi. Puoi attivare i DAG in risposta agli eventi utilizzando le funzioni Cloud Run e l'API REST di Airflow.
Prima di iniziare
- Assicurati che il tuo account abbia un ruolo in grado di gestire gli oggetti nei bucket dell'ambiente e di visualizzare e attivare i DAG. Per saperne di più, consulta Controllo dell'accesso.
Pianificare un DAG Airflow
Definisci una pianificazione per un DAG nel file DAG. Modifica la definizione del DAG nel seguente modo:
Individua e modifica il file DAG sul computer. Se non hai il file DAG, puoi scaricarne una copia dal bucket dell'ambiente. Per un nuovo DAG, puoi definire tutti i parametri quando crei il file DAG.
Nel parametro
schedule, definisci la pianificazione. Puoi utilizzare un'espressione cron, ad esempio0 0 * * *, o un preset, ad esempio@daily. Per saperne di più, consulta Espressioni cron e intervalli di tempo nella documentazione di Airflow.Airflow determina le date logiche per le esecuzioni dei DAG in base alla pianificazione che hai impostato.
Nel parametro
start_date, definisci la data di inizio.Airflow determina la data logica della prima esecuzione di DAG utilizzando questo parametro.
(Facoltativo) Nel parametro
catchup, definisci se Airflow deve eseguire tutte le esecuzioni precedenti di questo DAG dalla data di inizio alla data corrente che non sono ancora state eseguite.Le esecuzioni di DAG eseguite durante il catchup avranno la data logica nel passato e la data di esecuzione rifletterà l'ora in cui l'esecuzione di DAG è stata effettivamente eseguita.
(Facoltativo) Nel parametro
retries, definisci il numero di tentativi che Airflow deve effettuare per le attività non riuscite (ogni DAG è composto da una o più attività individuali). Per impostazione predefinita, le attività in Managed Airflow vengono ritentate due volte.Carica la nuova versione del DAG nel bucket dell'ambiente.
Attendi che Airflow analizzi correttamente il DAG. Ad esempio, puoi controllare l'elenco dei DAG nel tuo ambiente nella Google Cloud console o nella UI di Airflow.
La seguente definizione di DAG di esempio viene eseguita due volte al giorno alle 00:00 e alle 12:00. La data di inizio è impostata sul 1° gennaio 2024, ma Airflow non la esegue per le date passate dopo il caricamento o la messa in pausa perché il catchup è disattivato.
Il DAG contiene un'attività denominata insert_query_job, che inserisce una riga in una tabella con l'operatore BigQueryInsertJobOperator. Questo operatore è uno degli
Google Cloud operatori BigQuery,
che puoi utilizzare per gestire set di dati e tabelle, eseguire query e convalidare i dati.
Se una determinata esecuzione di questa attività non riesce, Airflow la ritenta altre quattro volte con l'intervallo di tentativi predefinito. La data logica per questi tentativi rimane la stessa.
La query SQL per questa riga utilizza i modelli Airflow per scrivere la data logica e il nome del DAG nella riga.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Per testare questo DAG, puoi attivarlo manualmente e poi visualizzare i log di esecuzione delle attività.
Altri esempi di parametri di pianificazione
I seguenti esempi di parametri di pianificazione illustrano come funziona la pianificazione con diverse combinazioni di parametri:
Se
start_dateèdatetime(2024, 4, 4, 16, 25)escheduleè30 16 * * *, la prima esecuzione di DAG avviene alle 16:30 del 5 aprile 2024.Se
start_dateèdatetime(2024, 4, 4, 16, 35)escheduleè30 16 * * *, la prima esecuzione di DAG avviene alle 16:30 del 6 aprile 2024. Poiché la data di inizio è successiva all'intervallo di pianificazione del 4 aprile 2024, l'esecuzione di DAG non avviene il 5 aprile 2024. L'intervallo di pianificazione termina invece alle 16:35 del 5 aprile 2024, quindi la prossima esecuzione di DAG è pianificata per le 16:30 del giorno successivo.Se
start_dateèdatetime(2024, 4, 4)escheduleè@daily, la prima esecuzione di DAG è pianificata per le 00:00 del 5 aprile 2024.Se
start_dateèdatetime(2024, 4, 4, 16, 30)escheduleè0 * * * *, la prima esecuzione di DAG è pianificata per le 18:00 del 4 aprile 2024. Dopo il passaggio della data e dell'ora specificate, Airflow pianifica un'esecuzione di DAG al minuto 0 di ogni ora. Il punto nel tempo più vicino in cui ciò si verifica è alle 17:00. A questo punto, Airflow pianifica un'esecuzione di DAG alla fine dell'intervallo di pianificazione, ovvero alle 18:00.
Attivare un DAG manualmente
Quando attivi manualmente un DAG Airflow, Airflow esegue il DAG una volta, indipendentemente dalla pianificazione specificata nel file DAG.
Console
Per attivare un DAG dalla Google Cloud console:
Nella Google Cloud console, vai alla pagina Ambienti.
Seleziona un ambiente per visualizzarne i dettagli.
Nella pagina Dettagli ambiente, vai alla scheda DAG.
Fai clic sul nome di un DAG.
Nella pagina Dettagli DAG, fai clic su Attiva DAG. Viene creata una nuova esecuzione di DAG.
UI di Airflow
Per attivare un DAG dalla UI di Airflow:
Nella Google Cloud console, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.
Accedi con l'Account Google che dispone delle autorizzazioni appropriate.
Nella UI di Airflow, nella pagina DAG, fai clic sul pulsante Attiva DAG per il tuo DAG.
gcloud
Esegui il comando dell'interfaccia a riga di comando di Airflow dags trigger:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Sostituisci quanto segue:
ENVIRONMENT_NAME: il nome dell'ambiente.LOCATION: la regione in cui si trova l'ambiente.DAG_ID: il nome del DAG.
Per saperne di più sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Managed Airflow, consulta Eseguire i comandi dell'interfaccia a riga di comando di Airflow.
Per saperne di più sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta
il gcloud composer environments run riferimento al comando.
Visualizzare i log e i dettagli dell'esecuzione di DAG
Nella Google Cloud console puoi:
- Visualizzare gli stati delle esecuzioni di DAG precedenti e i dettagli dei DAG.
- Esplorare i log dettagliati di tutte le esecuzioni di DAG e di tutte le attività di questi DAG.
- Visualizzare le statistiche dei DAG.
Inoltre, Managed Airflow fornisce l'accesso alla UI di Airflow, che è l'interfaccia web di Airflow.
Mettere in pausa un DAG
Console
Per mettere in pausa un DAG dalla Google Cloud console:
Nella Google Cloud console, vai alla pagina Ambienti.
Seleziona un ambiente per visualizzarne i dettagli.
Nella pagina Dettagli ambiente, vai alla scheda DAG.
Fai clic sul nome di un DAG.
Nella pagina Dettagli DAG, fai clic su Metti in pausa DAG.
UI di Airflow
Per mettere in pausa un DAG dalla UI di Airflow:
- Nella Google Cloud console, vai alla pagina Ambienti.
Nella colonna Server web Airflow, segui il link Airflow per il tuo ambiente.
Accedi con l'Account Google che dispone delle autorizzazioni appropriate.
Nell'interfaccia web di Airflow, nella pagina DAG, fai clic sul pulsante di attivazione/disattivazione accanto al nome del DAG.
gcloud
Esegui il comando dell'interfaccia a riga di comando di Airflow dags pause:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Sostituisci quanto segue:
ENVIRONMENT_NAME: il nome dell'ambiente.LOCATION: la regione in cui si trova l'ambiente.DAG_ID: il nome del DAG.
Per saperne di più sull'esecuzione dei comandi dell'interfaccia a riga di comando di Airflow negli ambienti Managed Airflow, consulta Eseguire i comandi dell'interfaccia a riga di comando di Airflow.
Per saperne di più sui comandi dell'interfaccia a riga di comando di Airflow disponibili, consulta
il gcloud composer environments run riferimento al comando.