Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)
בדף הזה מוסבר איך להפעיל את CeleryKubernetesExecutor ב-Managed Airflow ואיך להשתמש ב-KubernetesExecutor ב-DAG.
מידע על CeleryKubernetesExecutor
CeleryKubernetesExecutor הוא סוג של מפעיל שיכול להשתמש ב-CeleryExecutor וב-KubernetesExecutor בו-זמנית. מערכת Airflow בוחרת את ה-executor על סמך התור שמגדירים למשימה. ב-DAG אחד, אפשר להריץ חלק מהמשימות באמצעות CeleryExecutor, ומשימות אחרות באמצעות KubernetesExecutor:
- CeleryExecutor עבר אופטימיזציה לביצוע מהיר וניתן להרחבה של משימות.
- KubernetesExecutor נועד לביצוע משימות שצורכות הרבה משאבים ולהרצת משימות בבידוד.
CeleryKubernetesExecutor ב-Managed Airflow
CeleryKubernetesExecutor ב-Managed Airflow מאפשר להשתמש ב-KubernetesExecutor למשימות. אי אפשר להשתמש ב-KubernetesExecutor ב-Managed Airflow בנפרד מ-CeleryKubernetesExecutor.
Managed Airflow מריץ משימות שאתם מבצעים באמצעות KubernetesExecutor באשכול של הסביבה שלכם, באותו מרחב שמות עם עובדי Airflow. למשימות כאלה יש את אותן הקצאות כמו לעובדים של Airflow, והן יכולות לגשת למשאבים בפרויקט.
משימות שמופעלות באמצעות KubernetesExecutor משתמשות במודל התמחור של Managed Airflow, כי הפודים עם המשימות האלה פועלים באשכול של הסביבה שלכם. מק"טים של Managed Airflow Compute (לשימוש במעבד, בזיכרון ובאחסון) חלים על הפודים האלה.
מומלץ להריץ משימות באמצעות CeleryExecutor במקרים הבאים:
- חשוב להקפיד על זמן ההפעלה של המשימה.
- משימות לא דורשות בידוד בזמן הריצה והן לא צורכות הרבה משאבים.
מומלץ להריץ משימות באמצעות KubernetesExecutor במקרים הבאים:
- המשימות מחייבות בידוד בזמן ריצה. לדוגמה, כדי שהמשימות לא יתחרו על הזיכרון ועל המעבד, כי הן פועלות בתרמילים משלהן.
- הפעולות ב-Tasks דורשות הרבה משאבים, ואתם רוצים לשלוט במשאבי הזיכרון והמעבד (CPU) שזמינים.
ההבדלים בין KubernetesExecutor לבין KubernetesPodOperator
הפעלת משימות באמצעות KubernetesExecutor דומה להפעלת משימות באמצעות KubernetesPodOperator. המשימות מבוצעות ב-pods, ולכן יש בידוד של המשימות ברמת ה-pod וניהול משאבים טוב יותר.
עם זאת, יש כמה הבדלים חשובים:
- KubernetesExecutor מריץ משימות רק במרחב השמות של סביבת Managed Airflow עם ניהול גרסאות. אי אפשר לשנות את מרחב השמות הזה ב-Managed Airflow. אתם יכולים לציין מרחב שמות שבו משימות ה-Pod מופעלות על ידי KubernetesPodOperator.
- KubernetesExecutor יכול להשתמש בכל אופרטור מובנה של Airflow. KubernetesPodOperator מפעיל רק סקריפט שמוגדר על ידי נקודת הכניסה של הקונטיינר.
- KubernetesExecutor משתמש בקובץ האימג' של Docker המנוהל כברירת מחדל ב-Airflow עם אותם Python, ביטולים של אפשרויות הגדרה של Airflow, משתני סביבה וחבילות PyPI שמוגדרים בסביבת Managed Airflow.
מידע על קובצי אימג' של Docker
כברירת מחדל, KubernetesExecutor מפעיל משימות באמצעות אותו קובץ אימג' של Docker שמשמש את Managed Airflow עבור Celery workers. זהו קובץ האימג' של Managed Airflow עבור הסביבה שלכם, עם כל השינויים שציינתם עבור הסביבה, כמו חבילות PyPI בהתאמה אישית או משתני סביבה.
לפני שמתחילים
אפשר להשתמש ב-CeleryKubernetesExecutor ב-Managed Airflow (דור 3).
אי אפשר להשתמש באף מפעיל אחר מלבד CeleryKubernetesExecutor ב-Managed Airflow (דור 3). כלומר, אפשר להריץ משימות באמצעות CeleryExecutor, KubernetesExecutor או שניהם ב-DAG אחד, אבל אי אפשר להגדיר את הסביבה כך שתשתמש רק ב-KubernetesExecutor או ב-CeleryExecutor.
הגדרת CeleryKubernetesExecutor
אולי תרצו להגדיר ערכים חדשים לאפשרויות קיימות בהגדרות של Airflow שקשורות ל-KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_sizeהאפשרות הזו מגדירה את מספר הקריאות ליצירת Kubernetes Worker Pod לכל לולאת תזמון. ערך ברירת המחדל הוא
1, ולכן רק פוד אחד מופעל בכל פעימת לב של מתזמן. אם אתם משתמשים ב-KubernetesExecutor באופן נרחב, מומלץ להגדיל את הערך הזה.[kubernetes]worker_pods_pending_timeoutהאפשרות הזו מגדירה, בשניות, כמה זמן יכולה להיות פעולה במצב
Pending(יצירת Pod) לפני שהיא נחשבת כפעולה שנכשלה. ערך ברירת המחדל הוא 5 דקות.
הפעלת משימות באמצעות KubernetesExecutor או CeleryExecutor
אפשר להריץ משימות באמצעות CeleryExecutor, KubernetesExecutor או שניהם ב-DAG אחד:
- כדי להריץ משימה באמצעות KubernetesExecutor, צריך לציין את הערך
kubernetesבפרמטרqueueשל המשימה. - כדי להריץ משימה באמצעות CeleryExecutor, משמיטים את הפרמטר
queue.
בדוגמה הבאה מריצים את המשימה task-kubernetes באמצעות KubernetesExecutor ואת המשימה task-celery באמצעות CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
הרצת פקודות Airflow CLI שקשורות ל-KubernetesExecutor
אפשר להריץ כמה פקודות של Airflow CLI שקשורות ל-KubernetesExecutor באמצעות gcloud.
התאמה אישית של מפרט של פוד עובד
אפשר להתאים אישית את מפרט ה-pod של העובד על ידי העברתו בפרמטר executor_config של משימה. אפשר להשתמש בזה כדי להגדיר דרישות מותאמות אישית של מעבד וזיכרון.
אפשר לבטל את כל מפרט ה-pod של העובד שמשמש להרצת משימה. כדי לאחזר את מפרט ה-pod של משימה שמשמשת את KubernetesExecutor, אפשר להריץ את הפקודה kubernetes generate-dag-yaml של Airflow CLI.
מידע נוסף על התאמה אישית של מפרט ה-pod של העובד זמין במסמכי התיעוד של Airflow.
ב-Managed Airflow (דור 3) יש תמיכה בערכים הבאים של דרישות משאבים:
| משאב | מינימום | מקסימום | שלב |
|---|---|---|---|
| CPU | 0.25 | 32 | ערכי השלבים: 0.25, 0.5, 1, 2, 4, 6, 8, 10 וכן הלאה עד 32. הערכים המבוקשים מעוגלים כלפי מעלה לערך השלב הנתמך הקרוב ביותר (לדוגמה, 5 מעוגל ל-6). |
| זיכרון | 2G (GB) | 128G (GB) | ערכי השלבים: 2, 3, 4, 5 ועד 128. הערכים המבוקשים מעוגלים כלפי מעלה לערך השלב הנתמך הקרוב ביותר (לדוגמה, 3.5G מעוגל ל-4G). |
| אחסון | - | 100G (GB) | כל ערך. אם מבקשים יותר מ-100GB, מקבלים רק 100GB. |
מידע נוסף על יחידות משאבים ב-Kubernetes זמין במאמר יחידות משאבים ב-Kubernetes.
בדוגמה הבאה מוצגת משימה שמשתמשת במפרט בהתאמה אישית של worker pod:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
צפייה ביומני המשימות
יומנים של משימות שהופעלו על ידי KubernetesExecutor זמינים בכרטיסייה Logs, יחד עם יומנים של משימות שהופעלו על ידי CeleryExecutor:
במסוף Google Cloud , עוברים לדף Environments.
ברשימת הסביבות, לוחצים על שם הסביבה. הדף Environment details ייפתח.
עוברים לכרטיסייה יומנים.
עוברים אל All logs (כל היומנים) > Airflow logs (יומני Airflow) > Workers (תהליכי עבודה).
עובדים בשם
airflow-k8s-workerמבצעים משימות של KubernetesExecutor. כדי לחפש יומנים של משימה ספציפית, אפשר להשתמש במזהה DAG או במזהה משימה כמילת מפתח בחיפוש.
המאמרים הבאים
- פתרון בעיות ב-KubernetesExecutor
- שימוש ב-KubernetesPodOperator
- שימוש באופרטורים של GKE
- שינוי של אפשרויות ההגדרה של Airflow