Airflow 데이터베이스에 액세스

관리형 Airflow (3세대) | 관리형 Airflow (2세대) | 관리형 Airflow (기존 1세대)

이 페이지에서는 관리형 Airflow 환경의 Airflow 데이터베이스를 실행하는 Cloud SQL 인스턴스에 연결하고 SQL 쿼리를 실행하는 방법을 설명합니다.

예를 들어 Airflow 데이터베이스에서 직접 쿼리를 실행하거나, 데이터베이스 백업을 수행하거나, 데이터베이스 콘텐츠를 기반으로 통계를 수집하거나, 데이터베이스에서 다른 커스텀 정보를 가져올 수 있습니다.

시작하기 전에

Airflow 데이터베이스 콘텐츠를 Cloud SQL 인스턴스로 내보내기

이 접근 방식에는 Airflow 데이터베이스 덤프가 포함된 환경 스냅샷을 저장한 후 덤프를 Cloud SQL 인스턴스로 가져오는 것이 포함됩니다. 이 방식으로 Airflow 데이터베이스 콘텐츠의 스냅샷에 대해 쿼리를 실행할 수 있습니다.

Airflow 데이터베이스에 직접 액세스할 수 없는 Airflow 3.1.7 이후의 Airflow 3 버전을 포함하여 관리형 Airflow (3세대)에서 지원하는 모든 Airflow 버전에서 이 접근 방식을 사용할 수 있습니다.

환경 스냅샷 저장

다음 명령어를 실행하여 환경의 스냅샷을 저장합니다. 스냅샷을 저장하면 작업 결과가 스냅샷이 저장된 URI를 snapshotPath 필드에 보고합니다. 이 URI는 나중에 사용됩니다.

스냅샷을 만드는 방법에 대한 자세한 내용은 환경 스냅샷 저장 및 로드를 참조하세요.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • (선택사항) 스냅샷을 저장할 버킷 폴더의 URI가 포함된 SNAPSHOTS_URI 이 인수를 생략하면 관리형 Airflow가 해당 환경 버킷의 /snapshots 폴더에 스냅샷을 저장합니다.

예:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

결과 예시:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

대상 데이터베이스 준비

Cloud SQL 인스턴스가 없는 경우 인스턴스를 만듭니다. 이 인스턴스는 가져온 데이터베이스를 저장합니다.

다음 명령어를 실행하여 Cloud SQL 인스턴스를 생성합니다.

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.
  • LOCATION: 인스턴스를 만들어야 하는 리전입니다. 스냅샷이 저장된 버킷과 동일한 리전을 사용하는 것이 좋습니다.
  • PASSWORD: 인스턴스에 연결하는 데 사용할 비밀번호입니다.

예:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

다음 명령어를 실행하여 airflow_db라는 데이터베이스를 만듭니다.

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.

예:

gcloud sql databases create airflow_db \
  --instance=example-instance

Cloud SQL 서비스 계정에 IAM 권한 부여

스냅샷이 포함된 버킷에서 Cloud SQL 인스턴스의 Cloud SQL 서비스 계정에 데이터를 가져오는 역할을 부여합니다. Cloud SQL로 데이터를 가져오기 위한 IAM 역할에 대한 자세한 내용은 다음 자료를 참조하세요. PostgreSQL용 Cloud SQL로 SQL 덤프 파일 가져오기.

다음 명령어를 실행하여 Cloud SQL 서비스 계정 이메일을 가져옵니다.

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.

예:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

출력 예시:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

이 서비스 계정에 읽기 권한을 부여합니다.

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷의 이름입니다. gs:// 바로 뒤에 있는 SNAPSHOTS_URI의 일부입니다.
  • SQL_SERVICE_ACCOUNT: Cloud SQL 인스턴스 서비스 계정의 이메일입니다. 이전 명령어로 가져왔습니다.

예:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

데이터베이스 덤프 가져오기

다음 명령어를 실행하여 이전에 저장한 스냅샷에서 데이터베이스 덤프 파일을 Cloud SQL 인스턴스의 airflow_db 데이터베이스로 가져옵니다.

가져오기 프로세스 중에 airflow_db 데이터베이스를 사용할 수 없습니다.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

다음을 바꿉니다.

  • SQL_INSTANCE_NAME: Cloud SQL 인스턴스의 이름입니다.
  • 스냅샷이 저장된 특정 버킷 폴더의 URI가 포함된 SNAPSHOT_PATH 이 URI는 스냅샷을 저장할 때 반환됩니다.

예:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(권장사항) 가져오기가 완료된 후 버킷 액세스 권한 취소

가져오기가 완료된 후 Cloud SQL 인스턴스의 서비스 계정에서 Cloud Storage 버킷 액세스 권한을 취소하는 것이 좋습니다.

다음 명령어를 실행하여 수행합니다.

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷의 이름입니다. gs:// 바로 뒤에 있는 SNAPSHOTS_URI의 일부입니다.
  • SQL_SERVICE_ACCOUNT: Cloud SQL 인스턴스 서비스 계정의 이메일입니다. 이전 명령어로 가져왔습니다.

예:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

가져온 데이터베이스에서 SQL 쿼리 실행

가져오기가 완료되면 가져온 데이터베이스에서 쿼리를 실행할 수 있습니다. 예를 들어 Google Cloud CLI를 사용하여 쿼리를 실행할 수 있습니다.

DAG에서 Airflow 데이터베이스에 대한 SQL 쿼리 실행

Airflow 데이터베이스에 연결하려면 다음 단계를 따르세요.

  1. 하나 이상의 SQLExecuteQueryOperator 연산자가 있는 DAG를 만듭니다. 시작하려면 DAG 예시를 사용하면 됩니다.

  2. 연산자의 sql 매개변수에서 SQL 쿼리를 지정합니다.

  3. 이 DAG를 환경에업로드합니다.

  4. DAG를 트리거합니다. 예를 들어 수동으로 트리거하거나 일정에 따라 실행될 때까지 기다릴 수 있습니다.

DAG 예시:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

SQLExecuteQueryOperator 사용에 대한 자세한 내용은 Airflow 문서에서 SQLExecuteQueryOperator를 사용하여 Postgres에 대한 사용 방법 가이드 를 참조하세요.

데이터베이스 콘텐츠를 덤프하고 버킷으로 전송

다음 단계