Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Questa pagina spiega come connetterti a un'istanza Cloud SQL che esegue il database Airflow del tuo ambiente Managed Airflow ed eseguire query SQL.
Ad esempio, potresti voler eseguire query direttamente sul database Airflow, eseguire backup del database, raccogliere statistiche in base ai contenuti del database o recuperare altre informazioni personalizzate dal database.
Prima di iniziare
Esportare i contenuti del database Airflow in un'istanza Cloud SQL
Questo approccio prevede il salvataggio di uno snapshot dell'ambiente, che contiene un dump del database Airflow, e quindi l'importazione del dump in un'istanza Cloud SQL. In questo modo, puoi eseguire query su uno snapshot dei contenuti del database Airflow.
Puoi utilizzare questo approccio in tutte le versioni di Airflow supportate da Managed Airflow (Gen 3), incluse le versioni di Airflow 3 successive alla 3.1.7, in cui l'accesso diretto al database Airflow non è più possibile.
Salvare uno snapshot dell'ambiente
Esegui questo comando per salvare uno snapshot del tuo ambiente. Dopo aver salvato uno snapshot, il risultato dell'operazione segnalerà l'URI in cui lo snapshot viene salvato nel campo snapshotPath. Utilizzerai questo URI in un secondo momento.
Per saperne di più sulla creazione di snapshot, consulta Salvare e caricare snapshot dell'ambiente.
gcloud composer environments snapshots save \
ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-location "SNAPSHOTS_URI"
Sostituisci quanto segue:
ENVIRONMENT_NAME: il nome del tuo ambiente.LOCATION: la regione in cui si trova l'ambiente.(Facoltativo)
SNAPSHOTS_URIcon l'URI di una cartella del bucket in cui archiviare lo snapshot. Se ometti questo argomento, Managed Airflow salva lo snapshot nella cartella/snapshotsdel bucket dell'ambiente.
Esempio:
gcloud composer environments snapshots save \
example-environment \
--location us-central1 \
--snapshot-location "gs://example-bucket/environment_snapshots"
Risultato di esempio:
Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24
Preparare il database di destinazione
Se non hai un'istanza Cloud SQL, creane una. Questa istanza memorizzerà il database importato.
Esegui questo comando per creare un'istanza Cloud SQL:
gcloud sql instances create SQL_INSTANCE_NAME \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=LOCATION \
--root-password=PASSWORD
Sostituisci quanto segue:
SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.LOCATION: regione in cui deve essere creata l'istanza. Ti consigliamo di utilizzare la stessa regione del bucket in cui vengono salvati gli snapshot.PASSWORD: password che utilizzerai per connetterti all'istanza.
Esempio:
gcloud sql instances create example-instance \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=us-central1 \
--root-password=example_password
Esegui questo comando per creare un database denominato airflow_db:
gcloud sql databases create airflow_db \
--instance=SQL_INSTANCE_NAME
Sostituisci quanto segue:
SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.
Esempio:
gcloud sql databases create airflow_db \
--instance=example-instance
Concedere le autorizzazioni IAM al account di servizio Cloud SQL
Nel bucket contenente lo snapshot, concedi un ruolo per l'importazione dei dati al account di servizio Cloud SQL della tua istanza Cloud SQL. Per saperne di più sui ruoli IAM per l'importazione dei dati in Cloud SQL, consulta Importare un file di dump SQL in Cloud SQL per PostgreSQL.
Esegui questo comando per ottenere l'indirizzo email del account di servizio Cloud SQL:
gcloud sql instances describe SQL_INSTANCE_NAME \
--format="value(serviceAccountEmailAddress)"
Sostituisci quanto segue:
SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.
Esempio:
gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"
Output di esempio:
p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com
Concedi le autorizzazioni di lettura a questo account di servizio:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Sostituisci quanto segue:
BUCKET_NAME: nome del bucket Cloud Storage. Questa è la parte diSNAPSHOTS_URIimmediatamente dopogs://.SQL_SERVICE_ACCOUNT: indirizzo email del service account dell'istanza Cloud SQL. L'hai ottenuto con il comando precedente.
Esempio:
gcloud storage buckets add-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Importare il dump del database
Esegui questo comando per importare il file di dump del database dallo snapshot salvato in precedenza nel database airflow_db dell'istanza Cloud SQL.
Il database airflow_db non sarà disponibile durante il processo di importazione.
gcloud sql import sql SQL_INSTANCE_NAME \
$(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
--database=airflow_db \
--user=postgres
Sostituisci quanto segue:
SQL_INSTANCE_NAME: nome dell'istanza Cloud SQL.SNAPSHOT_PATHcon l'URI della cartella del bucket specifica in cui è archiviato lo snapshot. Questo URI viene restituito quando salvi uno snapshot.
Esempio:
gcloud sql import sql example-instance \
$(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
--database=airflow_db \
--user=postgres
(Consigliato) Revocare l'accesso al bucket al termine dell'importazione
Ti consigliamo di revocare le autorizzazioni di accesso al bucket Cloud Storage dal account di servizio dell'istanza Cloud SQL al termine dell'importazione.
Per farlo, esegui questo comando:
gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Sostituisci quanto segue:
BUCKET_NAME: nome del bucket Cloud Storage. Questa è la parte diSNAPSHOTS_URIimmediatamente dopogs://.SQL_SERVICE_ACCOUNT: indirizzo email del service account dell'istanza Cloud SQL. L'hai ottenuto con il comando precedente.
Esempio:
gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Eseguire una query SQL sul database importato
Al termine dell'importazione, puoi eseguire query. Ad esempio, puoi eseguire una query con Google Cloud CLI.
Eseguire una query SQL sul database Airflow da un DAG
Per connetterti al database Airflow:
Crea un DAG con uno o più operatori SQLExecuteQueryOperator. Per iniziare, puoi utilizzare il DAG di esempio.
Nel parametro
sqldell'operatore, specifica la query SQL.Carica questo DAG nel tuo ambiente.
Attiva il DAG, ad esempio puoi farlo manualmente o attendere che venga eseguito in base a una pianificazione.
DAG di esempio:
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
Per saperne di più sull'utilizzo di SQLExecuteQueryOperator, consulta la guida illustrativa per Postgres che utilizza SQLExecuteQueryOperator nella documentazione di Airflow.