Auf die Airflow-Datenbank zugreifen

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Auf dieser Seite wird erläutert, wie Sie eine Verbindung zu einer Cloud SQL-Instanz herstellen, die die Airflow-Datenbank Ihrer Managed Airflow Umgebung ausführt, und wie Sie SQL-Abfragen ausführen.

Sie können beispielsweise Abfragen direkt in der Airflow-Datenbank ausführen, Datenbanksicherungen erstellen, Statistiken basierend auf dem Datenbankinhalt erfassen oder andere benutzerdefinierte Informationen aus der Datenbank abrufen.

Hinweis

Airflow-Datenbankinhalte in eine Cloud SQL-Instanz exportieren

Bei diesem Ansatz wird ein Umgebungs-Snapshot gespeichert, der einen Airflow-Datenbank-Dump enthält. Anschließend wird der Dump in eine Cloud SQL-Instanz importiert. Auf diese Weise können Sie Abfragen für einen Snapshot der Airflow-Datenbankinhalte ausführen.

Sie können diesen Ansatz in allen Versionen von Airflow verwenden, die von Managed Airflow (Gen 3) unterstützt werden, einschließlich Airflow 3-Versionen nach 3.1.7, bei denen der direkte Zugriff auf die Airflow-Datenbank nicht mehr möglich ist.

Umgebungs-Snapshot speichern

Führen Sie den folgenden Befehl aus, um einen Snapshot Ihrer Umgebung zu speichern. Nachdem Sie einen Snapshot gespeichert haben, wird im Ergebnis des Vorgangs im Feld snapshotPath der URI angegeben, unter dem der Snapshot gespeichert ist. Sie verwenden diesen URI später.

Weitere Informationen zum Erstellen von Snapshots finden Sie unter Umgebungs-Snapshots speichern und laden.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet
  • (Optional) SNAPSHOTS_URI durch den URI eines Bucket-Ordners, in dem der Snapshot gespeichert werden soll Wenn Sie dieses Argument weglassen, speichert Managed Airflow den Snapshot im Ordner /snapshots im Bucket Ihrer Umgebung.

Beispiel:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

Beispielergebnis:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

Ziel-Datenbank vorbereiten

Wenn Sie keine Cloud SQL-Instanz haben, erstellen Sie eine. In dieser Instanz wird die importierte Datenbank gespeichert.

Führen Sie den folgenden Befehl aus, um eine Cloud SQL-Instanz zu erstellen:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz
  • LOCATION: Region, in der die Instanz erstellt werden muss Wir empfehlen, dieselbe Region wie den Bucket zu verwenden, in dem die Snapshots gespeichert sind.
  • PASSWORD: Passwort, das Sie für die Verbindung zur Instanz verwenden

Beispiel:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

Führen Sie den folgenden Befehl aus, um eine Datenbank mit dem Namen airflow_db zu erstellen:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz

Beispiel:

gcloud sql databases create airflow_db \
  --instance=example-instance

Dem Cloud SQL-Dienstkonto IAM-Berechtigungen gewähren

Gewähren Sie im Bucket mit dem Snapshot dem Cloud SQL-Dienstkonto Ihrer Cloud SQL-Instanz eine Rolle zum Importieren von Daten. Weitere Informationen zu IAM-Rollen für den Import von Daten in Cloud SQL finden Sie unter SQL-Dumpdatei in Cloud SQL for PostgreSQL importieren.

Führen Sie den folgenden Befehl aus, um die E-Mail-Adresse des Cloud SQL-Dienstkontos abzurufen:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz

Beispiel:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

Beispielausgabe:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

Gewähren Sie diesem Dienstkonto Leseberechtigungen:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ersetzen Sie Folgendes:

  • BUCKET_NAME: Name des Cloud Storage-Bucket Dies ist der Teil von SNAPSHOTS_URI direkt nach gs://.
  • SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz Sie haben sie mit dem vorherigen Befehl abgerufen.

Beispiel:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

Datenbank-Dump importieren

Führen Sie den folgenden Befehl aus, um die Datenbank-Dumpdatei aus dem zuvor gespeicherten Snapshot in die Datenbank airflow_db Ihrer Cloud SQL-Instanz zu importieren.

Die Datenbank airflow_db ist während des Imports nicht verfügbar.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

Ersetzen Sie Folgendes:

  • SQL_INSTANCE_NAME: Name der Cloud SQL-Instanz
  • SNAPSHOT_PATH durch den URI des Bucket-Ordners, in dem der Snapshot gespeichert ist Dieser URI wird zurückgegeben wenn Sie einen Snapshot speichern.

Beispiel:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(Empfohlen) Bucket-Zugriff nach Abschluss des Imports widerrufen

Wir empfehlen, die Berechtigungen für den Zugriff auf den Cloud Storage-Bucket für das Dienstkonto Ihrer Cloud SQL-Instanz zu widerrufen, nachdem der Import abgeschlossen ist.

Führen Sie hierzu den folgenden Befehl aus:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

Ersetzen Sie Folgendes:

  • BUCKET_NAME: Name des Cloud Storage-Bucket Dies ist der Teil von SNAPSHOTS_URI direkt nach gs://.
  • SQL_SERVICE_ACCOUNT: E-Mail-Adresse des Dienstkontos der Cloud SQL-Instanz Sie haben sie mit dem vorherigen Befehl abgerufen.

Beispiel:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

SQL-Abfrage für die importierte Datenbank ausführen

Nach Abschluss des Imports können Sie Abfragen ausführen. Sie können beispielsweise eine Abfrage mit der Google Cloud CLI ausführen .

SQL-Abfrage für die Airflow-Datenbank aus einem DAG ausführen

So stellen Sie eine Verbindung zur Airflow-Datenbank her:

  1. Erstellen Sie einen DAG mit einem oder mehreren SQLExecuteQueryOperator-Operatoren. Verwenden Sie dazu den Beispiel-DAG.

  2. Geben Sie im Parameter sql des Operators Ihre SQL-Abfrage an.

  3. Laden Sie diesen DAG in Ihre Umgebung hoch.

  4. Lösen Sie den DAG aus. Sie können dies beispielsweise manuell tun oder warten, bis er nach Zeitplan ausgeführt wird.

Beispiel-DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

Weitere Informationen zur Verwendung von SQLExecuteQueryOperator finden Sie in der Anleitung für Postgres mit SQLExecuteQueryOperator in der Airflow-Dokumentation.

Inhalt der Datenbank in einer Dumpdatei sichern und in einen Bucket übertragen

Nächste Schritte