שימוש ב-CeleryKubernetesExecutor

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך להפעיל את CeleryKubernetesExecutor ב-Managed Airflow ואיך להשתמש ב-KubernetesExecutor ב-DAG.

מידע על CeleryKubernetesExecutor

CeleryKubernetesExecutor הוא סוג של מפעיל שיכול להשתמש ב-CeleryExecutor וב-KubernetesExecutor בו-זמנית. מערכת Airflow בוחרת את ה-executor על סמך התור שמגדירים למשימה. ב-DAG אחד, אפשר להריץ חלק מהמשימות באמצעות CeleryExecutor, ומשימות אחרות באמצעות KubernetesExecutor:

  • ‫CeleryExecutor עבר אופטימיזציה לביצוע מהיר וניתן להרחבה של משימות.
  • ‫KubernetesExecutor נועד לביצוע משימות שצורכות הרבה משאבים ולהרצת משימות בבידוד.

‫CeleryKubernetesExecutor ב-Managed Airflow

‫CeleryKubernetesExecutor ב-Managed Airflow מאפשר להשתמש ב-KubernetesExecutor למשימות. אי אפשר להשתמש ב-KubernetesExecutor ב-Managed Airflow בנפרד מ-CeleryKubernetesExecutor.

‫Managed Airflow מריץ משימות שאתם מבצעים באמצעות KubernetesExecutor באשכול של הסביבה שלכם, באותו מרחב שמות עם עובדי Airflow. למשימות כאלה יש את אותן הקצאות כמו לעובדים של Airflow, והן יכולות לגשת למשאבים בפרויקט.

משימות שמופעלות באמצעות KubernetesExecutor משתמשות במודל התמחור של Managed Airflow, כי הפודים עם המשימות האלה פועלים באשכול של הסביבה שלכם. מק"טים של Managed Airflow Compute (לשימוש במעבד, בזיכרון ובאחסון) חלים על הפודים האלה.

מומלץ להריץ משימות באמצעות CeleryExecutor במקרים הבאים:

  • חשוב להקפיד על זמן ההפעלה של המשימה.
  • משימות לא דורשות בידוד בזמן הריצה והן לא צורכות הרבה משאבים.

מומלץ להריץ משימות באמצעות KubernetesExecutor במקרים הבאים:

  • המשימות מחייבות בידוד בזמן ריצה. לדוגמה, כדי שהמשימות לא יתחרו על הזיכרון ועל המעבד, כי הן פועלות בתרמילים משלהן.
  • הפעולות ב-Tasks דורשות הרבה משאבים, ואתם רוצים לשלוט במשאבי הזיכרון והמעבד (CPU) שזמינים.

ההבדלים בין KubernetesExecutor לבין KubernetesPodOperator

הפעלת משימות באמצעות KubernetesExecutor דומה להפעלת משימות באמצעות KubernetesPodOperator. המשימות מבוצעות ב-pods, ולכן יש בידוד של המשימות ברמת ה-pod וניהול משאבים טוב יותר.

עם זאת, יש כמה הבדלים חשובים:

  • ‫KubernetesExecutor מריץ משימות רק במרחב השמות של סביבת Managed Airflow עם ניהול גרסאות. אי אפשר לשנות את מרחב השמות הזה ב-Managed Airflow. אתם יכולים לציין מרחב שמות שבו משימות ה-Pod מופעלות על ידי KubernetesPodOperator.
  • ‫KubernetesExecutor יכול להשתמש בכל אופרטור מובנה של Airflow. ‫KubernetesPodOperator מפעיל רק סקריפט שמוגדר על ידי נקודת הכניסה של הקונטיינר.
  • ‫KubernetesExecutor משתמש בקובץ האימג' של Docker המנוהל כברירת מחדל ב-Airflow עם אותם Python, ביטולים של אפשרויות הגדרה של Airflow, משתני סביבה וחבילות PyPI שמוגדרים בסביבת Managed Airflow.

מידע על קובצי אימג' של Docker

כברירת מחדל, KubernetesExecutor מפעיל משימות באמצעות אותו קובץ אימג' של Docker שמשמש את Managed Airflow עבור Celery workers. זהו קובץ האימג' של Managed Airflow עבור הסביבה שלכם, עם כל השינויים שציינתם עבור הסביבה, כמו חבילות PyPI בהתאמה אישית או משתני סביבה.

לפני שמתחילים

  • אפשר להשתמש ב-CeleryKubernetesExecutor ב-Managed Airflow (דור 3).

  • אי אפשר להשתמש באף מפעיל אחר מלבד CeleryKubernetesExecutor ב-Managed Airflow (דור 3). כלומר, אפשר להריץ משימות באמצעות CeleryExecutor,‏ KubernetesExecutor או שניהם ב-DAG אחד, אבל אי אפשר להגדיר את הסביבה כך שתשתמש רק ב-KubernetesExecutor או ב-CeleryExecutor.

