הוספה ועדכון של DAG

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך לנהל DAG בסביבת Managed Airflow.

‫Managed Airflow משתמש בקטגוריה של Cloud Storage כדי לאחסן DAG של סביבת Managed Airflow. הסביבה שלכם מסנכרנת DAGs מהבאקט הזה לרכיבי Airflow כמו Airflow workers ו-schedulers.

לפני שמתחילים

  • מכיוון ש-Apache Airflow לא מספק בידוד חזק של DAG, מומלץ לשמור על סביבות ייצור ובדיקה נפרדות כדי למנוע הפרעות ב-DAG. מידע נוסף זמין במאמר בנושא בדיקת DAG.
  • חשוב לוודא שלחשבון שלכם יש מספיק הרשאות לניהול DAG.
  • השינויים ב-DAG מתעדכנים ב-Airflow תוך 3-5 דקות. אפשר לראות את סטטוס המשימה בממשק המשתמש של Airflow.

גישה לקטגוריה של הסביבה

כדי לגשת לקטגוריה שמשויכת לסביבה שלכם:

המסוף

  1. במסוף Google Cloud , עוברים לדף Environments.

    מעבר אל Environments

  2. ברשימת הסביבות, מחפשים שורה עם שם הסביבה שלכם ובעמודה DAGs folder (תיקיית DAGs) לוחצים על הקישור DAGs. הדף Bucket details נפתח. היא מציגה את התוכן של התיקייה /dags בקטגוריה של הסביבה שלכם.

gcloud

ל-CLI של gcloud יש פקודות נפרדות להוספה ולמחיקה של DAG בדלי של הסביבה.

אם רוצים ליצור אינטראקציה עם הקטגוריה של הסביבה, אפשר גם להשתמש ב-Google Cloud CLI. כדי לקבל את הכתובת של הקטגוריה של הסביבה, מריצים את הפקודה הבאה ב-CLI של gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

מחליפים את:

  • ENVIRONMENT_NAME בשם הסביבה.
  • LOCATION באזור שבו הסביבה נמצאת.

דוגמה:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

יוצרים בקשת API של environments.get. במשאב Environment, במשאב EnvironmentConfig, במשאב dagGcsPrefix נמצאת כתובת ה-bucket של הסביבה.

דוגמה:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

משתמשים בספרייה google-auth כדי לקבל פרטי כניסה, ובספרייה requests כדי לקרוא ל-API בארכיטקטורת REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

הוספה או עדכון של DAG

כדי להוסיף או לעדכן DAG, מעבירים את קובץ ה-Python‏ .py של ה-DAG לתיקייה /dags בדלי של הסביבה.

המסוף

  1. במסוף Google Cloud , עוברים לדף Environments.

    מעבר אל Environments

  2. ברשימת הסביבות, מחפשים שורה עם שם הסביבה שלכם ובעמודה DAGs folder (תיקיית DAGs) לוחצים על הקישור DAGs. הדף Bucket details נפתח. היא מציגה את התוכן של התיקייה /dags בקטגוריה של הסביבה שלכם.

  3. לוחצים על Upload files. לאחר מכן בוחרים את קובץ ה-Python‏ .py עבור ה-DAG באמצעות תיבת הדו-שיח של הדפדפן ומאשרים.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

מחליפים את:

  • ENVIRONMENT_NAME בשם הסביבה.
  • LOCATION באזור שבו הסביבה נמצאת.
  • LOCAL_FILE_TO_UPLOAD הוא קובץ ה-Python‏ .py של ה-DAG.

דוגמה:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

עדכון DAG עם הפעלות DAG פעילות

אם מעדכנים DAG עם הפעלות DAG פעילות:

  • כל המשימות שמופעלות כרגע מסתיימות באמצעות קובץ ה-DAG המקורי.
  • כל המשימות שמתוזמנות אבל לא פועלות כרגע משתמשות בקובץ ה-DAG המעודכן.
  • כל המשימות שכבר לא מופיעות בקובץ ה-DAG המעודכן מסומנות כהוסרו.

עדכון של DAG שפועלים לפי לוח זמנים תדיר

