Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁說明如何連線至 Cloud Composer 環境的 Airflow 資料庫,並執行 SQL 查詢。
舉例來說,您可能想直接在 Airflow 資料庫上執行查詢、備份資料庫、根據資料庫內容收集統計資料,或是從資料庫擷取任何其他自訂資訊。
事前準備
將 Airflow 資料庫內容匯出至 Cloud SQL 執行個體
這個方法包括儲存環境快照 (內含 Airflow 資料庫傾印),然後將傾印匯入 Cloud SQL 執行個體。這樣一來,您就能對 Airflow 資料庫內容的快照執行查詢。
您可以在 Cloud Composer 3 支援的所有 Airflow 版本中使用這種方法,包括 Airflow 3.1.7 之後的版本,因為這些版本無法直接存取 Airflow 資料庫。
儲存環境快照
執行下列指令,儲存環境的快照。儲存快照後,作業結果會回報 URI,快照會儲存在 snapshotPath 欄位中。稍後會用到這個 URI。
如要進一步瞭解如何建立快照,請參閱「儲存及載入環境快照」。
gcloud composer environments snapshots save \
ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-location "SNAPSHOTS_URI"
更改下列內容:
ENVIRONMENT_NAME:環境名稱。LOCATION:環境所在的區域。(選用)
SNAPSHOTS_URI,其中包含要儲存快照的值區資料夾 URI。如果省略這個引數,Cloud Composer 會將快照儲存至環境值區的/snapshots資料夾。
範例:
gcloud composer environments snapshots save \
example-environment \
--location us-central1 \
--snapshot-location "gs://example-bucket/environment_snapshots"
結果範例:
Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24
準備目的地資料庫
如果沒有 Cloud SQL 執行個體,請建立一個。這個執行個體會儲存匯入的資料庫。
執行下列指令,建立 Cloud SQL 執行個體:
gcloud sql instances create SQL_INSTANCE_NAME \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=LOCATION \
--root-password=PASSWORD
更改下列內容:
SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。LOCATION:必須建立執行個體的區域。建議使用與儲存快照的 bucket 相同的區域。PASSWORD:用來連線至執行個體的密碼。
範例:
gcloud sql instances create example-instance \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=us-central1 \
--root-password=example_password
執行下列指令,建立名為 airflow_db 的資料庫:
gcloud sql databases create airflow_db \
--instance=SQL_INSTANCE_NAME
更改下列內容:
SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。
範例:
gcloud sql databases create airflow_db \
--instance=example-instance
將 IAM 權限授予 Cloud SQL 服務帳戶
在含有快照的值區中,將匯入資料的角色授予 Cloud SQL 執行個體的 Cloud SQL 服務帳戶。如要進一步瞭解將資料匯入 Cloud SQL 的 IAM 角色,請參閱「將 SQL 傾印檔案匯入 PostgreSQL 適用的 Cloud SQL」一文。
執行下列指令,取得 Cloud SQL 服務帳戶電子郵件地址:
gcloud sql instances describe SQL_INSTANCE_NAME \
--format="value(serviceAccountEmailAddress)"
更改下列內容:
SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。
範例:
gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"
輸出內容範例:
p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com
將讀取權限授予這個服務帳戶:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
更改下列內容:
BUCKET_NAME:Cloud Storage bucket 的名稱。這是gs://後方緊接的SNAPSHOTS_URI部分。SQL_SERVICE_ACCOUNT:Cloud SQL 執行個體服務帳戶的電子郵件地址。您已透過先前的指令取得這項資訊。
範例:
gcloud storage buckets add-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
匯入資料庫轉儲
執行下列指令,將先前儲存的快照匯入 Cloud SQL 執行個體的 airflow_db 資料庫。
匯入程序進行期間,airflow_db 資料庫將無法使用。
gcloud sql import sql SQL_INSTANCE_NAME \
$(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
--database=airflow_db \
--user=postgres
更改下列內容:
SQL_INSTANCE_NAME:Cloud SQL 執行個體的名稱。SNAPSHOT_PATH,並提供儲存快照的特定 bucket 資料夾 URI。這個 URI 會在您儲存快照時傳回。
範例:
gcloud sql import sql example-instance \
$(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
--database=airflow_db \
--user=postgres
(建議) 匯入完成後撤銷 bucket 存取權
匯入作業完成後,建議您撤銷 Cloud SQL 執行個體服務帳戶的 Cloud Storage bucket 存取權限。
執行下列指令來進行這項操作:
gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
更改下列內容:
BUCKET_NAME:Cloud Storage bucket 的名稱。這是gs://後方緊接的SNAPSHOTS_URI部分。SQL_SERVICE_ACCOUNT:Cloud SQL 執行個體服務帳戶的電子郵件地址。您已透過先前的指令取得這項資訊。
範例:
gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
對匯入的資料庫執行 SQL 查詢
匯入完成後,您就可以對其執行查詢。舉例來說,您可以使用 Google Cloud CLI 執行查詢。
從 DAG 對 Airflow 資料庫執行 SQL 查詢
如要連線至 Airflow 資料庫,請按照下列步驟操作:
使用一或多個 SQLExecuteQueryOperator 運算子建立 DAG。如要開始使用,可以採用範例 DAG。
在運算子的
sql參數中,指定 SQL 查詢。上傳這個 DAG 至環境。
觸發 DAG,例如手動觸發,或等待 DAG 依排程執行。
DAG 範例:
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
如要進一步瞭解如何使用 SQLExecuteQueryOperator,請參閱 Airflow 說明文件中的「How-to Guide for Postgres using SQLExecuteQueryOperator」。