Accedere al database Airflow

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Questa pagina spiega come connetterti a un'istanza Cloud SQL che esegue il database Airflow del tuo ambiente Managed Airflow ed eseguire query SQL.

Ad esempio, potresti voler eseguire query direttamente sul database Airflow, eseguire backup del database, raccogliere statistiche in base ai contenuti del database o recuperare altre informazioni personalizzate dal database.

Prima di iniziare

Esportare i contenuti del database Airflow in un'istanza Cloud SQL

Questo approccio prevede il salvataggio di uno snapshot dell'ambiente, che contiene un dump del database Airflow, e quindi l'importazione del dump in un'istanza Cloud SQL. In questo modo, puoi eseguire query su uno snapshot dei contenuti del database Airflow.

Puoi utilizzare questo approccio in tutte le versioni di Airflow supportate da Managed Airflow (Gen 3), incluse le versioni di Airflow 3 successive alla 3.1.7, in cui l'accesso diretto al database Airflow non è più possibile.

Salvare uno snapshot dell'ambiente

Esegui questo comando per salvare uno snapshot del tuo ambiente. Dopo aver salvato uno snapshot, il risultato dell'operazione segnalerà l'URI in cui lo snapshot viene salvato nel campo snapshotPath. Utilizzerai questo URI in un secondo momento.

Per saperne di più sulla creazione di snapshot, consulta Salvare e caricare snapshot dell'ambiente.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • (Facoltativo) SNAPSHOTS_URI con l'URI di una cartella del bucket in cui archiviare lo snapshot. Se ometti questo argomento, Managed Airflow salva lo snapshot nella cartella /snapshots del bucket dell'ambiente.

Esempio:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Risultato di esempio:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Preparare il database di destinazione

Se non hai un'istanza Cloud SQL, creane una. Questa istanza memorizzerà il database importato.

Esegui questo comando per creare un'istanza Cloud SQL:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Sostituisci quanto segue:

  • SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.
  • LOCATION: regione in cui deve essere creata l'istanza. Ti consigliamo di utilizzare la stessa regione del bucket in cui vengono salvati gli snapshot.
  • PASSWORD: password che utilizzerai per connetterti all'istanza.

Esempio:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Esegui questo comando per creare un database denominato airflow_db:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Sostituisci quanto segue:

  • SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.

Esempio:

gcloud sql databases create airflow_db \
  --instance=example-instance

Concedere le autorizzazioni IAM al account di servizio Cloud SQL

Nel bucket contenente lo snapshot, concedi un ruolo per l'importazione dei dati al account di servizio Cloud SQL della tua istanza Cloud SQL. Per saperne di più sui ruoli IAM per l'importazione dei dati in Cloud SQL, consulta Importare un file di dump SQL in Cloud SQL per PostgreSQL.

Esegui questo comando per ottenere l'indirizzo email del account di servizio Cloud SQL:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Sostituisci quanto segue:

  • SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.

Esempio:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Output di esempio:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Concedi le autorizzazioni di lettura a questo account di servizio:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Sostituisci quanto segue:

  • BUCKET_NAME: nome del bucket Cloud Storage. Questa è la parte di SNAPSHOTS_URI immediatamente dopo gs://.
  • SQL_SERVICE_ACCOUNT: indirizzo email del service account dell'istanza Cloud SQL. L'hai ottenuto con il comando precedente.

Esempio:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Importare il dump del database

Esegui questo comando per importare il file di dump del database dallo snapshot salvato in precedenza nel database airflow_db dell'istanza Cloud SQL.

Il database airflow_db non sarà disponibile durante il processo di importazione.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Sostituisci quanto segue:

  • SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.
  • SNAPSHOT_PATH con l'URI della cartella del bucket specifica in cui è archiviato lo snapshot. Questo URI viene restituito quando salvi uno snapshot.

Esempio:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Consigliato) Revocare l'accesso al bucket al termine dell'importazione

Ti consigliamo di revocare le autorizzazioni di accesso al bucket Cloud Storage dal account di servizio dell'istanza Cloud SQL al termine dell'importazione.

Per farlo, esegui questo comando:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Sostituisci quanto segue:

  • BUCKET_NAME: nome del bucket Cloud Storage. Questa è la parte di SNAPSHOTS_URI immediatamente dopo gs://.
  • SQL_SERVICE_ACCOUNT: indirizzo email del service account dell'istanza Cloud SQL. L'hai ottenuto con il comando precedente.

Esempio:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Eseguire una query SQL sul database importato

Al termine dell'importazione, puoi eseguire query. Ad esempio, puoi eseguire una query con Google Cloud CLI.

Eseguire una query SQL sul database Airflow da un DAG

Per connetterti al database Airflow:

  1. Crea un DAG con uno o più operatori SQLExecuteQueryOperator. Per iniziare, puoi utilizzare il DAG di esempio.

  2. Nel parametro sql dell'operatore, specifica la query SQL.

  3. Carica questo DAG nel tuo ambiente.

  4. Attiva il DAG, ad esempio puoi farlo manualmente o attendere che venga eseguito in base a una pianificazione.

DAG di esempio:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Per saperne di più sull'utilizzo di SQLExecuteQueryOperator, consulta la guida illustrativa per Postgres che utilizza SQLExecuteQueryOperator nella documentazione di Airflow.

Eseguire il dump dei contenuti del database e trasferirli in un bucket

Passaggi successivi