גישה למסד הנתונים של Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך להתחבר למופע Cloud SQL שמריץ את מסד הנתונים של Airflow בסביבת Managed Airflow ולהריץ שאילתות SQL.

לדוגמה, יכול להיות שתרצו להריץ שאילתות ישירות במסד הנתונים של Airflow, ליצור גיבויים של מסד הנתונים, לאסוף נתונים סטטיסטיים על סמך התוכן של מסד הנתונים או לאחזר מידע מותאם אישית אחר ממסד הנתונים.

לפני שמתחילים

ייצוא התוכן של מסד הנתונים של Airflow למכונת Cloud SQL

הגישה הזו כוללת שמירה של snapshot של סביבה, שמכיל dump של מסד נתונים של Airflow, ואז ייבוא של ה-dump למכונת Cloud SQL. כך תוכלו להריץ שאילתות על תמונת מצב של תוכן מסד הנתונים של Airflow.

אפשר להשתמש בגישה הזו בכל הגרסאות של Airflow שנתמכות על ידי Managed Airflow (דור 3), כולל גרסאות Airflow 3 שחדשות יותר מגרסה 3.1.7, שבהן כבר אי אפשר לגשת ישירות למסד הנתונים של Airflow.

שמירת תמונת מצב של סביבה

מריצים את הפקודה הבאה כדי לשמור תמונת מצב של הסביבה. אחרי ששומרים תמונת מצב, התוצאה של הפעולה תדווח על ה-URI שבו תמונת המצב נשמרה בשדה snapshotPath. תצטרכו להשתמש ב-URI הזה בהמשך.

מידע נוסף על יצירת תמונות מצב זמין במאמר שמירה וטעינה של תמונות מצב של סביבות.

gcloud composer environments snapshots save \
  ENVIRONMENT_NAME \
  --location LOCATION \
  --snapshot-location "SNAPSHOTS_URI"

מחליפים את מה שכתוב בשדות הבאים:

  • ENVIRONMENT_NAME: השם של הסביבה.
  • LOCATION: האזור שבו נמצאת הסביבה.
  • ‫(Optional) SNAPSHOTS_URI עם ה-URI של תיקיית bucket שבה רוצים לאחסן את התמונה. אם לא מציינים את הארגומנט הזה, Managed Airflow שומר את תמונת המצב בתיקייה /snapshots בדלי של הסביבה.

דוגמה:

gcloud composer environments snapshots save \
  example-environment \
  --location us-central1 \
  --snapshot-location "gs://example-bucket/environment_snapshots"

תוצאה לדוגמה:

Response:
'@type': type.googleapis.com/google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
snapshotPath: gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24

הכנת מסד הנתונים של היעד

אם אין לכם מכונה של Cloud SQL, אתם צריכים ליצור אחת. במופע הזה יאוחסן מסד הנתונים המיובא.

מריצים את הפקודה הבאה כדי ליצור מכונה של Cloud SQL:

gcloud sql instances create SQL_INSTANCE_NAME \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=LOCATION \
  --root-password=PASSWORD

מחליפים את מה שכתוב בשדות הבאים:

  • SQL_INSTANCE_NAME: השם של מכונת Cloud SQL.
  • LOCATION: האזור שבו צריך ליצור את המכונה. מומלץ להשתמש באותו אזור כמו בדלי שבו נשמרים קובצי ה-Snapshot.
  • PASSWORD: הסיסמה שבה תשתמשו כדי להתחבר למופע.

דוגמה:

gcloud sql instances create example-instance \
  --database-version=POSTGRES_15 \
  --cpu=2 \
  --memory=4GB \
  --storage-size=100GB \
  --storage-auto-increase \
  --region=us-central1 \
  --root-password=example_password

מריצים את הפקודה הבאה כדי ליצור מסד נתונים בשם airflow_db:

gcloud sql databases create airflow_db \
  --instance=SQL_INSTANCE_NAME

מחליפים את מה שכתוב בשדות הבאים:

  • SQL_INSTANCE_NAME: השם של מכונת Cloud SQL.

דוגמה:

gcloud sql databases create airflow_db \
  --instance=example-instance

הקצאת הרשאות IAM לחשבון השירות של Cloud SQL

בקטגוריה שמכילה את התמונה, מעניקים תפקיד לייבוא נתונים לחשבון השירות של Cloud SQL של מכונת Cloud SQL. מידע נוסף על תפקידי IAM לייבוא נתונים ל-Cloud SQL זמין במאמר ייבוא קובץ SQL מוכן לשימוש ל-Cloud SQL ל-PostgreSQL.

