Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se explica cómo conectarse a una instancia de Cloud SQL que ejecuta la base de datos de Airflow de tu entorno de Cloud Composer y que ejecuta consultas de SQL.
Por ejemplo, es posible que desees ejecutar consultas directamente en la base de datos de Airflow, hacer copias de seguridad de la base de datos, recopilar estadísticas basadas en el contenido de la base de datos o recuperar cualquier otra información personalizada de la base de datos.
Antes de comenzar
Exporta el contenido de la base de datos de Airflow a una instancia de Cloud SQL
Este enfoque incluye guardar una instantánea del entorno, que contiene un volcado de la base de datos de Airflow, y, luego, importar el volcado a una instancia de Cloud SQL. De esta manera, puedes ejecutar consultas en una instantánea del contenido de la base de datos de Airflow.
Puedes usar este enfoque en todas las versiones de Airflow compatibles con Cloud Composer 3, incluidas las versiones de Airflow 3 posteriores a la 3.1.7, en las que ya no es posible el acceso directo a la base de datos de Airflow.
Cómo guardar una instantánea del entorno
Ejecuta el siguiente comando para guardar una instantánea de tu entorno. Después de guardar una instantánea, el resultado de la operación informará el URI en el que se guardó la instantánea en el campo snapshotPath. Usarás este URI más adelante.
Para obtener más información sobre cómo crear instantáneas, consulta Cómo guardar y cargar instantáneas del entorno.
gcloud composer environments snapshots save \
ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-location "SNAPSHOTS_URI"
Reemplaza lo siguiente:
ENVIRONMENT_NAME: Es el nombre de tu entorno.LOCATION: Es la región en la que se encuentra el entorno.(Opcional)
SNAPSHOTS_URIcon el URI de una carpeta de bucket en la que se almacenará la instantánea. Si omites este argumento, Cloud Composer guardará la instantánea en la carpeta/snapshotsdel bucket de tu entorno.
Ejemplo:
gcloud composer environments snapshots save \
example-environment \
--location us-central1 \
--snapshot-location "gs://example-bucket/environment_snapshots"
Resultado de ejemplo:
Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24
Prepara la base de datos de destino
Si no tienes una instancia de Cloud SQL, crea una. Esta instancia almacenará la base de datos importada.
Ejecuta el siguiente comando para crear una instancia de Cloud SQL:
gcloud sql instances create SQL_INSTANCE_NAME \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=LOCATION \
--root-password=PASSWORD
Reemplaza lo siguiente:
SQL_INSTANCE_NAME: Nombre de la instancia de Cloud SQL.LOCATION: Es la región en la que se debe crear la instancia. Te recomendamos que uses la misma región que el bucket en el que se guardan las instantáneas.PASSWORD: Contraseña que usarás para conectarte a la instancia.
Ejemplo:
gcloud sql instances create example-instance \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=us-central1 \
--root-password=example_password
Ejecuta el siguiente comando para crear una base de datos llamada airflow_db:
gcloud sql databases create airflow_db \
--instance=SQL_INSTANCE_NAME
Reemplaza lo siguiente:
SQL_INSTANCE_NAME: Nombre de la instancia de Cloud SQL.
Ejemplo:
gcloud sql databases create airflow_db \
--instance=example-instance
Otorga permisos de IAM a la cuenta de servicio de Cloud SQL
En el bucket que contiene la instantánea, otorga un rol para importar datos a la cuenta de servicio de Cloud SQL de tu instancia de Cloud SQL. Para obtener más información sobre los roles de IAM para importar datos a Cloud SQL, consulta Importa un archivo de volcado de SQL a Cloud SQL para PostgreSQL.
Ejecuta el siguiente comando para obtener el correo electrónico de la cuenta de servicio de Cloud SQL:
gcloud sql instances describe SQL_INSTANCE_NAME \
--format="value(serviceAccountEmailAddress)"
Reemplaza lo siguiente:
SQL_INSTANCE_NAME: Nombre de la instancia de Cloud SQL.
Ejemplo:
gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"
Resultado de ejemplo:
p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com
Otorga permisos de lectura a esta cuenta de servicio:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Reemplaza lo siguiente:
BUCKET_NAME: Es el nombre del bucket de Cloud Storage. Esta es la parte deSNAPSHOTS_URIinmediatamente después degs://.SQL_SERVICE_ACCOUNT: Es el correo electrónico de la cuenta de servicio de la instancia de Cloud SQL. Lo obtuviste con el comando anterior.
Ejemplo:
gcloud storage buckets add-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Importa el volcado de la base de datos
Ejecuta el siguiente comando para importar el archivo de volcado de la base de datos desde la instantánea guardada anteriormente a la base de datos airflow_db de tu instancia de Cloud SQL.
La base de datos airflow_db no estará disponible durante el proceso de importación.
gcloud sql import sql SQL_INSTANCE_NAME \
$(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
--database=airflow_db \
--user=postgres
Reemplaza lo siguiente:
SQL_INSTANCE_NAME: Nombre de la instancia de Cloud SQL.SNAPSHOT_PATHcon el URI de la carpeta del bucket específico en el que se almacena la instantánea. Este URI se devuelve cuando guardas una instantánea.
Ejemplo:
gcloud sql import sql example-instance \
$(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
--database=airflow_db \
--user=postgres
(Recomendado) Revoca el acceso al bucket después de que se complete la importación.
Te recomendamos que revoques los permisos de acceso al bucket de Cloud Storage de la cuenta de servicio de tu instancia de Cloud SQL después de que se complete la importación.
Para ello, ejecuta el comando siguiente:
gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Reemplaza lo siguiente:
BUCKET_NAME: Es el nombre del bucket de Cloud Storage. Esta es la parte deSNAPSHOTS_URIinmediatamente después degs://.SQL_SERVICE_ACCOUNT: Es el correo electrónico de la cuenta de servicio de la instancia de Cloud SQL. Lo obtuviste con el comando anterior.
Ejemplo:
gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Ejecuta una consulta en SQL en la base de datos importada
Una vez que se complete la importación, podrás ejecutar consultas en ella. Por ejemplo, puedes ejecutar una consulta con Google Cloud CLI.
Ejecuta una consulta en SQL en la base de datos de Airflow desde un DAG
Para conectarte a la base de datos de Airflow, haz lo siguiente:
Crea un DAG con uno o más operadores SQLExecuteQueryOperator. Para comenzar, puedes usar el DAG de ejemplo.
En el parámetro
sqldel operador, especifica tu consulta en SQL.Sube este DAG a tu entorno.
Activa el DAG. Por ejemplo, puedes hacerlo manualmente o esperar a que se ejecute según una programación.
Ejemplo de DAG:
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
Para obtener más información sobre el uso de SQLExecuteQueryOperator, consulta la Guía práctica para Postgres con SQLExecuteQueryOperator en la documentación de Airflow.