בדיקה, סנכרון ופריסה של קובצי DAG מ-GitHub

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

במדריך הזה מוסבר איך ליצור צינור CI/CD כדי לבדוק, לסנכרן ולפרוס קובצי DAG בסביבת Managed Airflow ממאגר GitHub.

אם רוצים רק לסנכרן נתונים משירותים אחרים, אפשר לעיין במאמר בנושא העברת נתונים משירותים אחרים.

סקירה כללית על צינור עיבוד נתונים של CI/CD

דיאגרמת ארכיטקטורה שמציגה את השלבים בתהליך. הבדיקה לפני שליחת הקוד והבדיקה של בקשת משיכה מופיעות בקטע GitHub, והסנכרון של DAG והאימות הידני של DAG מופיעים בקטע Google Cloud .
איור 1. דיאגרמת ארכיטקטורה שמציגה את השלבים של התהליך (אפשר ללחוץ כדי להגדיל)

צינור ה-CI/CD שמשמש לבדיקה, לסנכרון ולפריסה של DAG כולל את השלבים הבאים:

  1. מבצעים שינוי ב-DAG ומעבירים את השינוי הזה להסתעפות פיתוח במאגר.

  2. פותחים בקשת משיכה מול הענף הראשי של המאגר.

  3. ‫Cloud Build מריץ בדיקות יחידה כדי לוודא שגרף ה-DAG תקין.

  4. בקשת המיזוג אושרה ומוזגה לענף הראשי של המאגר.

  5. ‫Cloud Build מסנכרן את סביבת Managed Airflow עם השינויים החדשים האלה.

  6. מוודאים ש-DAG פועל כצפוי בסביבת הפיתוח.

  7. אם ה-DAG פועל כמצופה, מעלים אותו לסביבת הייצור של Managed Airflow.

מטרות

  • הפעלת בדיקה אוטומטית לפני שליחת קוד באמצעות Cloud Build. בבדיקה הזו מופעלות בדיקות יחידה ל-DAG.
  • סנכרון של DAG בסביבת הפיתוח של Managed Service for Apache Airflow עם DAG במאגר GitHub.

לפני שמתחילים

  • במדריך הזה אנחנו מניחים שאתם עובדים עם שתי סביבות זהות של Managed Airflow: סביבת פיתוח וסביבת ייצור.

    לצורך המדריך הזה, אתם מגדירים צינור CI/CD רק לסביבת הפיתוח. חשוב לוודא שהסביבה שבה אתם משתמשים היא לא סביבת ייצור.

  • המדריך הזה מתבסס על ההנחה שקובצי ה-DAG והבדיקות שלהם מאוחסנים במאגר GitHub.

    צינור עיבוד הנתונים לדוגמה של CI/CD מדגים את התוכן של מאגר לדוגמה. קובצי ה-DAG והבדיקות מאוחסנים בספרייה dags/, וקובצי הדרישות, קובץ האילוצים וקובצי תצורת ה-build של Cloud Build מאוחסנים ברמה העליונה. כלי הסנכרון של DAG והדרישות שלו נמצאים בספרייה utils.

יצירת משימת בדיקה לפני שליחה ובדיקות יחידה

המשימה הראשונה של Cloud Build מפעילה בדיקה לפני שליחה, שמבצעת בדיקות יחידה ל-DAG.

הוספת בדיקות יחידה

אם עדיין לא עשיתם זאת, כדאי ליצור בדיקות יחידה ל-DAGs. שומרים את הבדיקות האלה לצד הגרפים המכוונים המחזוריים במאגר, כל אחד עם הסיומת _test. לדוגמה, קובץ הבדיקה של ה-DAG ב-example_dag.py הוא example_dag_test.py. אלה הבדיקות שמופעלות כבדיקה לפני שליחת קוד במאגר שלכם.

יצירת הגדרת YAML של Cloud Build לבדיקה לפני שליחת קוד

במאגר, יוצרים קובץ YAML בשם test-dags.cloudbuild.yaml שמגדיר את עבודת ה-Cloud Build לבדיקות לפני שליחה. התהליך כולל שלושה שלבים:

  1. מתקינים את יחסי התלות שנדרשים ל-DAG.
  2. מתקינים את הרכיבים התלויים שנדרשים לבדיקות היחידה.
  3. מריצים את הבדיקות של ה-DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