מריצים את הפקודה הבאה כדי לקבל את כתובת האימייל בחשבון השירות של Cloud SQL:

gcloud sql instances describe SQL_INSTANCE_NAME \
  --format="value(serviceAccountEmailAddress)"

מחליפים את מה שכתוב בשדות הבאים:

  • SQL_INSTANCE_NAME: השם של מכונת Cloud SQL.

דוגמה:

gcloud sql instances describe example-instance --format="value(serviceAccountEmailAddress)"

פלט לדוגמה:

p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com

נותנים הרשאות קריאה לחשבון השירות הזה:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

מחליפים את מה שכתוב בשדות הבאים:

  • BUCKET_NAME: שם הקטגוריה של Cloud Storage. זה החלק של SNAPSHOTS_URI שמופיע מיד אחרי gs://.
  • SQL_SERVICE_ACCOUNT: כתובת האימייל של חשבון השירות של מופע Cloud SQL. השגתם אותו באמצעות הפקודה הקודמת.

דוגמה:

gcloud storage buckets add-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

ייבוא של העתקת מסד הנתונים

מריצים את הפקודה הבאה כדי לייבא את קובץ ה-dump של מסד הנתונים מהתמונה ששמרתם קודם למסד הנתונים airflow_db במכונה של Cloud SQL.

מסד הנתונים airflow_db לא יהיה זמין במהלך תהליך הייבוא.

gcloud sql import sql SQL_INSTANCE_NAME \
  $(gcloud storage ls SNAPSHOTS_URI/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

מחליפים את מה שכתוב בשדות הבאים:

  • SQL_INSTANCE_NAME: השם של מכונת Cloud SQL.
  • SNAPSHOT_PATH עם ה-URI של תיקיית הדלי הספציפית שבה מאוחסן התצלום. ה-URI הזה מוחזר כששומרים תמונת מצב.

דוגמה:

gcloud sql import sql example-instance \
  $(gcloud storage ls gs://example-bucket/environment_snapshots/example-environment_us-central1_2026-03-17T11-26-24/*.sql.gz) \
  --database=airflow_db \
  --user=postgres

(מומלץ) מבטלים את הגישה לדלי אחרי שהייבוא מסתיים

מומלץ לבטל את הרשאות הגישה לקטגוריה של Cloud Storage מחשבון השירות של מכונת Cloud SQL אחרי שהייבוא מסתיים.

כדי לעשות זאת, מריצים את הפקודה הבאה:

gcloud storage buckets remove-iam-policy-binding gs://BUCKET_NAME \
  --member=serviceAccount:SQL_SERVICE_ACCOUNT \
  --role=roles/storage.objectAdmin

מחליפים את מה שכתוב בשדות הבאים:

  • BUCKET_NAME: שם הקטגוריה של Cloud Storage. זה החלק של SNAPSHOTS_URI שמופיע מיד אחרי gs://.
  • SQL_SERVICE_ACCOUNT: כתובת האימייל של חשבון השירות של מופע Cloud SQL. השגתם אותו באמצעות הפקודה הקודמת.

דוגמה:

gcloud storage buckets revoke-iam-policy-binding gs://example-bucket \
  --member=serviceAccount:p231236835740-kw9999@gcp-sa-cloud-sql.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin

הרצת שאילתת SQL במסד הנתונים המיובא

אחרי שהייבוא מסתיים, אפשר להריץ עליו שאילתות. לדוגמה, אפשר להריץ שאילתה באמצעות Google Cloud CLI.

הרצת שאילתת SQL במסד הנתונים של Airflow מ-DAG

כדי להתחבר למסד הנתונים של Airflow:

  1. יוצרים DAG עם אופרטור אחד או יותר של SQLExecuteQueryOperator. כדי להתחיל, אפשר להשתמש ב-DAG לדוגמה.

  2. בפרמטר sql של האופרטור, מציינים את שאילתת ה-SQL.

  3. מעלים את ה-DAG הזה לסביבה שלכם.

  4. מפעילים את ה-DAG. לדוגמה, אפשר לעשות זאת באופן ידני או להמתין עד שהוא יפעל לפי לוח זמנים.

DAG לדוגמה:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

מידע נוסף על השימוש ב-SQLExecuteQueryOperator זמין במדריך לשימוש ב-Postgres באמצעות SQLExecuteQueryOperator במסמכי Airflow.

העתקת התוכן של מסד הנתונים והעברה שלו לקטגוריה

המאמרים הבאים