存取 Airflow 資料庫

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer„�

本頁說明如何連線至 Cloud Composer 環境的 Airflow 資料庫,並執行 SQL 查詢。

舉例來說,您可能想直接在 Airflow 資料庫上執行查詢、備份資料庫、根據資料庫內容收集統計資料,或是從資料庫擷取任何其他自訂資訊。

事前準備

將 Airflow 資料庫內容匯出至 Cloud SQL 執行個體

這個方法包括儲存環境快照 (內含 Airflow 資料庫傾印),然後將傾印匯入 Cloud SQL 執行個體。這樣一來,您就能對 Airflow 資料庫內容的快照執行查詢。

您可以在 Cloud Composer 3 支援的所有 Airflow 版本中使用這種方法,包括 Airflow 3.1.7 之後的版本,因為這些版本無法直接存取 Airflow 資料庫。

儲存環境快照

執行下列指令,儲存環境的快照。儲存快照後,作業結果會回報 URI,快照會儲存在 snapshotPath 欄位中。稍後會用到這個 URI。

如要進一步瞭解如何建立快照,請參閱「儲存及載入環境快照」。

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的區域。
  • (選用) SNAPSHOTS_URI,其中包含要儲存快照的值區資料夾 URI。如果省略這個引數,Cloud Composer 會將快照儲存至環境值區的 /snapshots 資料夾。

範例:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

結果範例:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

準備目的地資料庫

如果沒有 Cloud SQL 執行個體,請建立一個。這個執行個體會儲存匯入的資料庫。

執行下列指令,建立 Cloud SQL 執行個體:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

更改下列內容:

  • SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。
  • LOCATION:必須建立執行個體的區域。建議使用與儲存快照的 bucket 相同的區域。
  • PASSWORD:用來連線至執行個體的密碼。

範例:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

執行下列指令,建立名為 airflow_db 的資料庫:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

更改下列內容:

  • SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。

範例:

gcloud sql databases create airflow_db \
  --instance=example-instance

將 IAM 權限授予 Cloud SQL 服務帳戶

在含有快照的值區中,將匯入資料的角色授予 Cloud SQL 執行個體的 Cloud SQL 服務帳戶。如要進一步瞭解將資料匯入 Cloud SQL 的 IAM 角色,請參閱「將 SQL 傾印檔案匯入 PostgreSQL 適用的 Cloud SQL」一文。

執行下列指令,取得 Cloud SQL 服務帳戶電子郵件地址:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

更改下列內容:

  • SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。

範例:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

輸出內容範例:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

將讀取權限授予這個服務帳戶:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

更改下列內容:

  • BUCKET_NAME:Cloud Storage bucket 的名稱。這是 gs:// 後方緊接的 SNAPSHOTS_URI 部分。
  • SQL_SERVICE_ACCOUNT:Cloud SQL 執行個體服務帳戶的電子郵件地址。您已透過先前的指令取得這項資訊。

範例:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

匯入資料庫轉儲

執行下列指令,將先前儲存的快照匯入 Cloud SQL 執行個體的 airflow_db 資料庫。

匯入程序進行期間,airflow_db 資料庫將無法使用。

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

更改下列內容:

  • SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。
  • SNAPSHOT_PATH,並提供儲存快照的特定 bucket 資料夾 URI。這個 URI 會在您儲存快照時傳回。

範例:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(建議) 匯入完成後撤銷 bucket 存取權

匯入作業完成後,建議您撤銷 Cloud SQL 執行個體服務帳戶的 Cloud Storage bucket 存取權限。

執行下列指令來進行這項操作:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

更改下列內容:

  • BUCKET_NAME:Cloud Storage bucket 的名稱。這是 gs:// 後方緊接的 SNAPSHOTS_URI 部分。
  • SQL_SERVICE_ACCOUNT:Cloud SQL 執行個體服務帳戶的電子郵件地址。您已透過先前的指令取得這項資訊。

範例:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

對匯入的資料庫執行 SQL 查詢

匯入完成後,您就可以對其執行查詢。舉例來說,您可以使用 Google Cloud CLI 執行查詢

從 DAG 對 Airflow 資料庫執行 SQL 查詢

如要連線至 Airflow 資料庫,請按照下列步驟操作:

  1. 使用一或多個 SQLExecuteQueryOperator 運算子建立 DAG。如要開始使用,可以採用範例 DAG。

  2. 在運算子的 sql 參數中,指定 SQL 查詢。

  3. 上傳這個 DAG 至環境。

  4. 觸發 DAG,例如手動觸發,或等待 DAG 依排程執行。

DAG 範例:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

如要進一步瞭解如何使用 SQLExecuteQueryOperator,請參閱 Airflow 說明文件中的「How-to Guide for Postgres using SQLExecuteQueryOperator」。

轉儲資料庫內容並轉移至值區

後續步驟