Airflow 데이터베이스에 액세스

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Cloud Composer 환경의 Airflow 데이터베이스를 실행하는 Cloud SQL 인스턴스에 연결하고 SQL 쿼리를 실행하는 방법을 설명합니다.

예를 들어 Airflow 데이터베이스에서 직접 쿼리를 실행하거나, 데이터베이스 백업을 수행하거나, 데이터베이스 콘텐츠를 기반으로 통계를 수집하거나, 데이터베이스에서 다른 커스텀 정보를 가져올 수 있습니다.

시작하기 전에

Airflow 데이터베이스 콘텐츠를 Cloud SQL 인스턴스로 내보내기

이 방법에는 Airflow 데이터베이스 덤프가 포함된 환경 스냅샷을 저장한 다음 덤프를 Cloud SQL 인스턴스로 가져오는 작업이 포함됩니다. 이렇게 하면 Airflow 데이터베이스 콘텐츠의 스냅샷에 대해 쿼리를 실행할 수 있습니다.

Airflow 데이터베이스에 직접 액세스할 수 없는 Airflow 3.1.7 이후 버전의 Airflow를 비롯해 Cloud Composer 3에서 지원하는 모든 Airflow 버전에서 이 방법을 사용할 수 있습니다.

환경 스냅샷 저장

다음 명령어를 실행하여 환경의 스냅샷을 저장합니다. 스냅샷을 저장하면 작업 결과에서 스냅샷이 저장된 URI를 snapshotPath 필드에 보고합니다. 이 URI는 나중에 사용하게 됩니다.

스냅샷을 만드는 방법에 대한 자세한 내용은 환경 스냅샷 저장 및 로드를 참고하세요.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • (선택사항) SNAPSHOTS_URI를 스냅샷을 저장할 버킷 폴더의 URI로 바꿉니다. 이 인수를 생략하면 Cloud Composer가 해당 환경 버킷의 /snapshots 폴더에 스냅샷을 저장합니다.

예:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

결과 예시:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

대상 데이터베이스 준비

Cloud SQL 인스턴스가 없으면 인스턴스를 만듭니다. 이 인스턴스에 가져온 데이터베이스가 저장됩니다.

다음 명령어를 실행하여 Cloud SQL 인스턴스를 만듭니다.

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.
  • LOCATION: 인스턴스를 만들어야 하는 리전입니다. 스냅샷이 저장된 버킷과 동일한 리전을 사용하는 것이 좋습니다.
  • PASSWORD: 인스턴스에 연결하는 데 사용할 비밀번호입니다.

예:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

다음 명령어를 실행하여 airflow_db라는 데이터베이스를 만듭니다.

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.

예:

gcloud sql databases create airflow_db \
  --instance=example-instance

Cloud SQL 서비스 계정에 IAM 권한 부여

스냅샷이 포함된 버킷에서 데이터 가져오기 역할을 Cloud SQL 인스턴스의 Cloud SQL 서비스 계정에 부여합니다. Cloud SQL로 데이터를 가져오기 위한 IAM 역할에 대한 자세한 내용은 PostgreSQL용 Cloud SQL로 SQL 덤프 파일 가져오기를 참고하세요.

다음 명령어를 실행하여 Cloud SQL 서비스 계정 이메일을 가져옵니다.

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.

예:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

출력 예시:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

이 서비스 계정에 읽기 권한을 부여합니다.

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷의 이름입니다. gs:// 바로 뒤에 오는 SNAPSHOTS_URI 부분입니다.
  • SQL_SERVICE_ACCOUNT: Cloud SQL 인스턴스의 서비스 계정 이메일입니다. 이전 명령어로 가져왔습니다.

예:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

데이터베이스 덤프 가져오기

다음 명령어를 실행하여 이전에 저장한 스냅샷의 데이터베이스 덤프 파일을 Cloud SQL 인스턴스의 airflow_db 데이터베이스로 가져옵니다.

가져오기 프로세스 중에는 airflow_db 데이터베이스를 사용할 수 없습니다.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.
  • SNAPSHOT_PATH를 스냅샷이 저장된 특정 버킷 폴더의 URI로 바꿉니다. 이 URI는 스냅샷을 저장할 때 반환됩니다.

예:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(권장) 가져오기가 완료된 후 버킷 액세스 권한 취소

가져오기가 완료된 후 Cloud SQL 인스턴스의 서비스 계정에서 Cloud Storage 버킷 액세스 권한을 취소하는 것이 좋습니다.

다음 명령어를 실행하여 수행합니다.

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷의 이름입니다. gs:// 바로 뒤에 오는 SNAPSHOTS_URI 부분입니다.
  • SQL_SERVICE_ACCOUNT: Cloud SQL 인스턴스의 서비스 계정 이메일입니다. 이전 명령어로 가져왔습니다.

예:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

가져온 데이터베이스에서 SQL 쿼리 실행

가져오기가 완료되면 가져온 데이터에 대해 쿼리를 실행할 수 있습니다. 예를 들어 Google Cloud CLI로 쿼리를 실행할 수 있습니다.

DAG에서 Airflow 데이터베이스에 SQL 쿼리 실행

Airflow 데이터베이스에 연결하려면 다음 단계를 따르세요.

  1. 하나 이상의 SQLExecuteQueryOperator 연산자로 DAG를 만듭니다. 시작하려면 예시 DAG를 사용하면 됩니다.

  2. 연산자의 sql 매개변수에서 SQL 쿼리를 지정합니다.

  3. 이 DAG를 환경에 업로드합니다.

  4. DAG를 트리거합니다. 예를 들어 수동으로 트리거하거나 예약된 시간에 실행될 때까지 기다릴 수 있습니다.

DAG 예시:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

SQLExecuteQueryOperator 사용에 관한 자세한 내용은 Airflow 문서의 SQLExecuteQueryOperator를 사용하는 Postgres 방법 가이드를 참고하세요.

데이터베이스 콘텐츠를 덤프하고 버킷으로 전송

다음 단계