访问 Airflow 数据库

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本页面介绍如何连接到运行您的 Managed Airflow 环境的 Airflow 数据库的 Cloud SQL 实例以及如何运行 SQL 查询。

例如,您可能想要直接在 Airflow 数据库上运行查询、进行数据库备份、根据数据库内容收集统计信息,或从数据库中检索任何其他自定义信息。

准备工作

将 Airflow 数据库内容导出到 Cloud SQL 实例

此方法包括保存包含 Airflow 数据库转储的环境快照,然后将该转储导入到 Cloud SQL 实例中。这样一来,您就可以对 Airflow 数据库内容的快照运行查询。

您可以在 Managed Airflow(第 3 代)支持的所有 Airflow 版本中使用此方法,包括 3.1.7 之后的 Airflow 3 版本,在这些版本中,无法再直接访问 Airflow 数据库。

保存环境快照

运行以下命令以保存环境的快照。保存快照后,操作结果将在 snapshotPath 字段中报告保存快照的 URI。您稍后会用到此 URI。

如需详细了解如何创建快照,请参阅保存和加载环境快照

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • (可选)SNAPSHOTS_URI,用于存储快照的存储桶文件夹的 URI。如果您省略此实参,Managed Airflow 会将快照保存在环境的存储桶中的 /snapshots 文件夹中。

示例:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

示例结果:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

准备目标数据库

如果您没有 Cloud SQL 实例,请创建一个。此实例将存储导入的数据库。

运行以下命令以创建 Cloud SQL 实例:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

替换以下内容:

  • SQL_INSTANCE_NAME:Cloud SQL 实例的名称。
  • LOCATION:必须创建实例的区域。建议使用与保存快照的存储桶相同的区域。
  • PASSWORD:您将用于连接实例的密码。

示例:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

运行以下命令以创建名为 airflow_db 的数据库:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

替换以下内容:

  • SQL_INSTANCE_NAME:Cloud SQL 实例的名称。

示例:

gcloud sql databases create airflow_db \
  --instance=example-instance

向 Cloud SQL 服务账号授予 IAM 权限

在包含快照的存储桶中,向 Cloud SQL 实例的 Cloud SQL 服务账号授予用于导入数据的角色。如需详细了解用于将数据导入 Cloud SQL 的 IAM 角色,请参阅将 SQL 转储文件导入 Cloud SQL for PostgreSQL

运行以下命令以获取 Cloud SQL 服务账号电子邮件地址:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

替换以下内容:

  • SQL_INSTANCE_NAME:Cloud SQL 实例的名称。

示例:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

输出示例:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

向此服务账号授予读取权限:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

替换以下内容:

  • BUCKET_NAME:Cloud Storage 存储桶的名称。这是 SNAPSHOTS_URI 中紧跟在 gs:// 之后的部分。
  • SQL_SERVICE_ACCOUNT:Cloud SQL 实例的服务账号的电子邮件地址。您可以使用上一个命令获取该值。

示例:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

导入数据库转储

运行以下命令,将之前保存的快照中的数据库转储文件导入到 Cloud SQL 实例的 airflow_db 数据库中。

在导入过程中,airflow_db 数据库将不可用。

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

替换以下内容:

  • SQL_INSTANCE_NAME:Cloud SQL 实例的名称。
  • SNAPSHOT_PATH 替换为存储快照的特定存储桶文件夹的 URI。保存快照时会返回此 URI。

示例:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(建议)在导入完成后撤消存储桶访问权限

建议您在导入完成后,撤消 Cloud SQL 实例的服务账号对 Cloud Storage 存储桶的访问权限。

运行以下命令来进行安装:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

替换以下内容:

  • BUCKET_NAME:Cloud Storage 存储桶的名称。这是 SNAPSHOTS_URI 中紧跟在 gs:// 之后的部分。
  • SQL_SERVICE_ACCOUNT:Cloud SQL 实例的服务账号的电子邮件地址。您可以使用上一个命令获取该值。

示例:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

对导入的数据库运行 SQL 查询

导入完成后,您可以针对该数据运行查询。例如,您可以使用 Google Cloud CLI 运行查询

从 DAG 对 Airflow 数据库运行 SQL 查询

如需连接到 Airflow 数据库,请执行以下操作:

  1. 创建一个包含一个或多个 SQLExecuteQueryOperator 运算符的 DAG。您可以先使用示例 DAG。

  2. 在运算符的 sql 参数中,指定您的 SQL 查询。

  3. 将此 DAG 上传到您的环境。

  4. 触发 DAG,例如,您可以手动触发,也可以等待它按计划运行。

示例 DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

如需详细了解如何使用 SQLExecuteQueryOperator,请参阅 Airflow 文档中的使用 SQLExecuteQueryOperator 的 Postgres 方法指南

转储数据库内容并转移到存储桶

后续步骤