Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti

Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)

Questa pagina fornisce informazioni per la risoluzione dei problemi che potresti riscontrare durante l'aggiornamento o l'upgrade degli ambienti Managed Service for Apache Airflow.

Per informazioni sulla risoluzione dei problemi relativi alla creazione di ambienti, consulta Risoluzione dei problemi di creazione dell'ambiente.

Quando gli ambienti Managed Airflow vengono aggiornati, la maggior parte dei problemi si verifica per i seguenti motivi:

  • Problemi di autorizzazione dell'account di servizio
  • Problemi di dipendenza di PyPI
  • Dimensioni del database Airflow

Autorizzazioni insufficienti per aggiornare o eseguire l'upgrade di un ambiente

Se Managed Airflow non riesce ad aggiornare o eseguire l'upgrade di un ambiente a causa di autorizzazioni insufficienti, restituisce il seguente messaggio di errore:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

L'account di servizio dell'ambiente non dispone di autorizzazioni sufficienti

Quando crei un ambiente Airflow gestito, specifichi un account di servizio che esegue la maggior parte delle operazioni dell'ambiente. Se questo account di servizio non dispone di autorizzazioni sufficienti per l'operazione richiesta, Managed Airflow restituisce un errore:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Soluzione: assegna i ruoli al tuo Account Google e all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

Le dimensioni del database Airflow sono troppo grandi per eseguire l'operazione

Un'operazione di upgrade potrebbe non riuscire perché le dimensioni del database Airflow sono troppo grandi per il successo delle operazioni di upgrade.

Se le dimensioni del database Airflow sono superiori a 20 GB, Airflow gestito restituisce il seguente errore:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

Soluzione: esegui la pulizia del database Airflow, come descritto in Pulizia del database Airflow.

L'upgrade a una nuova versione di Managed Airflow non riesce a causa di conflitti tra i pacchetti PyPI

Quando esegui l'upgrade di un ambiente con pacchetti PyPI personalizzati installati, potresti riscontrare errori relativi a conflitti tra i pacchetti PyPI. Questo potrebbe accadere perché la nuova build di Airflow contiene versioni successive dei pacchetti preinstallati. Ciò può causare conflitti di dipendenza con i pacchetti PyPI installati nell'ambiente.

Soluzione:

  • Per informazioni dettagliate sui conflitti tra i pacchetti, esegui un controllo dell'upgrade.
  • Allenta i vincoli di versione per i pacchetti PyPI personalizzati installati. Ad esempio, anziché specificare una versione come ==1.0.1, specificala come >=1.0.1.
  • Per ulteriori informazioni sulla modifica dei requisiti di versione per risolvere le dipendenze in conflitto, consulta la documentazione di pip.

Esamina gli avvisi di migrazione non riuscita

Quando esegui l'upgrade di Airflow a una versione successiva, a volte vengono applicati nuovi vincoli al database Airflow. Se questi vincoli non possono essere applicati, Airflow crea nuove tabelle per archiviare le righe per le quali non è stato possibile applicare i vincoli. L'interfaccia utente di Airflow mostra un messaggio di avviso finché le tabelle di dati spostate non vengono rinominate o eliminate.

Soluzione:

Puoi utilizzare i due DAG seguenti per esaminare i dati spostati e rinominare le tabelle.

Il DAG list_moved_tables_after_upgrade_dag elenca le righe spostate da ogni tabella in cui non è stato possibile applicare i vincoli. Esamina i dati e decidi se vuoi conservarli. Per conservarli, devi correggere manualmente i dati nel database Airflow, ad esempio aggiungendo di nuovo le righe con i dati corretti.

Se non hai bisogno dei dati o se li hai già corretti, puoi eseguire il DAG rename_moved_tables_after_upgrade_dag. Questo DAG rinomina le tabelle spostate. Le tabelle e i relativi dati non vengono eliminati, quindi puoi esaminare i dati in un secondo momento.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Passaggi successivi