Accéder à la base de données Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page explique comment se connecter à une instance Cloud SQL qui exécute la base de données Airflow de votre environnement Cloud Composer et exécuter des requêtes SQL.

Par exemple, vous pouvez exécuter des requêtes directement sur la base de données Airflow, effectuer des sauvegardes de la base de données, collecter des statistiques en fonction du contenu de la base de données ou récupérer d'autres informations personnalisées de la base de données.

Avant de commencer

Exporter le contenu de la base de données Airflow vers une instance Cloud SQL

Cette approche consiste à enregistrer un instantané de l'environnement, qui contient un vidage de la base de données Airflow, puis à importer le vidage dans une instance Cloud SQL. Vous pouvez ainsi exécuter des requêtes sur un instantané du contenu de la base de données Airflow.

Vous pouvez utiliser cette approche dans toutes les versions d'Airflow compatibles avec Cloud Composer 3, y compris les versions d'Airflow ultérieures à la version 3.1.7, où l'accès direct à la base de données Airflow n'est plus possible.

Enregistrer un instantané d'environnement

Exécutez la commande suivante pour enregistrer un instantané de votre environnement. Une fois que vous avez enregistré un instantané, le résultat de l'opération indique l'URI où l'instantané est enregistré dans le champ snapshotPath. Vous utiliserez cet URI plus tard.

Pour en savoir plus sur la création d'instantanés, consultez Enregistrer et charger des instantanés d'environnement.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Remplacez les éléments suivants :

  • ENVIRONMENT_NAME : nom de votre environnement
  • LOCATION : région où se trouve l'environnement.
  • (Facultatif) SNAPSHOTS_URI avec l'URI d'un dossier de bucket dans lequel stocker l'instantané. Si vous omettez cet argument, Cloud Composer enregistre l'instantané dans le dossier /snapshots du bucket de votre environnement.

Exemple :

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Exemple de résultat :

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Préparer la base de données de destination

Si vous ne disposez pas d'instance Cloud SQL, créez-en une. Cette instance stockera la base de données importée.

Exécutez la commande suivante pour créer une instance Cloud SQL :

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Remplacez les éléments suivants :

  • SQL_INSTANCE_NAME : nom de l'instance Cloud SQL.
  • LOCATION : région dans laquelle l'instance doit être créée. Nous vous recommandons d'utiliser la même région que le bucket dans lequel les instantanés sont enregistrés.
  • PASSWORD : mot de passe que vous utiliserez pour vous connecter à l'instance.

Exemple :

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Exécutez la commande suivante pour créer une base de données nommée airflow_db :

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Remplacez les éléments suivants :

  • SQL_INSTANCE_NAME : nom de l'instance Cloud SQL.

Exemple :

gcloud sql databases create airflow_db \
  --instance=example-instance

Accorder des autorisations IAM au compte de service Cloud SQL

Dans le bucket contenant l'instantané, accordez un rôle pour importer des données au compte de service Cloud SQL de votre instance Cloud SQL. Pour en savoir plus sur les rôles IAM permettant d'importer des données dans Cloud SQL, consultez Importer un fichier de dump SQL dans Cloud SQL pour PostgreSQL.

Exécutez la commande suivante pour obtenir l'adresse e-mail du compte de service Cloud SQL :

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Remplacez les éléments suivants :

  • SQL_INSTANCE_NAME : nom de l'instance Cloud SQL.

Exemple :

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Exemple de résultat :

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Accordez des autorisations de lecture à ce compte de service :

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Remplacez les éléments suivants :

  • BUCKET_NAME : nom du bucket Cloud Storage. Il s'agit de la partie de SNAPSHOTS_URI immédiatement après gs://.
  • SQL_SERVICE_ACCOUNT : adresse e-mail du compte de service de l'instance Cloud SQL. Vous l'avez obtenu avec la commande précédente.

Exemple :

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Importer le vidage de la base de données

Exécutez la commande suivante pour importer le fichier de dump de la base de données à partir du snapshot enregistré précédemment dans la base de données airflow_db de votre instance Cloud SQL.

La base de données airflow_db sera indisponible pendant le processus d'importation.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Remplacez les éléments suivants :

  • SQL_INSTANCE_NAME : nom de l'instance Cloud SQL.
  • SNAPSHOT_PATH par l'URI du dossier de bucket spécifique dans lequel le snapshot est stocké. Cet URI est renvoyé lorsque vous enregistrez un instantané.

Exemple :

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Recommandé) Révoquez l'accès au bucket une fois l'importation terminée.

Nous vous recommandons de révoquer les autorisations d'accès au bucket Cloud Storage du compte de service de votre instance Cloud SQL une fois l'importation terminée.

Pour ce faire, exécutez la commande suivante :

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Remplacez les éléments suivants :

  • BUCKET_NAME : nom du bucket Cloud Storage. Il s'agit de la partie de SNAPSHOTS_URI immédiatement après gs://.
  • SQL_SERVICE_ACCOUNT : adresse e-mail du compte de service de l'instance Cloud SQL. Vous l'avez obtenu avec la commande précédente.

Exemple :

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Exécuter une requête SQL sur la base de données importée

Une fois l'importation terminée, vous pouvez exécuter des requêtes sur les données. Par exemple, vous pouvez exécuter une requête avec Google Cloud CLI.

Exécuter une requête SQL sur la base de données Airflow à partir d'un DAG

Pour vous connecter à la base de données Airflow :

  1. Créez un DAG avec un ou plusieurs opérateurs SQLExecuteQueryOperator. Pour commencer, vous pouvez utiliser l'exemple de DAG.

  2. Dans le paramètre sql de l'opérateur, spécifiez votre requête SQL.

  3. Importez ce DAG dans votre environnement.

  4. Déclenchez le DAG. Par exemple, vous pouvez le faire manuellement ou attendre qu'il s'exécute selon une planification.

Exemple de DAG :

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Pour en savoir plus sur l'utilisation de SQLExecuteQueryOperator, consultez le guide pratique pour Postgres à l'aide de SQLExecuteQueryOperator dans la documentation Airflow.

Vider le contenu de la base de données et le transférer dans un bucket

Étapes suivantes