הגדרת CeleryKubernetesExecutor

אולי תרצו להגדיר ערכים חדשים לאפשרויות קיימות בהגדרות של Airflow שקשורות ל-KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    האפשרות הזו מגדירה את מספר הקריאות ליצירת Kubernetes Worker Pod לכל לולאת תזמון. ערך ברירת המחדל הוא 1, ולכן רק פוד אחד מופעל בכל פעימת לב של מתזמן. אם אתם משתמשים ב-KubernetesExecutor באופן נרחב, מומלץ להגדיל את הערך הזה.

  • [kubernetes]worker_pods_pending_timeout

    האפשרות הזו מגדירה, בשניות, כמה זמן יכולה להיות פעולה במצב Pending (יצירת Pod) לפני שהיא נחשבת כפעולה שנכשלה. ערך ברירת המחדל הוא 5 דקות.

הפעלת משימות באמצעות KubernetesExecutor או CeleryExecutor

אפשר להריץ משימות באמצעות CeleryExecutor,‏ KubernetesExecutor או שניהם ב-DAG אחד:

  • כדי להריץ משימה באמצעות KubernetesExecutor, צריך לציין את הערך kubernetes בפרמטר queue של המשימה.
  • כדי להריץ משימה באמצעות CeleryExecutor, משמיטים את הפרמטר queue.

בדוגמה הבאה מריצים את המשימה task-kubernetes באמצעות KubernetesExecutor ואת המשימה task-celery באמצעות CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

הרצת פקודות Airflow CLI שקשורות ל-KubernetesExecutor

אפשר להריץ כמה פקודות של Airflow CLI שקשורות ל-KubernetesExecutor באמצעות gcloud.

התאמה אישית של מפרט של פוד עובד

אפשר להתאים אישית את מפרט ה-pod של העובד על ידי העברתו בפרמטר executor_config של משימה. אפשר להשתמש בזה כדי להגדיר דרישות מותאמות אישית של מעבד וזיכרון.

אפשר לבטל את כל מפרט ה-pod של העובד שמשמש להרצת משימה. כדי לאחזר את מפרט ה-pod של משימה שמשמשת את KubernetesExecutor, אפשר להריץ את הפקודה kubernetes generate-dag-yaml של Airflow CLI.

מידע נוסף על התאמה אישית של מפרט ה-pod של העובד זמין במסמכי התיעוד של Airflow.

ב-Managed Airflow (דור 3) יש תמיכה בערכים הבאים של דרישות משאבים:

משאב מינימום מקסימום שלב
CPU 0.25 32 ערכי השלבים: 0.25,‏ 0.5,‏ 1,‏ 2,‏ 4,‏ 6,‏ 8,‏ 10 וכן הלאה עד 32. הערכים המבוקשים מעוגלים כלפי מעלה לערך השלב הנתמך הקרוב ביותר (לדוגמה, 5 מעוגל ל-6).
זיכרון ‫2G (GB) ‫128G (GB) ערכי השלבים: 2,‏ 3,‏ 4,‏ 5 ועד 128. הערכים המבוקשים מעוגלים כלפי מעלה לערך השלב הנתמך הקרוב ביותר (לדוגמה, 3.5G מעוגל ל-4G).
אחסון - ‫100G (GB) כל ערך. אם מבקשים יותר מ-100GB, מקבלים רק 100GB.

מידע נוסף על יחידות משאבים ב-Kubernetes זמין במאמר יחידות משאבים ב-Kubernetes.

בדוגמה הבאה מוצגת משימה שמשתמשת במפרט בהתאמה אישית של worker pod:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

צפייה ביומני המשימות

יומנים של משימות שהופעלו על ידי KubernetesExecutor זמינים בכרטיסייה Logs, יחד עם יומנים של משימות שהופעלו על ידי CeleryExecutor:

  1. במסוף Google Cloud , עוברים לדף Environments.

    מעבר אל Environments

  2. ברשימת הסביבות, לוחצים על שם הסביבה. הדף Environment details ייפתח.

  3. עוברים לכרטיסייה יומנים.

  4. עוברים אל All logs (כל היומנים) > Airflow logs (יומני Airflow) > Workers (תהליכי עבודה).

  5. עובדים בשם airflow-k8s-worker מבצעים משימות של KubernetesExecutor. כדי לחפש יומנים של משימה ספציפית, אפשר להשתמש במזהה DAG או במזהה משימה כמילת מפתח בחיפוש.

המאמרים הבאים