Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (1re génération, version héritée)
Cette page explique comment se connecter à une instance Cloud SQL qui exécute la base de données Airflow de votre environnement Airflow géré et exécuter des requêtes SQL.
Par exemple, vous pouvez exécuter des requêtes directement sur la base de données Airflow, effectuer des sauvegardes de la base de données, collecter des statistiques en fonction du contenu de la base de données ou récupérer d'autres informations personnalisées de la base de données.
Avant de commencer
Exporter le contenu de la base de données Airflow vers une instance Cloud SQL
Cette approche consiste à enregistrer un instantané d'environnement, qui contient une vidange de la base de données Airflow, puis à importer la vidange dans une instance Cloud SQL. Vous pouvez ainsi exécuter des requêtes sur un instantané du contenu de la base de données Airflow.
Vous pouvez utiliser cette approche dans toutes les versions d'Airflow compatibles avec Airflow géré (3e génération), y compris les versions d'Airflow ultérieures à la version 3.1.7, où l'accès direct à la base de données Airflow n'est plus possible.
Enregistrer un instantané d'environnement
Exécutez la commande suivante pour enregistrer un instantané de votre environnement. Une fois l'instantané enregistré, le résultat de l'opération indique l'URI où l'instantané est enregistré dans le champ snapshotPath. Vous utiliserez cet URI ultérieurement.
Pour en savoir plus sur la création d'instantanés, consultez Enregistrer et charger des instantanés d'environnement.
gcloud composer environments snapshots save \
ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-location "SNAPSHOTS_URI"
Remplacez les éléments suivants :
ENVIRONMENT_NAME: nom de votre environnementLOCATION: région où se trouve l'environnement(Facultatif)
SNAPSHOTS_URIavec l'URI d'un dossier de bucket dans lequel stocker l'instantané Si vous omettez cet argument, Airflow géré enregistre l'instantané dans le dossier/snapshotsdu bucket de votre environnement.
Exemple :
gcloud composer environments snapshots save \
example-environment \
--location us-central1 \
--snapshot-location "gs://example-bucket/environment_snapshots"
Exemple de résultat :
Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24
Préparer la base de données de destination
Si vous ne disposez pas d'instance Cloud SQL, créez-en une. Cette instance stockera la base de données importée.
Exécutez la commande suivante pour créer une instance Cloud SQL :
gcloud sql instances create SQL_INSTANCE_NAME \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=LOCATION \
--root-password=PASSWORD
Remplacez les éléments suivants :
SQL_INSTANCE_NAME: nom de l'instance Cloud SQLLOCATION: région dans laquelle l'instance doit être créée Nous vous recommandons d'utiliser la même région que le bucket dans lequel les instantanés sont enregistrés.PASSWORD: mot de passe que vous utiliserez pour vous connecter à l'instance
Exemple :
gcloud sql instances create example-instance \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=us-central1 \
--root-password=example_password
Exécutez la commande suivante pour créer une base de données nommée airflow_db :
gcloud sql databases create airflow_db \
--instance=SQL_INSTANCE_NAME
Remplacez les éléments suivants :
SQL_INSTANCE_NAME: nom de l'instance Cloud SQL
Exemple :
gcloud sql databases create airflow_db \
--instance=example-instance
Accorder des autorisations IAM au compte de service Cloud SQL
Dans le bucket contenant l'instantané, accordez un rôle pour importer des données dans le compte de service Cloud SQL de votre instance Cloud SQL. Pour en savoir plus sur les rôles IAM permettant d'importer des données dans Cloud SQL, consultez Importer un fichier de dump SQL dans Cloud SQL pour PostgreSQL.
Exécutez la commande suivante pour obtenir l'adresse e-mail du compte de service Cloud SQL :
gcloud sql instances describe SQL_INSTANCE_NAME \
--format="value(serviceAccountEmailAddress)"
Remplacez les éléments suivants :
SQL_INSTANCE_NAME: nom de l'instance Cloud SQL
Exemple :
gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"
Exemple de résultat :
p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com
Accordez des autorisations de lecture à ce compte de service :
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Remplacez les éléments suivants :
BUCKET_NAME: nom du bucket Cloud Storage Il s'agit de la partie deSNAPSHOTS_URIqui suit immédiatementgs://.SQL_SERVICE_ACCOUNT: adresse e-mail du compte de service de l'instance Cloud SQL Vous l'avez obtenue avec la commande précédente.
Exemple :
gcloud storage buckets add-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Importer le vidage de la base de données
Exécutez la commande suivante pour importer le fichier de dump de la base de données à partir de l'instantané précédemment enregistré dans la base de données airflow_db de votre instance Cloud SQL.
La base de données airflow_db ne sera pas disponible pendant le processus d'importation.
gcloud sql import sql SQL_INSTANCE_NAME \
$(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
--database=airflow_db \
--user=postgres
Remplacez les éléments suivants :
SQL_INSTANCE_NAME: nom de l'instance Cloud SQLSNAPSHOT_PATHavec l'URI du dossier de bucket spécifique dans lequel l'instantané est stocké Cet URI est renvoyé lorsque vous enregistrez un instantané.
Exemple :
gcloud sql import sql example-instance \
$(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
--database=airflow_db \
--user=postgres
(Recommandé) Révoquer l'accès au bucket une fois l'importation terminée
Nous vous recommandons de révoquer les autorisations d'accès au bucket Cloud Storage du compte de service de votre instance Cloud SQL une fois l'importation terminée.
Pour ce faire, exécutez la commande suivante :
gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Remplacez les éléments suivants :
BUCKET_NAME: nom du bucket Cloud Storage Il s'agit de la partie deSNAPSHOTS_URIqui suit immédiatementgs://.SQL_SERVICE_ACCOUNT: adresse e-mail du compte de service de l'instance Cloud SQL Vous l'avez obtenue avec la commande précédente.
Exemple :
gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Exécuter une requête SQL sur la base de données importée
Une fois l'importation terminée, vous pouvez exécuter des requêtes sur celle-ci. Par exemple, vous pouvez exécuter une requête avec la CLI Google Cloud.
Exécuter une requête SQL sur la base de données Airflow à partir d'un DAG
Pour vous connecter à la base de données Airflow :
Créez un DAG avec un ou plusieurs opérateurs SQLExecuteQueryOperator. Pour commencer, vous pouvez utiliser l'exemple de DAG.
Dans le paramètre
sqlde l'opérateur, spécifiez votre requête SQL.Importez ce DAG dans votre environnement.
Déclenchez le DAG. Par exemple, vous pouvez le faire manuellement ou attendre qu'il s'exécute selon une planification.
Exemple de DAG :
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
Pour en savoir plus sur l'utilisation de SQLExecuteQueryOperator, consultez le guide pratique pour Postgres à l'aide de SQLExecuteQueryOperator dans la documentation Airflow.