Mengakses database Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menjelaskan cara terhubung ke instance Cloud SQL yang menjalankan database Airflow di lingkungan Cloud Composer Anda dan menjalankan kueri SQL.

Misalnya, Anda mungkin ingin menjalankan kueri langsung di database Airflow, membuat cadangan database, mengumpulkan statistik berdasarkan konten database, atau mengambil informasi kustom lainnya dari database.

Sebelum memulai

Mengekspor konten database Airflow ke instance Cloud SQL

Pendekatan ini mencakup penyimpanan snapshot lingkungan, yang berisi dump database Airflow, lalu mengimpor dump ke instance Cloud SQL. Dengan cara ini, Anda dapat menjalankan kueri pada snapshot konten database Airflow.

Anda dapat menggunakan pendekatan ini di semua versi Airflow yang didukung oleh Cloud Composer 3, termasuk versi Airflow 3 yang lebih baru dari 3.1.7 yang tidak lagi memungkinkan akses langsung ke database Airflow.

Menyimpan snapshot lingkungan

Jalankan perintah berikut untuk menyimpan snapshot lingkungan Anda. Setelah Anda menyimpan snapshot, hasil operasi akan melaporkan URI tempat snapshot disimpan di kolom snapshotPath. Anda akan menggunakan URI ini nanti.

Untuk mengetahui informasi selengkapnya tentang cara membuat snapshot, lihat Menyimpan dan memuat snapshot lingkungan.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.
  • (Opsional) SNAPSHOTS_URI dengan URI folder bucket tempat menyimpan snapshot. Jika Anda tidak menyertakan argumen ini, Cloud Composer akan menyimpan snapshot di folder /snapshots dalam bucket lingkungan Anda.

Contoh:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Contoh hasil:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Menyiapkan database tujuan

Jika Anda belum memiliki instance Cloud SQL, buat instance. Instance ini akan menyimpan database yang diimpor.

Jalankan perintah berikut untuk membuat instance Cloud SQL:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Ganti kode berikut:

  • SQL_INSTANCE_NAME: nama instance Cloud SQL.
  • LOCATION: region tempat instance harus dibuat. Sebaiknya gunakan region yang sama dengan bucket tempat snapshot disimpan.
  • PASSWORD: sandi yang akan Anda gunakan untuk terhubung ke instance.

    .

Contoh:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Jalankan perintah berikut untuk membuat database bernama airflow_db:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Ganti kode berikut:

  • SQL_INSTANCE_NAME: nama instance Cloud SQL.

Contoh:

gcloud sql databases create airflow_db \
  --instance=example-instance

Memberikan izin IAM ke akun layanan Cloud SQL

Di bucket yang berisi snapshot, berikan peran untuk mengimpor data ke akun layanan Cloud SQL dari instance Cloud SQL Anda. Untuk mengetahui informasi selengkapnya tentang peran IAM untuk mengimpor data ke Cloud SQL, lihat Mengimpor file dump SQL ke Cloud SQL untuk PostgreSQL.

Jalankan perintah berikut untuk mendapatkan email akun layanan Cloud SQL:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Ganti kode berikut:

  • SQL_INSTANCE_NAME: nama instance Cloud SQL.

Contoh:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Contoh output:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Berikan izin baca ke akun layanan ini:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ganti kode berikut:

  • BUCKET_NAME: nama bucket Cloud Storage. Ini adalah bagian dari SNAPSHOTS_URI segera setelah gs://.
  • SQL_SERVICE_ACCOUNT: email akun layanan instance Cloud SQL. Anda mendapatkannya dengan perintah sebelumnya.

Contoh:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Mengimpor dump database

Jalankan perintah berikut untuk mengimpor file dump database dari snapshot yang disimpan sebelumnya ke database airflow_db instance Cloud SQL Anda.

Database airflow_db tidak akan tersedia selama proses impor.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Ganti kode berikut:

  • SQL_INSTANCE_NAME: nama instance Cloud SQL.
  • SNAPSHOT_PATH dengan URI folder bucket tertentu tempat snapshot disimpan. URI ini ditampilkan saat Anda menyimpan snapshot.

Contoh:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Direkomendasikan) Batalkan akses bucket setelah impor selesai

Sebaiknya batalkan izin akses bucket Cloud Storage dari akun layanan instance Cloud SQL Anda setelah impor selesai.

Jalankan perintah berikut untuk melakukannya:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ganti kode berikut:

  • BUCKET_NAME: nama bucket Cloud Storage. Ini adalah bagian dari SNAPSHOTS_URI segera setelah gs://.
  • SQL_SERVICE_ACCOUNT: email akun layanan instance Cloud SQL. Anda mendapatkannya dengan perintah sebelumnya.

Contoh:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Menjalankan kueri SQL pada database yang diimpor

Setelah impor selesai, Anda dapat menjalankan kueri di dalamnya. Misalnya, Anda dapat menjalankan kueri dengan Google Cloud CLI.

Menjalankan kueri SQL di database Airflow dari DAG

Untuk terhubung ke database Airflow:

  1. Buat DAG dengan satu atau beberapa operator SQLExecuteQueryOperator. Untuk memulai, Anda dapat menggunakan contoh DAG.

  2. Di parameter sql operator, tentukan kueri SQL Anda.

  3. Upload DAG ini ke lingkungan Anda.

  4. Picu DAG, misalnya, Anda dapat melakukannya secara manual atau menunggu hingga berjalan sesuai jadwal.

Contoh DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Untuk mengetahui informasi selengkapnya tentang cara menggunakan SQLExecuteQueryOperator, lihat Panduan Cara Penggunaan Postgres menggunakan SQLExecuteQueryOperator dalam dokumentasi Airflow.

Mengekspor isi database dan mentransfernya ke bucket

Langkah berikutnya