Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina spiega come trasferire DAG, dati e configurazione dagli ambienti Airflow 1.10.* esistenti agli ambienti con Airflow 2 e versioni successive di Airflow.
Altre guide alla migrazione
Da | A | Metodo | Guida |
---|---|---|---|
Cloud Composer 2 | Cloud Composer 3 | Affiancata, utilizzando lo script di migrazione | Guida alla migrazione degli script |
Cloud Composer 2 | Cloud Composer 3 | Affiancate, utilizzando gli snapshot | Guida alla migrazione degli snapshot |
Cloud Composer 1, Airflow 2 | Cloud Composer 3 | Affiancate, utilizzando gli snapshot | Guida alla migrazione degli snapshot |
Cloud Composer 1, Airflow 2 | Cloud Composer 2 | Affiancate, utilizzando gli snapshot | Guida alla migrazione degli snapshot |
Cloud Composer 1, Airflow 2 | Cloud Composer 2 | Trasferimento manuale affiancato | Guida alla migrazione manuale |
Cloud Composer 1, Airflow 1 | Cloud Composer 2, Airflow 2 | Affiancate, utilizzando gli snapshot | Guida alla migrazione degli snapshot |
Cloud Composer 1, Airflow 1 | Cloud Composer 2, Airflow 2 | Trasferimento manuale affiancato | Guida alla migrazione manuale |
Cloud Composer 1, Airflow 1 | Cloud Composer 1, Airflow 2 | Trasferimento manuale affiancato | Questa guida |
Upgrade side-by-side
Cloud Composer fornisce lo script di trasferimento del database Cloud Composer per eseguire la migrazione del database dei metadati, dei DAG, dei dati e dei plug-in dagli ambienti Cloud Composer con Airflow 1.10.14 e Airflow 1.10.15 agli ambienti Cloud Composer esistenti con Airflow 2.0.1 e versioni successive di Airflow.
Questo è un percorso alternativo a quello descritto in questa guida. Alcune parti di questa guida sono ancora valide quando si utilizza lo script fornito. Ad esempio, potresti voler controllare la compatibilità dei tuoi DAG con Airflow 2 prima di eseguirne la migrazione o assicurarti che non si verifichino esecuzioni simultanee di DAG e che non ci siano esecuzioni di DAG aggiuntive o mancanti.
Prima di iniziare
Prima di iniziare a utilizzare gli ambienti Cloud Composer con Airflow 2, considera le modifiche che Airflow 2 introduce negli ambienti Cloud Composer.
Più scheduler
Puoi utilizzare più di uno scheduler Airflow nel tuo ambiente. Puoi impostare il numero di scheduler quando crei un ambiente o aggiorni un ambiente esistente.
Esecutore Celery+Kubernetes
Airflow 2 Celery+Kubernetes Executor è supportato in Cloud Composer 3.
Modifiche che provocano un errore
Airflow 2 introduce molte modifiche importanti, alcune delle quali sono incompatibili con le versioni precedenti:
- Non è garantito che i DAG esistenti di Airflow 1.10.* funzionino con Airflow 2. Devono essere testati e, se necessario, modificati.
- Operatori, trasferimenti e hook migrati ai pacchetti dei provider. Le istruzioni di importazione nei DAG devono utilizzare i nuovi pacchetti di provider. Le istruzioni di importazione precedenti potrebbero non funzionare più in Airflow 2.
- Alcune configurazioni di Airflow 1.10.* potrebbero non essere più supportate perché Airflow 2 non supporta più determinate opzioni di configurazione.
- Alcuni pacchetti PyPI personalizzati potrebbero non essere compatibili con la nuova versione di Airflow o Python.
- La UI di Airflow con controllo dell'accesso è la UI predefinita di Airflow 2. Airflow 2 non supporta altri tipi di UI di Airflow.
- L'API REST sperimentale viene sostituita dall'API Airflow stabile. L'API REST sperimentale è disattivata per impostazione predefinita in Airflow 2.
- Altre modifiche importanti in Airflow 2.0.0
- Altre modifiche importanti in Airflow 2.0.1
Differenze tra gli ambienti con Airflow 2 e Airflow 1.10.*
Differenze principali tra gli ambienti Cloud Composer con Airflow 1.10.* e gli ambienti con Airflow 2:
- Gli ambienti con Airflow 2 utilizzano Python 3.8. Si tratta di una versione più recente di quella utilizzata negli ambienti Airflow 1.10.*. Python 2, Python 3.6 e Python 3.7 non sono supportati.
- Airflow 2 utilizza un formato CLI diverso. Cloud Composer supporta il nuovo formato negli ambienti con Airflow 2 tramite il comando
gcloud composer environments run
. - I pacchetti PyPI preinstallati sono diversi negli ambienti Airflow 2. Per un elenco dei pacchetti PyPI preinstallati, consulta l'elenco delle versioni di Cloud Composer.
- La serializzazione DAG è sempre abilitata in Airflow 2. Di conseguenza, il caricamento asincrono dei DAG non è più necessario e non è supportato in Airflow 2. Di conseguenza, la configurazione dei parametri
[core]store_serialized_dags
e[core]store_dag_code
non è supportata per Airflow 2 e i tentativi di impostarli verranno segnalati come errori. - I plug-in del server web Airflow non sono supportati. Ciò non influisce sui plug-in di scheduler o worker, inclusi operatori e sensori Airflow.
- Negli ambienti Airflow 2, il ruolo utente Airflow predefinito è
Op
. Per gli ambienti con Airflow 1.10.*, il ruolo predefinito èAdmin
.
Passaggio 1: controlla la compatibilità con Airflow 2
Per verificare la presenza di potenziali conflitti con Airflow 2, consulta la guida Eseguire l'upgrade ad Airflow 2.0+ nella sezione relativa all'upgrade dei DAG.
Un problema comune che potresti riscontrare riguarda i percorsi di importazione incompatibili. Per ulteriori informazioni sulla risoluzione di questo problema di compatibilità, consulta la sezione relativa ai provider di backporting nella guida all'upgrade ad Airflow 2.0+.
Passaggio 2: crea un ambiente Airflow 2, trasferisci gli override della configurazione e le variabili di ambiente
Crea un ambiente Airflow 2 e trasferisci gli override della configurazione e le variabili di ambiente:
Segui i passaggi per creare un ambiente. Prima di creare un ambiente, specifica anche gli override della configurazione e le variabili di ambiente, come spiegato di seguito.
Quando selezioni un'immagine, scegli un'immagine con Airflow 2.
Trasferisci manualmente i parametri di configurazione dal tuo ambiente Airflow 1.10.* al nuovo ambiente Airflow 2.
Console
Quando crei un ambiente, espandi la sezione Networking, override della configurazione di Airflow e funzionalità aggiuntive.
In Override della configurazione Airflow, fai clic su Aggiungi override della configurazione Airflow.
Copia tutti gli override della configurazione dal tuo ambiente Airflow 1.10.*
Alcune opzioni di configurazione utilizzano un nome e una sezione diversi in Airflow 2. Per ulteriori informazioni, vedi Modifiche alla configurazione.
In Variabili di ambiente, fai clic su Aggiungi variabile di ambiente.
Copia tutte le variabili di ambiente dall'ambiente Airflow 1.10.*.
Fai clic su Crea per creare un ambiente.
Passaggio 3: installa i pacchetti PyPI nell'ambiente Airflow 2
Dopo aver creato l'ambiente Airflow 2, installa i pacchetti PyPI:
Console
Nella console Google Cloud , vai alla pagina Ambienti.
Seleziona l'ambiente Airflow 2.
Vai alla scheda Pacchetti PyPI e fai clic su Modifica.
Copia i requisiti dei pacchetti PyPI dal tuo ambiente Airflow 1.10.*. Fai clic su Salva e attendi l'aggiornamento dell'ambiente.
Poiché gli ambienti Airflow 2 utilizzano un insieme diverso di pacchetti preinstallati e una versione di Python diversa, potresti riscontrare conflitti di pacchetti PyPI difficili da risolvere.
Passaggio 4: trasferisci variabili e pool ad Airflow 2
Airflow 1.10.* supporta l'esportazione di variabili e pool in file JSON. Puoi quindi importare questi file nel tuo ambiente Airflow 2.
Devi trasferire i pool solo se hai pool personalizzati diversi da
default_pool
. In caso contrario, ignora i comandi che esportano e importano i pool.
gcloud
Esporta le variabili dall'ambiente Airflow 1.10.*:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
Sostituisci:
AIRFLOW_1_ENV
con il nome del tuo ambiente Airflow 1.10.*.AIRFLOW_1_LOCATION
con la regione in cui si trova l'ambiente.
Esporta i pool dal tuo ambiente Airflow 1.10.*:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
Recupera l'URI del bucket dell'ambiente Airflow 2.
Esegui questo comando:
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(config.dagGcsPrefix)"
Sostituisci:
AIRFLOW_2_ENV
con il nome del tuo ambiente Airflow 2.AIRFLOW_2_LOCATION
con la regione in cui si trova l'ambiente.
Nell'output, rimuovi la cartella
/dags
. Il risultato è l'URI del bucket dell'ambiente Airflow 2.Ad esempio, modifica
gs://us-central1-example-916807e1-bucket/dags
ings://us-central1-example-916807e1-bucket
.
Trasferisci i file JSON con variabili e pool nel tuo ambiente Airflow 2:
gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=variables.json gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=pools.json
Sostituisci
AIRFLOW_2_BUCKET
con l'URI del bucket dell'ambiente Airflow 2, ottenuto nel passaggio precedente.Importa variabili e pool in Airflow 2:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
Verifica che le variabili e i pool siano importati:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables list gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools list
Rimuovi i file JSON dai bucket:
gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
Passaggio 5: trasferisci altri dati dal bucket dell'ambiente Airflow 1.10.*
gcloud
Trasferisci i plug-in al tuo ambiente Airflow 2. Per farlo, esporta i plug-in dal bucket dell'ambiente Airflow 1.10.* alla cartella
/plugins
nel bucket dell'ambiente Airflow 2:gcloud composer environments storage plugins export \ --destination=AIRFLOW_2_BUCKET/plugins \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
Verifica che la cartella
/plugins
sia stata importata correttamente:gcloud composer environments storage plugins list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
Esporta la cartella
/data
dal tuo ambiente Airflow 1.10.* all'ambiente Airflow 2:gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
Verifica che la cartella
/data
sia stata importata correttamente:gcloud composer environments storage data list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
Passaggio 6: trasferisci connessioni e utenti
Airflow 1.10.* non supporta l'esportazione di utenti e connessioni. Per trasferire utenti e connessioni, crea manualmente nuovi account utente e connessioni nel tuo ambiente Airflow 2.
gcloud
Per ottenere un elenco delle connessioni nel tuo ambiente Airflow 1.10.*, esegui:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ connections -- --list
Per creare una nuova connessione nell'ambiente Airflow 2, esegui il comando dell'interfaccia a riga di comando di Airflow
connections
tramite gcloud. Ad esempio:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
Per visualizzare un elenco degli utenti nel tuo ambiente Airflow 1.10.*:
Apri l'interfaccia web di Airflow per il tuo ambiente Airflow 1.10.*.
Vai ad Amministrazione > Utenti.
Per creare un nuovo account utente nell'ambiente Airflow 2, esegui il comando CLI Airflow
users create
tramite gcloud. Ad esempio:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Admin
Passaggio 7: assicurati che i DAG siano pronti per Airflow 2
Prima di trasferire i DAG al tuo ambiente Airflow 2, assicurati che:
I DAG vengono eseguiti correttamente e non sono presenti problemi di compatibilità rimanenti.
I tuoi DAG utilizzano istruzioni di importazione corrette.
Ad esempio, la nuova istruzione di importazione per
BigQueryCreateDataTransferOperator
potrebbe avere il seguente aspetto:from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
I tuoi DAG sono stati aggiornati per Airflow 2. Questa modifica è compatibile con Airflow 1.10.14 e versioni successive.
Passaggio 8: trasferisci i DAG all'ambiente Airflow 2
Quando trasferisci i DAG tra ambienti, potrebbero verificarsi i seguenti problemi potenziali:
Se un DAG è abilitato (non in pausa) in entrambi gli ambienti, ogni ambiente esegue la propria copia del DAG, come pianificato. Ciò potrebbe comportare esecuzioni simultanee di DAG per gli stessi dati e lo stesso tempo di esecuzione.
A causa del recupero del DAG, Airflow pianifica esecuzioni del DAG aggiuntive, a partire dalla data di inizio specificata nei DAG. Ciò accade perché la nuova istanza di Airflow non tiene conto della cronologia delle esecuzioni DAG dell'ambiente 1.10.*. Ciò potrebbe comportare un numero elevato di esecuzioni di DAG pianificate a partire dalla data di inizio specificata.
Impedire l'esecuzione simultanea di DAG
Nel tuo ambiente Airflow 2,
esegui l'override dell'opzione di configurazione di Airflow dags_are_paused_at_creation
. Dopo aver apportato questa modifica, tutti i nuovi DAG vengono
sospesi per impostazione predefinita.
Sezione | Chiave | Valore |
---|---|---|
core |
dags_are_paused_at_creation |
True |
Evitare esecuzioni di DAG aggiuntive o mancanti
Specifica una nuova data di inizio statica nei DAG che trasferisci al tuo ambiente Airflow 2.
Per evitare lacune e sovrapposizioni nelle date di esecuzione, la prima esecuzione del DAG deve avvenire nell'ambiente Airflow 2 alla successiva occorrenza dell'intervallo di pianificazione. A questo scopo, imposta la nuova data di inizio nel DAG in modo che sia precedente alla data dell'ultima esecuzione nell'ambiente Airflow 1.10.*.
Ad esempio, se il tuo DAG viene eseguito alle 15:00, alle 17:00 e alle 21:00 ogni giorno nell'ambiente Airflow 1.10.*, l'ultima esecuzione del DAG è avvenuta alle 15:00 e prevedi di trasferire il DAG alle 15:15, la data di inizio per l'ambiente Airflow 2 può essere oggi alle 14:45. Dopo aver abilitato il DAG nell'ambiente Airflow 2, Airflow pianifica un'esecuzione del DAG per le ore 17:00.
Come altro esempio, se il tuo DAG viene eseguito alle 00:00 ogni giorno nell'ambiente Airflow 1.10.*, l'ultima esecuzione del DAG è avvenuta alle 00:00 del 26 aprile 2021 e prevedi di trasferire il DAG alle 13:00 del 26 aprile 2021, la data di inizio per l'ambiente Airflow 2 può essere le 23:45 del 25 aprile 2021. Dopo aver abilitato il DAG nell'ambiente Airflow 2, Airflow pianifica un'esecuzione del DAG per le ore 00:00 del 27 aprile 2021.
Trasferisci i DAG uno alla volta all'ambiente Airflow 2
Per ogni DAG, segui questa procedura per trasferirlo:
Assicurati che la nuova data di inizio nel DAG sia impostata come descritto nella sezione precedente.
Carica il DAG aggiornato nell'ambiente Airflow 2. Questo DAG è in pausa nell'ambiente Airflow 2 a causa dell'override della configurazione, quindi non sono ancora state pianificate esecuzioni del DAG.
Nell'interfaccia web di Airflow, vai a DAG e verifica la presenza di errori di sintassi dei DAG segnalati.
Al momento in cui prevedi di trasferire il DAG:
Metti in pausa il DAG nell'ambiente Airflow 1.10.*.
Riattiva il DAG nell'ambiente Airflow 2.
Controlla che la nuova esecuzione del DAG sia pianificata all'ora corretta.
Attendi l'esecuzione del DAG nell'ambiente Airflow 2 e verifica se l'esecuzione è riuscita.
A seconda che l'esecuzione del DAG abbia esito positivo:
Se l'esecuzione del DAG va a buon fine, puoi procedere e utilizzare il DAG dal tuo ambiente Airflow 2. Infine, valuta la possibilità di eliminare la versione 1.10.* del DAG di Airflow.
Se l'esecuzione del DAG non è riuscita, prova a risolvere i problemi del DAG finché non viene eseguito correttamente in Airflow 2.
Se necessario, puoi sempre eseguire il rollback alla versione 1.10.* del DAG:
Metti in pausa il DAG nell'ambiente Airflow 2.
Riattiva il DAG nell'ambiente Airflow 1.10.*. In questo modo viene pianificata una nuova esecuzione di DAG per la stessa data e ora dell'esecuzione di DAG non riuscita.
Quando è tutto pronto per continuare con la versione Airflow 2 del DAG, modifica la data di inizio, carica la nuova versione del DAG nell'ambiente Airflow 2 e ripeti la procedura.
Passaggio 9: monitora l'ambiente Airflow 2
Dopo aver trasferito tutti i DAG e la configurazione all'ambiente Airflow 2, monitoralo per rilevare potenziali problemi, esecuzioni di DAG non riuscite e l'integrità generale dell'ambiente. Se l'ambiente Airflow 2 viene eseguito senza problemi per un periodo di tempo sufficiente, puoi rimuovere l'ambiente Airflow 1.10.*.
Passaggi successivi
- Risoluzione dei problemi dei DAG
- Risoluzione dei problemi di creazione dell'ambiente
- Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti
- Utilizzo dei pacchetti di backport