אחרי שמעלים קובץ DAG, עובר זמן עד ש-Airflow טוען את הקובץ הזה ומעדכן את ה-DAG. אם ה-DAG שלכם פועל לפי לוח זמנים קבוע, כדאי לוודא שהוא משתמש בגרסה המעודכנת של קובץ ה-DAG. כדי לעשות את זה:

  1. משהים את ה-DAG בממשק המשתמש של Airflow.

  2. מעלים קובץ DAG מעודכן.

  3. מחכים עד שהעדכונים יופיעו בממשק המשתמש של Airflow. משמעות הדבר היא שה-DAG עבר ניתוח תחבירי נכון על ידי המתזמן ועודכן במסד הנתונים של Airflow.

    אם ממשק המשתמש של Airflow מציג את קובצי ה-DAG המעודכנים, זה לא מבטיח שהעובדים של Airflow קיבלו את הגרסה המעודכנת של קובץ ה-DAG. הסיבה לכך היא שקבצי DAG מסונכרנים באופן עצמאי עבור מתזמנים ועובדים.

  4. כדאי להאריך את זמן ההמתנה כדי לוודא שקובץ ה-DAG מסונכרן עם כל העובדים בסביבה שלכם. הסנכרון מתבצע כמה פעמים בדקה. בסביבה תקינה, מספיק לחכות כ-20-30 שניות כדי שכל העובדים יסתנכרנו.

  5. (אופציונלי) כדי לוודא שכל העובדים קיבלו את הגרסה החדשה של קובץ ה-DAG, בודקים את היומנים של כל עובד בנפרד. כדי לעשות את זה:

    1. פותחים את הכרטיסייה Logs (יומנים) בסביבה במסוף Google Cloud .

    2. עוברים אל Composer logs > Infrastructure > Cloud Storage sync item ובודקים את היומנים של כל עובד בסביבה. מחפשים את Syncing dags directory log item הכי עדכני עם חותמת זמן אחרי שהעליתם את קובץ ה-DAG החדש. אם מופיע פריט Finished syncing אחרי השורה הזו, סימן שה-DAG מסונכרנים בהצלחה בעובד הזה.

  6. מבטלים את ההשהיה של ה-DAG.

ניתוח מחדש של DAG

מכיוון ש-DAG מאוחסנים בקטגוריה של הסביבה, כל DAG מסונכרן קודם למעבד DAG, ואז מעבד ה-DAG מנתח אותו עם השהיה קצרה. אם מנתחים מחדש DAG באופן ידני, למשל דרך ממשק המשתמש של Airflow, מעבד ה-DAG מנתח מחדש את הגרסה הנוכחית של ה-DAG שזמינה לו, ויכול להיות שזו לא הגרסה האחרונה של ה-DAG שהעליתם לבאקט של הסביבה.

מומלץ להשתמש בניתוח מחדש לפי דרישה רק אם נתקלתם בזמני ניתוח ארוכים. לדוגמה, זה יכול לקרות אם בסביבה שלכם יש מספר גדול של קבצים, או אם מרווח ארוך של ניתוח DAG מוגדר באפשרויות ההגדרה של Airflow.

מחיקת DAG בסביבה

כדי למחוק DAG, מסירים את קובץ ה-Python‏ .py של ה-DAG מהתיקייה /dags של הסביבה בדלי של הסביבה.

המסוף

  1. במסוף Google Cloud , עוברים לדף Environments.

    מעבר אל Environments

  2. ברשימת הסביבות, מחפשים שורה עם שם הסביבה שלכם ובעמודה DAGs folder (תיקיית DAGs) לוחצים על הקישור DAGs. הדף Bucket details נפתח. היא מציגה את התוכן של התיקייה /dags בקטגוריה של הסביבה שלכם.

  3. בוחרים את קובץ ה-DAG, לוחצים על מחיקה ומאשרים את הפעולה.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

מחליפים את:

  • ENVIRONMENT_NAME בשם הסביבה.
  • LOCATION באזור שבו הסביבה נמצאת.
  • DAG_FILE עם קובץ Python ‏.py עבור ה-DAG.

דוגמה:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

הסרת DAG מממשק המשתמש של Airflow

כדי להסיר את המטא-נתונים של DAG מממשק המשתמש של Airflow:

ממשק המשתמש של Airflow

  1. עוברים לממשק המשתמש של Airflow בסביבה שלכם.
  2. ב-DAG, לוחצים על מחיקת DAG.

gcloud

מריצים את הפקודה הבאה ב-CLI של gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

מחליפים את:

  • ENVIRONMENT_NAME בשם הסביבה.
  • LOCATION עם האזור שבו הסביבה ממוקמת.
  • DAG_NAME הוא שם ה-DAG שרוצים למחוק.

המאמרים הבאים