Acessar o banco de dados do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, você verá como se conectar a uma instância do Cloud SQL que executa o banco de dados do Airflow do ambiente do Cloud Composer e executar consultas SQL.

Por exemplo, talvez você queira executar consultas diretamente no banco de dados do Airflow, fazer backups de bancos de dados, coletar estatísticas com base no conteúdo do banco de dados ou recuperar outras informações personalizadas do banco de dados.

Antes de começar

Exportar o conteúdo do banco de dados do Airflow para uma instância do Cloud SQL

Essa abordagem inclui salvar um snapshot do ambiente, que contém um despejo do banco de dados do Airflow, e importar o despejo para uma instância do Cloud SQL. Dessa forma, é possível executar consultas em um snapshot do conteúdo do banco de dados do Airflow.

É possível usar essa abordagem em todas as versões do Airflow compatíveis com o Cloud Composer 3, incluindo versões do Airflow posteriores à 3.1.7, em que o acesso direto ao banco de dados do Airflow não é mais possível.

Salvar um snapshot do ambiente

Execute o comando a seguir para salvar um snapshot do seu ambiente. Depois de salvar um snapshot, o resultado da operação vai informar o URI em que ele foi salvo no campo snapshotPath. Você vai usar esse URI mais tarde.

Para mais informações sobre como criar snapshots, consulte Salvar e carregar snapshots de ambiente.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.
  • (Opcional) SNAPSHOTS_URI com o URI de uma pasta de bucket em que armazenar o instantâneo. Se você omitir esse argumento, o Cloud Composer vai salvar o snapshot na pasta /snapshots no bucket do ambiente.

Exemplo:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Exemplo de resultado:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Preparar o banco de dados de destino

Se você não tiver uma instância do Cloud SQL, crie uma. Essa instância vai armazenar o banco de dados importado.

Execute o comando a seguir para criar uma instância do Cloud SQL:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Substitua:

  • SQL_INSTANCE_NAME: nome da instância do Cloud SQL.
  • LOCATION: região em que a instância será criada. Recomendamos usar a mesma região do bucket em que os snapshots são salvos.
  • PASSWORD: senha que você vai usar para se conectar à instância.

Exemplo:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Execute o comando a seguir para criar um banco de dados chamado airflow_db:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Substitua:

  • SQL_INSTANCE_NAME: nome da instância do Cloud SQL.

Exemplo:

gcloud sql databases create airflow_db \
  --instance=example-instance

Conceder permissões do IAM à conta de serviço do Cloud SQL

No bucket que contém o snapshot, conceda um papel para importar dados para a conta de serviço do Cloud SQL da sua instância do Cloud SQL. Para mais informações sobre os papéis do IAM para importar dados para o Cloud SQL, consulte Importar um arquivo dump SQL para o Cloud SQL para PostgreSQL.

Execute o seguinte comando para receber o e-mail da conta de serviço do Cloud SQL:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Substitua:

  • SQL_INSTANCE_NAME: nome da instância do Cloud SQL.

Exemplo:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Exemplo de saída:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Conceda permissões de leitura a essa conta de serviço:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Substitua:

  • BUCKET_NAME: nome do bucket do Cloud Storage. É a parte do SNAPSHOTS_URI imediatamente após gs://.
  • SQL_SERVICE_ACCOUNT: e-mail da conta de serviço da instância do Cloud SQL. Você o recebeu com o comando anterior.

Exemplo:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Importar o despejo do banco de dados

Execute o comando a seguir para importar o arquivo dump do banco de dados do snapshot salvo anteriormente para o banco de dados airflow_db da instância do Cloud SQL.

O banco de dados airflow_db não estará disponível durante o processo de importação.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Substitua:

  • SQL_INSTANCE_NAME: nome da instância do Cloud SQL.
  • SNAPSHOT_PATH com o URI da pasta específica do bucket em que o snapshot está armazenado. Esse URI é retornado quando você salva um snapshot.

Exemplo:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Recomendado) Revogue o acesso ao bucket após a conclusão da importação

Recomendamos revogar as permissões de acesso ao bucket do Cloud Storage da conta de serviço da instância do Cloud SQL após a conclusão da importação.

Para isso, execute o seguinte comando:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Substitua:

  • BUCKET_NAME: nome do bucket do Cloud Storage. É a parte do SNAPSHOTS_URI imediatamente após gs://.
  • SQL_SERVICE_ACCOUNT: e-mail da conta de serviço da instância do Cloud SQL. Você o recebeu com o comando anterior.

Exemplo:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Executar uma consulta SQL no banco de dados importado

Depois que a importação for concluída, você poderá executar consultas nela. Por exemplo, é possível executar uma consulta com a Google Cloud CLI.

Executar uma consulta SQL no banco de dados do Airflow em um DAG

Para se conectar ao banco de dados do Airflow:

  1. Crie um DAG com um ou mais operadores SQLExecuteQueryOperator. Para começar, use o DAG de exemplo.

  2. No parâmetro sql do operador, especifique sua consulta SQL.

  3. Faça upload desse DAG para o ambiente.

  4. Acione o DAG. Por exemplo, você pode fazer isso manualmente ou esperar até que ele seja executado em uma programação.

Exemplo de DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Para mais informações sobre como usar o SQLExecuteQueryOperator, consulte o guia de instruções para Postgres usando SQLExecuteQueryOperator na documentação do Airflow.

Despejar conteúdo do banco de dados e transferi-lo para um bucket

A seguir