Airflow データベースにアクセスする

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Cloud Composer 環境の Airflow データベースを実行する Cloud SQL インスタンスに接続して SQL クエリを実行する方法について説明します。

たとえば、Airflow データベースで直接クエリを実行する、データベースのバックアップを作成する、データベースのコンテンツに基づいて統計情報を収集する、またはデータベースからその他のカスタム情報を取得することが必要になる場合があります。

始める前に

Airflow データベースの内容を Cloud SQL インスタンスにエクスポートする

この方法では、Airflow データベース ダンプを含む環境スナップショットを保存し、そのダンプを Cloud SQL インスタンスにインポートします。このようにして、Airflow データベースのコンテンツのスナップショットに対してクエリを実行できます。

このアプローチは、Cloud Composer 3 でサポートされているすべての Airflow バージョンで使用できます。これには、Airflow データベースへの直接アクセスが不可能になった Airflow 3 バージョン 3.1.7 以降も含まれます。

環境のスナップショットを保存する

次のコマンドを実行して、環境のスナップショットを保存します。スナップショットを保存すると、オペレーションの結果の snapshotPath フィールドに、スナップショットが保存された URI が報告されます。この URI は後で使用します。

スナップショットの作成の詳細については、環境スナップショットの保存と読み込みをご覧ください。

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。
  • (省略可)SNAPSHOTS_URI は、スナップショットを保存するバケット フォルダの URI に置き換えます。この引数を省略すると、Cloud Composer では、スナップショットが環境のバケット内の /snapshots フォルダに保存されます。

例:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

結果の例:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

移行先データベースを準備する

Cloud SQL インスタンスがない場合は、作成します。このインスタンスには、インポートされたデータベースが保存されます。

次のコマンドを実行して、Cloud SQL インスタンスを作成します。

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

次のように置き換えます。

  • SQL_INSTANCE_NAME: Cloud SQL インスタンスの名前。
  • LOCATION: インスタンスを作成する必要があるリージョン。スナップショットが保存されているバケットと同じリージョンを使用することをおすすめします。
  • PASSWORD: インスタンスへの接続に使用するパスワード。

例:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

次のコマンドを実行して、airflow_db という名前のデータベースを作成します。

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

次のように置き換えます。

  • SQL_INSTANCE_NAME: Cloud SQL インスタンスの名前。

例:

gcloud sql databases create airflow_db \
  --instance=example-instance

Cloud SQL サービス アカウントに IAM 権限を付与する

スナップショットを含むバケットで、Cloud SQL インスタンスの Cloud SQL サービス アカウントにデータをインポートするロールを付与します。Cloud SQL にデータをインポートするための IAM ロールの詳細については、Cloud SQL for PostgreSQL に SQL ダンプファイルをインポートするをご覧ください。

次のコマンドを実行して、Cloud SQL サービス アカウントのメールアドレスを取得します。

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

次のように置き換えます。

  • SQL_INSTANCE_NAME: Cloud SQL インスタンスの名前。

例:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

出力例:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

このサービス アカウントに読み取り権限を付与します。

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

次のように置き換えます。

  • BUCKET_NAME: Cloud Storage バケットの名前。これは、gs:// の直後の SNAPSHOTS_URI の部分です。
  • SQL_SERVICE_ACCOUNT: Cloud SQL インスタンスのサービス アカウントのメールアドレス。これは、前のコマンドで取得したものです。

例:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

データベース ダンプをインポートする

次のコマンドを実行して、以前に保存したスナップショットから Cloud SQL インスタンスの airflow_db データベースにデータベース ダンプファイルをインポートします。

インポート プロセス中は airflow_db データベースを使用できません。

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

次のように置き換えます。

  • SQL_INSTANCE_NAME: Cloud SQL インスタンスの名前。
  • SNAPSHOT_PATH は、スナップショットが保存されている特定のバケット フォルダの URI に置き換えます。この URI は、スナップショットを保存するときに返されます。

例:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(推奨)インポートの完了後にバケット アクセスを取り消す

インポートが完了したら、Cloud SQL インスタンスのサービス アカウントから Cloud Storage バケットのアクセス権を取り消すことをおすすめします。

以下のコマンドを実行します。

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

次のように置き換えます。

  • BUCKET_NAME: Cloud Storage バケットの名前。これは、gs:// の直後の SNAPSHOTS_URI の部分です。
  • SQL_SERVICE_ACCOUNT: Cloud SQL インスタンスのサービス アカウントのメールアドレス。これは、前のコマンドで取得したものです。

例:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

インポートしたデータベースに対して SQL クエリを実行する

インポートが完了すると、インポートしたデータに対してクエリを実行できます。たとえば、Google Cloud CLI でクエリを実行できます。

DAG から Airflow データベースで SQL クエリを実行する

Airflow データベースに接続するには:

  1. 1 つ以上の SQLExecuteQueryOperator 演算子を使用して DAG を作成します。始めるにあたっては、サンプル DAG を使用できます。

  2. 演算子の sql パラメータで、SQL クエリを指定します。

  3. この DAG を環境にアップロードします。

  4. DAG をトリガーします。たとえば、手動でトリガーすることも、スケジュールに従い実行されるまで待つこともできます。

DAG の例:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

SQLExecuteQueryOperator の使用方法の詳細については、Airflow ドキュメントの SQLExecuteQueryOperator を使用した Postgres のハウツーガイドをご覧ください。

データベース コンテンツをダンプしてバケットに転送する

次のステップ