Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird erläutert, wie Sie eine Verbindung zu einer Cloud SQL-Instanz herstellen, die die Airflow-Datenbank Ihrer Cloud Composer-Umgebung ausführt und wie Sie SQL-Abfragen ausführen.
Sie können beispielsweise Abfragen direkt in der Airflow-Datenbank ausführen, Datenbanksicherungen erstellen, Statistiken basierend auf dem Datenbankinhalt erfassen oder andere benutzerdefinierte Informationen aus der Datenbank abrufen.
Hinweis
Airflow-Datenbankinhalte in eine Cloud SQL-Instanz exportieren
Bei diesem Ansatz wird ein Umgebungssnapshot gespeichert, der einen Airflow-Datenbankdump enthält. Anschließend wird der Dump in eine Cloud SQL-Instanz importiert. So können Sie Abfragen für einen Snapshot der Airflow-Datenbankinhalte ausführen.
Sie können diesen Ansatz in allen von Cloud Composer 3 unterstützten Airflow-Versionen verwenden, einschließlich Airflow 3-Versionen nach 3.1.7, in denen der direkte Zugriff auf die Airflow-Datenbank nicht mehr möglich ist.
Umgebungs-Snapshot speichern
Führen Sie den folgenden Befehl aus, um einen Snapshot Ihrer Umgebung zu speichern. Nachdem Sie einen Snapshot gespeichert haben, wird im Ergebnis des Vorgangs der URI angegeben, unter dem der Snapshot im Feld snapshotPath gespeichert ist. Sie benötigen diesen URI später.
Weitere Informationen zum Erstellen von Snapshots finden Sie unter Umgebungssnapshots speichern und laden.
gcloud composer environments snapshots save \
ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-location "SNAPSHOTS_URI"
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME: der Name Ihrer UmgebungLOCATION: die Region, in der sich die Umgebung befindet.Optional:
SNAPSHOTS_URImit dem URI eines Bucket-Ordners, in dem der Snapshot gespeichert werden soll. Wenn Sie dieses Argument weglassen, speichert Cloud Composer den Snapshot im Ordner/snapshotsim Bucket Ihrer Umgebung.
Beispiel:
gcloud composer environments snapshots save \
example-environment \
--location us-central1 \
--snapshot-location "gs://example-bucket/environment_snapshots"
Beispielergebnis:
Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24
Zieldatenbank vorbereiten
Wenn Sie keine Cloud SQL-Instanz haben, erstellen Sie eine. In dieser Instanz wird die importierte Datenbank gespeichert.
Führen Sie den folgenden Befehl aus, um eine Cloud SQL-Instanz zu erstellen:
gcloud sql instances create SQL_INSTANCE_NAME \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=LOCATION \
--root-password=PASSWORD
Ersetzen Sie Folgendes:
SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.LOCATION: Die Region, in der die Instanz erstellt werden muss. Wir empfehlen, dieselbe Region wie den Bucket zu verwenden, in dem die Snapshots gespeichert werden.PASSWORD: Das Passwort, das Sie zum Herstellen einer Verbindung zur Instanz verwenden.
Beispiel:
gcloud sql instances create example-instance \
--database-version=POSTGRES_15 \
--cpu=2 \
--memory=4GB \
--storage-size=100GB \
--storage-auto-increase \
--region=us-central1 \
--root-password=example_password
Führen Sie den folgenden Befehl aus, um eine Datenbank mit dem Namen airflow_db zu erstellen:
gcloud sql databases create airflow_db \
--instance=SQL_INSTANCE_NAME
Ersetzen Sie Folgendes:
SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.
Beispiel:
gcloud sql databases create airflow_db \
--instance=example-instance
Dem Cloud SQL-Dienstkonto IAM-Berechtigungen gewähren
Weisen Sie dem Cloud SQL-Dienstkonto Ihrer Cloud SQL-Instanz eine Rolle zum Importieren von Daten für den Bucket zu, der den Snapshot enthält. Weitere Informationen zu IAM-Rollen für den Import von Daten in Cloud SQL finden Sie unter SQL-Dumpdatei in Cloud SQL for PostgreSQL importieren.
Führen Sie den folgenden Befehl aus, um die E-Mail-Adresse des Cloud SQL-Dienstkontos abzurufen:
gcloud sql instances describe SQL_INSTANCE_NAME \
--format="value(serviceAccountEmailAddress)"
Ersetzen Sie Folgendes:
SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.
Beispiel:
gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"
Beispielausgabe:
p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com
Gewähren Sie diesem Dienstkonto Leseberechtigungen:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Ersetzen Sie Folgendes:
BUCKET_NAME: Name des Cloud Storage-Bucket. Dies ist der Teil vonSNAPSHOTS_URIunmittelbar nachgs://.SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz. Sie haben sie mit dem vorherigen Befehl abgerufen.
Beispiel:
gcloud storage buckets add-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
Datenbank-Dump importieren
Führen Sie den folgenden Befehl aus, um die Datenbank-Dumpdatei aus dem zuvor gespeicherten Snapshot in die airflow_db-Datenbank Ihrer Cloud SQL-Instanz zu importieren.
Die Datenbank airflow_db ist während des Importvorgangs nicht verfügbar.
gcloud sql import sql SQL_INSTANCE_NAME \
$(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
--database=airflow_db \
--user=postgres
Ersetzen Sie Folgendes:
SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.SNAPSHOT_PATHdurch den URI des spezifischen Bucket-Ordners, in dem der Snapshot gespeichert ist. Dieser URI wird zurückgegeben, wenn Sie einen Snapshot speichern.
Beispiel:
gcloud sql import sql example-instance \
$(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
--database=airflow_db \
--user=postgres
(Empfohlen) Bucket-Zugriff nach Abschluss des Imports widerrufen
Wir empfehlen, die Berechtigungen für den Zugriff auf Cloud Storage-Bucket für das Dienstkonto Ihrer Cloud SQL-Instanz zu widerrufen, nachdem der Import abgeschlossen ist.
Führen Sie hierzu den folgenden Befehl aus:
gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
--member=serviceAccount:SQL_SERVICE_ACCOUNT \
--role=roles/storage.objectAdmin
Ersetzen Sie Folgendes:
BUCKET_NAME: Name des Cloud Storage-Bucket. Dies ist der Teil vonSNAPSHOTS_URIunmittelbar nachgs://.SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz. Sie haben sie mit dem vorherigen Befehl abgerufen.
Beispiel:
gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
--member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
SQL-Abfrage für die importierte Datenbank ausführen
Nach Abschluss des Imports können Sie Abfragen dafür ausführen. Sie können beispielsweise eine Abfrage mit der Google Cloud CLI ausführen.
SQL-Abfrage für die Airflow-Datenbank über einen DAG ausführen
So stellen Sie eine Verbindung zur Airflow-Datenbank her:
Erstellen Sie einen DAG mit einem oder mehreren SQLExecuteQueryOperator-Operatoren. Für den Einstieg können Sie die Beispiel-DAG verwenden.
Geben Sie im Parameter
sqldes Operators Ihre SQL-Abfrage an.Lösen Sie die DAG aus. Das kann beispielsweise manuell erfolgen oder Sie warten, bis sie nach Zeitplan ausgeführt wird.
Beispiel-DAG:
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
Weitere Informationen zur Verwendung von SQLExecuteQueryOperator finden Sie in der Airflow-Dokumentation im How-to Guide for Postgres using SQLExecuteQueryOperator.