גישה לממשק שורת הפקודה (CLI) של Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

ל-Apache Airflow יש ממשק שורת פקודה (CLI) שבעזרתו אפשר לבצע משימות כמו הפעלה וניהול של DAG, קבלת מידע על הפעלות ומשימות של DAG, הוספה ומחיקה של חיבורים ומשתמשים.

פקודות נתמכות ב-Airflow CLI

‫Airflow משתמש בתחביר של Airflow CLI, שמתואר במסמכי התיעוד של Airflow.

במאמר העזרה בנושא הפקודה gcloud composer environments run מופיעה רשימה מלאה של פקודות Airflow CLI נתמכות.

לפני שמתחילים

כדי להריץ פקודות Airflow CLI דרך Google Cloud CLI:

  • לחשבון Google שלכם צריכות להיות הרשאות לשימוש ב-Google Cloud CLI עם Managed Airflow ולהרצת פקודות Airflow CLI.

  • פקודות Airflow CLI שמופעלות דרך Google Cloud CLI צורכות את environments.executeAirflowCommand המכסה.

  • בגרסאות Managed Airflow לפני גרסה 2.4.0, צריך גישה למישור הבקרה של אשכול הסביבה כדי להריץ פקודות Airflow CLI.

הרצת פקודות Airflow CLI באמצעות ה-CLI של gcloud

כדי להריץ פקודות של Airflow CLI בסביבות שלכם, משתמשים ב-CLI של gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

מחליפים את מה שכתוב בשדות הבאים:

  • ENVIRONMENT_NAME: השם של הסביבה.
  • LOCATION: האזור שבו נמצאת הסביבה.
  • SUBCOMMAND: אחת מהפקודות הנתמכות של Airflow CLI.
  • SUBCOMMAND_ARGUMENTS עם ארגומנטים לפקודת Airflow CLI.

מפריד של ארגומנטים של פקודות משנה

מפרידים בין הארגומנטים של פקודת Airflow CLI שצוינה באמצעות --:

  • מציינים פקודות מורכבות ב-CLI כפקודת משנה.
  • אם מזינים פקודות מורכבות, צריך לציין את הארגומנטים שלהן כארגומנטים של פקודות משנה, אחרי המפריד --.

דוגמה:

gcloud composer environments run example-environment \
    dags list -- --output=json

מיקום ברירת המחדל

רוב הפקודות של gcloud composer דורשות מיקום. אפשר לציין את המיקום באמצעות הדגל --location או על ידי הגדרת מיקום ברירת מחדל.

לדוגמה, כדי להפעיל DAG בשם sample_quickstart עם המזהה 5077 בסביבת Managed Airflow:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

הרצת פקודות בסביבת IP פרטית

בגרסאות של Managed Airflow לפני 2.4.0:

כדי להריץ פקודות Airflow CLI בסביבת IP פרטית, צריך להריץ אותן במכונה שיש לה גישה לנקודת הקצה של מישור הבקרה של אשכול GKE. האפשרויות עשויות להשתנות בהתאם להגדרות של האשכול הפרטי.

אם הגישה לנקודת קצה ציבורית מושבתת באשכול של הסביבה, אי אפשר להשתמש בפקודות gcloud composer כדי להריץ את Airflow CLI. כדי להריץ פקודות של Airflow CLI, מבצעים את השלבים הבאים:

  1. יצירת מכונה וירטואלית ברשת ה-VPC
  2. קבלת פרטי כניסה לאשכול. מריצים את הפקודה הבאה:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

משתמשים ב-kubectl כדי להריץ את פקודת Airflow. לדוגמה:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

מחליפים את COMPOSER_NAMESPACE במרחב שמות שדומה לזה: composer-2-0-28-airflow-2-3-394zxc12411. אפשר למצוא את Managed Airflow ברשימת עומסי העבודה או באמצעות הפקודה kubectl get namespaces.

אם הגישה לנקודת קצה ציבורית מופעלת באשכול של הסביבה שלכם, אתם יכולים גם להריץ פקודות Airflow CLI ממכונה עם כתובת IP חיצונית שנוספה לרשתות מורשות. כדי לאפשר גישה מהמחשב, צריך להוסיף את הכתובת החיצונית של המחשב לרשימת הרשתות המורשות בסביבה.

כשמריצים פקודות של gcloud composer environments run או kubectl, יכול להיות שתיתקלו בשגיאה הבאה:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

הסימפטום: הודעת השגיאה הזו מציינת שאין קישוריות לרשת מהמחשב שבו מריצים את הפקודות האלה.

פתרון: פועלים לפי ההנחיות שמפורטות בקטע הפעלת פקודות בסביבת IP פרטית או לפי ההוראות שמפורטות בקטע הפקודה kubectl נכשלת בגלל פסק זמן.

הפעלת פקודות Airflow CLI דרך Cloud Composer API

החל מגרסה 2.4.0 של Managed Airflow, אפשר להריץ פקודות Airflow CLI דרך Cloud Composer API.

הרצת פקודה

יוצרים בקשת API של environments.executeAirflowCommand:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט.
  • LOCATION: האזור שבו נמצאת הסביבה.
  • ENVIRONMENT_NAME: השם של הסביבה.
  • AIRFLOW_COMMAND: פקודת Airflow CLI שרוצים להריץ, כמו dags.
  • AIRFLOW_SUBCOMMAND: פקודת משנה של פקודת Airflow CLI שרוצים להריץ, כמו list.
  • (אופציונלי) SUBCOMMAND_PARAMETER: פרמטרים של פקודת המשנה. אם רוצים להשתמש ביותר מפרמטר אחד, מוסיפים עוד פריטים לרשימה.

דוגמה:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

בדיקת סטטוס הפקודה

אחרי שמריצים פקודת Airflow CLI דרך Cloud Composer API, בודקים אם הפקודה הושלמה בהצלחה על ידי שליחת בקשת PollAirflowCommand ובדיקת השדות ב-exitInfo כדי לראות אם יש שגיאות וקודי סטטוס. השדה output מכיל שורות ביומן.

כדי לקבל את סטטוס הביצוע של הפקודה ולאחזר יומנים, צריך לספק את הערכים executionId, pod ו-podNamespace שמוחזרים על ידי ExecuteAirflowCommandRequest:

דוגמה:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

המאמרים הבאים