Como resolver problemas de atualização e upgrade de ambientes

Airflow gerenciado (Geração 3) | Airflow gerenciado (Geração 2) | Airflow gerenciado (Geração 1 legada)

Esta página fornece informações para solução de problemas que podem ser encontrados ao atualizar ou fazer upgrade de ambientes do Serviço Gerenciado para Apache Airflow.

Para informações sobre solução de problemas relacionadas à criação de ambientes, consulte Como resolver problemas da criação de ambientes.

Quando os ambientes do Airflow gerenciado são atualizados, a maioria dos problemas acontece pelos seguintes motivos:

  • Problemas de permissão da conta de serviço
  • Problemas de dependência do PyPI
  • Tamanho do banco de dados do Airflow

Permissões insuficientes para atualizar ou fazer upgrade de um ambiente

Se o Airflow gerenciado não puder atualizar ou fazer upgrade de um ambiente devido a permissões insuficientes, a seguinte mensagem de erro será exibida:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solução: atribua papéis à sua conta e à conta de serviço do ambiente, conforme descrito em Controle de acesso.

A conta de serviço do ambiente não tem permissões suficientes

Ao criar um ambiente do Airflow gerenciado, você especifica uma conta de serviço que realiza a maioria das operações do ambiente. Se essa conta de serviço não tiver permissões suficientes para a operação solicitada, o Airflow gerenciado gerará um erro:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solução: atribua papéis à sua Conta do Google e à conta de serviço do ambiente, conforme descrito em Controle de acesso.

O tamanho do banco de dados do Airflow é muito grande para realizar a operação

Uma operação de upgrade pode não ter êxito porque o tamanho do banco de dados do Airflow é muito grande para o sucesso das operações de upgrade.

Se o tamanho do banco de dados do Airflow for maior do que 20 GB, o Airflow gerenciado gerará o seguinte erro:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

Solução: execute a limpeza do banco de dados do Airflow, conforme descrito em Limpar o banco de dados do Airflow.

Falha no upgrade para uma nova versão do Airflow gerenciado devido a conflitos de pacote do PyPI

Ao fazer upgrade de um ambiente com pacotes PyPI personalizados instalados, talvez ocorram erros relacionados a conflitos no pacote PyPI. Isso pode acontecer porque a nova versão do Airflow contém versões mais recentes de pacotes pré-instalados. Isso pode causar conflitos de dependência com pacotes PyPI que você instalou no ambiente.

Solução:

  • Para ver informações detalhadas sobre conflitos de pacotes, execute uma verificação de upgrade.
  • Reduza as restrições de versão para pacotes PyPI personalizados instalados. Por exemplo, em vez de especificar uma versão como ==1.0.1, especifique-a como >=1.0.1.
  • Para mais informações sobre como alterar os requisitos de versão para resolver dependências conflitantes, consulte a documentação do pip.

Inspecionar avisos de migração com falha

Ao fazer upgrade do Airflow para uma versão mais recente, às vezes novas restrições são aplicadas ao banco de dados do Airflow. Se essas restrições não puderem ser aplicadas, o Airflow criará novas tabelas para armazenar as linhas em que as restrições não puderam ser aplicadas. A interface do Airflow mostra uma mensagem de aviso até que as tabelas de dados movidas sejam renomeadas ou descartadas.

Solução:

É possível usar os dois DAGs a seguir para inspecionar os dados movidos e renomear as tabelas.

O DAG list_moved_tables_after_upgrade_dag lista as linhas que foram movidas de cada tabela em que as restrições não puderam ser aplicadas. Inspecione os dados e decida se quer mantê-los. Para mantê-los, é necessário corrigir manualmente os dados no banco de dados do Airflow. Por exemplo, adicionando as linhas de volta com os dados corretos.

Se você não precisar dos dados ou se já os corrigiu, execute o DAG rename_moved_tables_after_upgrade_dag. Esse DAG renomeia as tabelas movidas. As tabelas e os dados não são excluídos, então você pode revisar os dados mais tarde.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

A seguir