Auf die Airflow-Datenbank zugreifen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird erläutert, wie Sie eine Verbindung zu einer Cloud SQL-Instanz herstellen, die die Airflow-Datenbank Ihrer Cloud Composer-Umgebung ausführt und wie Sie SQL-Abfragen ausführen.

Sie können beispielsweise Abfragen direkt in der Airflow-Datenbank ausführen, Datenbanksicherungen erstellen, Statistiken basierend auf dem Datenbankinhalt erfassen oder andere benutzerdefinierte Informationen aus der Datenbank abrufen.

Hinweis

Airflow-Datenbankinhalte in eine Cloud SQL-Instanz exportieren

Bei diesem Ansatz wird ein Umgebungssnapshot gespeichert, der einen Airflow-Datenbankdump enthält. Anschließend wird der Dump in eine Cloud SQL-Instanz importiert. So können Sie Abfragen für einen Snapshot der Airflow-Datenbankinhalte ausführen.

Sie können diesen Ansatz in allen von Cloud Composer 3 unterstützten Airflow-Versionen verwenden, einschließlich Airflow 3-Versionen nach 3.1.7, in denen der direkte Zugriff auf die Airflow-Datenbank nicht mehr möglich ist.

Umgebungs-Snapshot speichern

Führen Sie den folgenden Befehl aus, um einen Snapshot Ihrer Umgebung zu speichern. Nachdem Sie einen Snapshot gespeichert haben, wird im Ergebnis des Vorgangs der URI angegeben, unter dem der Snapshot im Feld snapshotPath gespeichert ist. Sie benötigen diesen URI später.

Weitere Informationen zum Erstellen von Snapshots finden Sie unter Umgebungssnapshots speichern und laden.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • Optional: SNAPSHOTS_URI mit dem URI eines Bucket-Ordners, in dem der Snapshot gespeichert werden soll. Wenn Sie dieses Argument weglassen, speichert Cloud Composer den Snapshot im Ordner /snapshots im Bucket Ihrer Umgebung.

Beispiel:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Beispielergebnis:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Zieldatenbank vorbereiten

Wenn Sie keine Cloud SQL-Instanz haben, erstellen Sie eine. In dieser Instanz wird die importierte Datenbank gespeichert.

Führen Sie den folgenden Befehl aus, um eine Cloud SQL-Instanz zu erstellen:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.
  • LOCATION: Die Region, in der die Instanz erstellt werden muss. Wir empfehlen, dieselbe Region wie den Bucket zu verwenden, in dem die Snapshots gespeichert werden.
  • PASSWORD: Das Passwort, das Sie zum Herstellen einer Verbindung zur Instanz verwenden.

Beispiel:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Führen Sie den folgenden Befehl aus, um eine Datenbank mit dem Namen airflow_db zu erstellen:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.

Beispiel:

gcloud sql databases create airflow_db \
  --instance=example-instance

Dem Cloud SQL-Dienstkonto IAM-Berechtigungen gewähren

Weisen Sie dem Cloud SQL-Dienstkonto Ihrer Cloud SQL-Instanz eine Rolle zum Importieren von Daten für den Bucket zu, der den Snapshot enthält. Weitere Informationen zu IAM-Rollen für den Import von Daten in Cloud SQL finden Sie unter SQL-Dumpdatei in Cloud SQL for PostgreSQL importieren.

Führen Sie den folgenden Befehl aus, um die E-Mail-Adresse des Cloud SQL-Dienstkontos abzurufen:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.

Beispiel:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Beispielausgabe:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Gewähren Sie diesem Dienstkonto Leseberechtigungen:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ersetzen Sie Folgendes:

  • BUCKET_NAME: Name des Cloud Storage-Bucket. Dies ist der Teil von SNAPSHOTS_URI unmittelbar nach gs://.
  • SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz. Sie haben sie mit dem vorherigen Befehl abgerufen.

Beispiel:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Datenbank-Dump importieren

Führen Sie den folgenden Befehl aus, um die Datenbank-Dumpdatei aus dem zuvor gespeicherten Snapshot in die airflow_db-Datenbank Ihrer Cloud SQL-Instanz zu importieren.

Die Datenbank airflow_db ist während des Importvorgangs nicht verfügbar.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz.
  • SNAPSHOT_PATH durch den URI des spezifischen Bucket-Ordners, in dem der Snapshot gespeichert ist. Dieser URI wird zurückgegeben, wenn Sie einen Snapshot speichern.

Beispiel:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Empfohlen) Bucket-Zugriff nach Abschluss des Imports widerrufen

Wir empfehlen, die Berechtigungen für den Zugriff auf Cloud Storage-Bucket für das Dienstkonto Ihrer Cloud SQL-Instanz zu widerrufen, nachdem der Import abgeschlossen ist.

Führen Sie hierzu den folgenden Befehl aus:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ersetzen Sie Folgendes:

  • BUCKET_NAME: Name des Cloud Storage-Bucket. Dies ist der Teil von SNAPSHOTS_URI unmittelbar nach gs://.
  • SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz. Sie haben sie mit dem vorherigen Befehl abgerufen.

Beispiel:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

SQL-Abfrage für die importierte Datenbank ausführen

Nach Abschluss des Imports können Sie Abfragen dafür ausführen. Sie können beispielsweise eine Abfrage mit der Google Cloud CLI ausführen.

SQL-Abfrage für die Airflow-Datenbank über einen DAG ausführen

So stellen Sie eine Verbindung zur Airflow-Datenbank her:

  1. Erstellen Sie einen DAG mit einem oder mehreren SQLExecuteQueryOperator-Operatoren. Für den Einstieg können Sie die Beispiel-DAG verwenden.

  2. Geben Sie im Parameter sql des Operators Ihre SQL-Abfrage an.

  3. Laden Sie diesen DAG in Ihre Umgebung hoch.

  4. Lösen Sie die DAG aus. Das kann beispielsweise manuell erfolgen oder Sie warten, bis sie nach Zeitplan ausgeführt wird.

Beispiel-DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Weitere Informationen zur Verwendung von SQLExecuteQueryOperator finden Sie in der Airflow-Dokumentation im How-to Guide for Postgres using SQLExecuteQueryOperator.

Inhalt der Datenbank in einer Dumpdatei sichern und in einen Bucket übertragen

Nächste Schritte