排查环境更新和升级问题

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本页面针对更新或升级 Managed Service for Apache Airflow 环境时可能遇到的问题提供了问题排查信息。

如需了解与创建环境相关的问题排查信息,请参阅 排查环境创建问题

更新 Managed Airflow 环境时,大多数问题都是由以下原因造成的:

  • 服务账号权限问题
  • PyPI 依赖项问题
  • Airflow 数据库的大小

权限不足,无法更新或升级环境

如果 Managed Airflow 由于权限不足无法更新或升级环境,它将输出以下错误消息:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解决方案:如访问权限控制中所述,将角色分配给您的账号以及环境的服务帐号 。

环境的服务帐号权限不足

创建 Managed Airflow 环境时,您需要指定一个服务账号来执行环境的大部分操作。如果此服务帐号没有执行所请求操作的足够权限,Managed Airflow 将输出错误:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解决方案:如访问权限控制中所述,将角色分配给您的 Google 账号以及 环境的服务帐号。

Airflow 数据库太大,无法执行此操作

如果 Airflow 数据库太大导致升级操作无法完成,升级操作可能不会成功。

如果 Airflow 数据库的大小超过 16 GB,Managed Airflow 会输出以下错误:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

解决方案:执行 Airflow 数据库清理,如 清理 Airflow 数据库中所述。

由于 PyPI 软件包冲突,升级到新的 Managed Airflow 版本失败

使用已安装的自定义 PyPI 软件包升级环境时,您可能会遇到与 PyPI 软件包冲突相关的错误。出现此错误,可能是因为新的 Managed Service for Apache Airflow 映像 包含较新版本的预安装软件包。这会导致依赖项与您在环境中安装的 PyPI 软件包发生冲突。

解决方案

  • 如需获取关于软件包冲突的详细信息,请运行一 项升级检查
  • 放宽已安装的自定义 PyPI 软件包的版本限制条件。例如, 不要将版本指定为 ==1.0.1,而是将其指定为 >=1.0.1
  • 如需详细了解如何更改版本要求来解决 依赖项冲突问题,请参阅 pip 文档

无法将环境升级到仍受支持的版本

Managed Airflow 环境只能升级到 几个最新版本和之前的版本

创建新环境和升级现有环境的版本限制不同。您在创建新环境时选择的 Managed Airflow 版本可能无法用于升级现有环境。

您可以使用 Google Cloud CLI、API 或 Terraform 执行升级操作。在 Google Cloud 控制台中,只有最新版本可作为升级选项 。

环境状况不佳(活跃度检查失败)

只有当环境的状态 报告为“健康”时,才能升级环境。

环境状态不佳的最常见原因之一是,环境的组件接近配置的资源限制,并且始终以最大负载运行。由于某些环境组件无法报告其状态,因此活跃度检查 DAG 会将环境的状态报告为“不健康”。

如需解决此问题,我们建议您增加资源限制。虽然我们建议您始终避免环境接近限制,但您也可以仅在环境升级期间这样做。

缺少与 DNS 的连接可能会在执行升级或更新时导致问题

此类连接问题可能会导致如下日志条目:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

这通常意味着没有通往 DNS 的路由,因此请确保元数据.google.internal DNS 名称可以从集群、Pod 和服务网络内解析为 IP 地址。检查您是否在创建环境的 VPC(在宿主或服务项目中)内启用了专用 Google 访问通道。

触发器 CPU 超出 1 个 vCPU 的限制

Managed Airflow 2.4.4 及更高版本引入了不同的触发器资源分配策略,以提高性能伸缩能力。如果您在执行环境更新时遇到与 triggerer(触发器) CPU 相关的错误,则表示您的当前 triggerer(触发器) 配置为每个 triggerer(触发器) 使用超过 1 个 vCPU。

解决方案

检查失败的迁移警告

将 Airflow 升级到更高版本时,有时会对 Airflow 数据库应用新的限制。如果无法应用这些限制,Airflow 会创建新表来存储无法应用限制的行。在重命名或删除移动的数据表之前,Airflow 界面会显示警告消息。

解决方案

您可以使用以下两个 DAG 检查移动的数据并重命名表。

list_moved_tables_after_upgrade_dag DAG 列出了无法应用限制的每个表中移动的行。检查数据并决定是否要保留这些数据。如需保留这些数据,您需要手动修复 Airflow 数据库中的数据。例如,通过使用正确的数据添加回行。

如果您不需要这些数据或已修复这些数据,则可以运行 rename_moved_tables_after_upgrade_dag DAG。此 DAG 会重命名移动的表。 表及其数据不会被删除,因此您可以稍后查看数据。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

环境操作无限期处于失败状态

Managed Airflow(第 2 代)环境依赖于 Pub/Sub 主题和 订阅,以便在环境操作期间与位于环境 租户项目中的资源进行通信

如果您的项目中停用了 Pub/Sub API,或者删除了环境的主题或订阅,则环境操作可能会失败并无限期处于失败状态。此类环境将不可撤销地损坏。

后续步骤