יצירת טריגר לפיתוח גרסת Build לבדיקה לפני שליחת קוד

פועלים לפי המדריך יצירת מאגרים מ-GitHub כדי ליצור טריגר שמבוסס על אפליקציית GitHub עם ההגדרות הבאות:

  • Name (שם): test-dags

  • אירוע: בקשת משיכה

  • מקור – מאגר: בוחרים את המאגר

  • מקור – ענף בסיס: ^main$ (אם צריך, משנים את main לשם של ענף הבסיס במאגר)

  • מקור – בקרת תגובות: לא נדרש

  • Build Configuration (הגדרת build) – קובץ הגדרות של Cloud Build:‏ /test-dags.cloudbuild.yaml (הנתיב לקובץ ה-build)

יצירה של משימת סנכרון של DAG והוספה של סקריפט כלי DAG

בשלב הבא, מגדירים משימה ב-Cloud Build שמריצה סקריפט של כלי DAG. סקריפט השירות בעבודה הזו מסנכרן את ה-DAG עם סביבת Managed Airflow אחרי שהם ממוזגים לענף הראשי במאגר.

הוספת סקריפט כלי השירות של DAG

מוסיפים את סקריפט כלי ה-DAG למאגר. סקריפט השירות הזה מעתיק את כל קובצי ה-DAG בספרייה dags/ של המאגר שלכם לספרייה זמנית, ומתעלם מכל קובצי ה-Python שאינם קובצי DAG. לאחר מכן, הסקריפט משתמש בספריית הלקוח של Cloud Storage כדי להעלות את כל הקבצים מהספרייה הזמנית אל הספרייה dags/ בדלי של סביבת Managed Airflow.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gcloud storage cp --recursive on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

יצירת הגדרת YAML של Cloud Build לסנכרון של DAG

במאגר, יוצרים קובץ YAML בשם add-dags-to-composer.cloudbuild.yaml שמגדיר את עבודת ה-Cloud Build לסנכרון של DAG. התהליך כולל שני שלבים:

  1. מתקינים את יחסי התלות שנדרשים לסקריפט השירות של DAG.

  2. מריצים את סקריפט השירות כדי לסנכרן את הגרפים המכוונים האציקליים (DAG) במאגר עם סביבת Managed Airflow.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

יצירת טריגר לפיתוח גרסת Build

