Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)
Questa pagina descrive come gestire i DAG nell'ambiente Airflow gestito.
Airflow gestito utilizza un bucket Cloud Storage per archiviare i DAG del tuo ambiente Airflow gestito. Il tuo ambiente sincronizza i DAG da questo bucket ai componenti Airflow, come worker di Airflow e scheduler.
Prima di iniziare
- Poiché Apache Airflow non fornisce un isolamento DAG efficace, ti consigliamo di mantenere ambienti di produzione e di test separati per evitare interferenze tra i DAG. Per saperne di più, consulta la sezione Testare i DAG.
- Assicurati che il tuo account disponga delle autorizzazioni necessarie per gestire i DAG.
- Le modifiche ai DAG vengono propagate ad Airflow entro 3-5 minuti. Puoi visualizzare lo stato delle attività nella UI di Airflow.
Accedere al bucket dell'ambiente
Per accedere al bucket associato al tuo ambiente:
Console
Nella Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, individua una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Si apre la pagina Dettagli bucket. Mostra i contenuti della cartella
/dagsnel bucket del tuo ambiente.
gcloud
gcloud CLI ha comandi separati per aggiungere e eliminare i DAG nel bucket dell'ambiente.
Se vuoi interagire con il bucket del tuo ambiente, puoi anche utilizzare Google Cloud CLI. Per ottenere l'indirizzo del bucket del tuo ambiente, esegui il seguente comando gcloud CLI:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Sostituisci:
ENVIRONMENT_NAMEcon il nome dell'ambiente.LOCATIONcon la regione in cui si trova l'ambiente.
Esempio:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Crea una richiesta API environments.get. Nella risorsa Environment, nella risorsa EnvironmentConfig, nella risorsa dagGcsPrefix si trova l'indirizzo del bucket del tuo ambiente.
Esempio:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Utilizza la libreria google-auth per ottenere le credenziali e la libreria requests per chiamare l'API REST.
Aggiungere o aggiornare un DAG
Per aggiungere o aggiornare un DAG, sposta il file Python .py del DAG nella cartella /dags del bucket dell'ambiente.
Console
Nella Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, individua una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Si apre la pagina Dettagli bucket. Mostra i contenuti della cartella
/dagsnel bucket del tuo ambiente.Fai clic su Carica file. Seleziona il file Python
.pydel DAG utilizzando la finestra di dialogo del browser e conferma.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Sostituisci:
ENVIRONMENT_NAMEcon il nome dell'ambiente.LOCATIONcon la regione in cui si trova l'ambiente.LOCAL_FILE_TO_UPLOADè il file Python.pydel DAG.
Esempio:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Aggiornare un DAG con esecuzioni di DAG attive
Se aggiorni un DAG con esecuzioni di DAG attive:
- Tutte le attività attualmente in esecuzione vengono completate utilizzando il file DAG originale.
- Tutte le attività pianificate ma non attualmente in esecuzione utilizzano il file DAG aggiornato.
- Tutte le attività non più presenti nel file DAG aggiornato vengono contrassegnate come rimosse.
Aggiornare i DAG eseguiti con una pianificazione frequente
Dopo aver caricato un file DAG, Airflow impiega un po' di tempo per caricare questo file e aggiornare il DAG. Se il DAG viene eseguito con una pianificazione frequente, potresti voler assicurarti che utilizzi la versione aggiornata del file DAG. Ecco come fare:
Metti in pausa il DAG nella UI di Airflow.
Carica un file DAG aggiornato.
Attendi finché non visualizzi gli aggiornamenti nella UI di Airflow. Ciò significa che il DAG è stato analizzato correttamente dallo scheduler e aggiornato nel database Airflow.
Se la UI di Airflow mostra i DAG aggiornati, non è garantito che i worker di Airflow abbiano la versione aggiornata del file DAG. Questo accade perché i file DAG vengono sincronizzati in modo indipendente per scheduler e worker.
Potresti voler estendere il tempo di attesa per assicurarti che il file DAG sia sincronizzato con tutti i worker del tuo ambiente. La sincronizzazione avviene più volte al minuto. In un ambiente integro, sono sufficienti circa 20-30 secondi di attesa per la sincronizzazione di tutti i worker.
(Facoltativo) Se vuoi essere assolutamente certo che tutti i worker abbiano la nuova versione del file DAG, esamina i log di ogni singolo worker. Ecco come fare:
Apri la scheda Log del tuo ambiente nella Google Cloud console.
Vai a Log di Composer > Infrastruttura > Sincronizzazione di Cloud Storage ed esamina i log di ogni worker nel tuo ambiente. Cerca la voce di log
Syncing dags directorypiù recente con un timestamp successivo al caricamento del nuovo file DAG. Se vedi una voceFinished syncingche la segue, significa che i DAG sono stati sincronizzati correttamente su questo worker.
Riattiva il DAG.
Analizzare di nuovo un DAG
Poiché i DAG sono archiviati nel bucket dell'ambiente, ogni DAG viene sincronizzato prima con il processore DAG, che poi lo analizza con un breve ritardo. Se analizzi di nuovo un DAG manualmente, ad esempio, tramite la UI di Airflow, il processore DAG analizza di nuovo la versione corrente del DAG a sua disposizione, che potrebbe non essere l'ultima versione del DAG che hai caricato nel bucket dell'ambiente.
Ti consigliamo di utilizzare l'analisi su richiesta solo se riscontri tempi di analisi lunghi. Ad esempio, questo potrebbe accadere se il tuo ambiente ha un numero elevato di file o se nelle opzioni di configurazione di Airflow è configurato un intervallo di analisi DAG lungo.
Eliminare un DAG nell'ambiente
Per eliminare un DAG, rimuovi il file Python .py del DAG dalla cartella /dags dell'ambiente nel bucket dell'ambiente.
Console
Nella Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, individua una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Si apre la pagina Dettagli bucket. Mostra i contenuti della cartella
/dagsnel bucket del tuo ambiente.Seleziona il file DAG, fai clic su Elimina e conferma l'operazione.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Sostituisci:
ENVIRONMENT_NAMEcon il nome dell'ambiente.LOCATIONcon la regione in cui si trova l'ambiente.DAG_FILEcon il file Python.pydel DAG.
Esempio:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Rimuovere un DAG dalla UI di Airflow
Per rimuovere i metadati di un DAG dalla UI di Airflow:
UI di Airflow
- Vai alla UI di Airflow per il tuo ambiente.
- Per il DAG, fai clic su Elimina DAG.
gcloud
Esegui il seguente comando in gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Sostituisci:
ENVIRONMENT_NAMEcon il nome dell'ambiente.LOCATIONcon la regione in cui si trova l'ambiente.DAG_NAMEcon il nome del DAG da eliminare.
Passaggi successivi
- Scrittura dei DAG
- Testare, sincronizzare e implementare i DAG da GitHub
- Risoluzione dei problemi dei DAG