פועלים לפי המדריך יצירת מאגרים מ-GitHub כדי ליצור טריגר שמבוסס על אפליקציית GitHub עם ההגדרות הבאות:

  • Name (שם): add-dags-to-composer

  • אירוע: שליחה לענף

  • מקור – מאגר: בוחרים את המאגר

  • מקור – ענף בסיס: ^main$ (אם צריך, משנים את main לשם של ענף הבסיס במאגר)

  • מקור – פילטר של קבצים כלולים (glob): dags/**

  • Build Configuration (הגדרת build) – קובץ הגדרות של Cloud Build:‏ /add-dags-to-composer.cloudbuild.yaml (הנתיב לקובץ ה-build)

בהגדרות המתקדמות, מוסיפים שני משתני החלפה:

  • _DAGS_DIRECTORY – הספרייה שבה נמצאים קובצי ה-DAG במאגר. אם אתם משתמשים במאגר הדוגמאות מהמדריך הזה, הוא dags/.

  • _DAGS_BUCKET – הקטגוריה של Cloud Storage שמכילה את הספרייה dags/ בסביבת הפיתוח של Managed Airflow. לא כוללים את הקידומת gs://. לדוגמה: us-central1-example-env-1234ab56-bucket.

בדיקת צינור עיבוד הנתונים של CI/CD

בקטע הזה, תפעלו לפי תהליך פיתוח של DAG שמשתמש בטריגרים של Cloud Build שיצרתם.

הפעלת משימה לפני שליחה

יוצרים בקשת משיכה לענף הראשי כדי לבדוק את ה-build. מאתרים את הבדיקה לפני השליחה בדף. לוחצים על Details ובוחרים באפשרות View more details on Google Cloud Build כדי לראות את יומני הבנייה במסוףGoogle Cloud .

צילום מסך של בדיקה ב-GitHub בשם test-dags עם חץ אדום שמצביע על שם הפרויקט בסוגריים
איור 2. צילום מסך של סטטוס הבדיקה לפני שליחה ב-GitHub (לחיצה להגדלה)

אם בדיקת השליחה המוקדמת נכשלה, אפשר לעיין במאמר בנושא פתרון בעיות שקשורות לגרסאות build.

אימות הפעולה של ה-DAG בסביבת הפיתוח

אחרי שבקשת המיזוג תאושר, תוכלו למזג אותה עם הענף הראשי. אפשר להשתמש במסוףGoogle Cloud כדי לראות את תוצאות ה-build. אם יש לכם הרבה טריגרים של Cloud Build, אתם יכולים לסנן את הבנייה לפי שם הטריגר add-dags-to-composer.

אחרי שמשימת הסנכרון של Cloud Build מסתיימת בהצלחה, ה-DAG המסונכרן מופיע בסביבת הפיתוח של Managed Airflow. שם תוכלו לוודא ש-DAG פועל כמצופה.

הוספת ה-DAG לסביבת הייצור

אחרי שמוודאים ש-DAG פועל כמו שצריך, מוסיפים אותו ידנית לסביבת הייצור. כדי לעשות זאת, מעלים את קובץ ה-DAG לספרייה dags/ בדלי של סביבת הייצור של Managed Airflow.

אם משימת הסנכרון של ה-DAG נכשלה או אם ה-DAG לא מתנהג כצפוי בסביבת הפיתוח של Managed Airflow, אפשר לעיין במאמר פתרון בעיות שקשורות לגרסאות build.

טיפול בכשלים בבנייה

בקטע הזה מוסבר איך לטפל בתרחישים נפוצים של כשלים בבנייה.

מה קורה אם הבדיקה לפני שליחת המאמר נכשלה?

מתוך בקשת המיזוג, לוחצים על Details ובוחרים באפשרות View more details on Google Cloud Build כדי לראות את יומני הבנייה במסוףGoogle Cloud . אפשר להשתמש ביומנים האלה כדי לנפות באגים ב-DAG. אחרי שפותרים את הבעיות, מבצעים את התיקון ושולחים אותו לענף. הבדיקה לפני השליחה תפעל שוב, ותוכלו להמשיך לבצע איטרציות באמצעות היומנים ככלי לניפוי באגים.

מה קורה אם משימת הסנכרון של DAG נכשלה?

אפשר להשתמש במסוף Google Cloud כדי לראות את תוצאות ה-build. אם יש לכם הרבה טריגרים של Cloud Build, אתם יכולים לסנן את הבנייה לפי שם הטריגר add-dags-to-composer. בודקים את היומנים של משימת ה-build ופותרים את השגיאות. אם אתם צריכים עזרה נוספת בפתרון השגיאות, תוכלו להיעזר בערוצי התמיכה.

מה קורה אם ה-DAG לא פועל כמו שצריך בסביבת Managed Airflow?

אם ה-DAG לא פועל כצפוי בסביבת הפיתוח שלכם ב-Managed Airflow, אל תקדמו את ה-DAG באופן ידני לסביבת הייצור שלכם ב-Managed Airflow. במקום זאת, צריך לבצע אחת מהפעולות הבאות:

  • מבטלים את בקשת המיזוג עם השינויים שגרמו לשבירת ה-DAG כדי לשחזר אותו למצב שהיה לפני השינויים (פעולה זו מבטלת גם את כל הקבצים האחרים בבקשת המיזוג).
  • יוצרים בקשת משיכה חדשה כדי לבטל ידנית את השינויים ב-DAG הפגום.
  • יוצרים בקשת משיכה חדשה כדי לתקן את השגיאות ב-DAG.

אחרי שמבצעים אחד מהשלבים האלה, מופעלת בדיקה חדשה לפני שליחת הקוד, ולאחר המיזוג מופעלת משימת הסנכרון של ה-DAG.

המאמרים הבאים