אחרי שהמשימות נכנסות לתור משיכות, עובד יכול לשכור אותן. אחרי שהמשימות מעובדות, העובד צריך למחוק אותן.
לפני שמתחילים
- יוצרים תור משיכה.
- יצירת משימות והוספה שלהן לתור משיכה.
הקשר חשוב
- השיטה הזו רלוונטית רק ל-workers שפועלים במסגרת שירות בסביבה הרגילה.
- כשמשתמשים בתורים מסוג pull, אתם אחראים להרחבת מספר העובדים בהתאם לנפח העיבוד.
משימות שקשורות להשכרה
אחרי שהמשימות נכנסות לתור, עובד יכול לשכור אחת או יותר מהן באמצעות השיטה
lease_tasks()
. יכול להיות שיהיה עיכוב קצר עד שהמשימות שנוספו לאחרונה באמצעות
add()
יהיו זמינות באמצעות
lease_tasks()
.
כשמבקשים חכירה, מציינים את מספר המשימות לחכירה (עד 1,000 משימות) ואת משך החכירה בשניות (עד שבוע). משך השכירות צריך להיות ארוך מספיק כדי להבטיח שהמשימה הכי איטית תסתיים לפני שתקופת השכירות תסתיים. אפשר לשנות את ההקצאה של משימה באמצעות
modify_task_lease()
.
השכרת משימה הופכת אותה ללא זמינה לעיבוד על ידי עובד אחר, והיא נשארת לא זמינה עד שתוקף ההשכרה פג.
השיטה
lease_tasks()
מחזירה אובייקט Task שמכיל רשימה של משימות שהושכרו מהתור.
דוגמת הקוד הבאה שוכרת 100 משימות מהתור pull-queue למשך שעה אחת:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
שימוש בתגי משימות כדי לבצע פעולות בכמה משימות בו-זמנית
לא כל המשימות דומות. הקוד יכול 'לתייג' משימות ואז לבחור משימות להשכרה לפי תג. התג פועל כמסנן.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
הסדרת קצב הדגימה
עובדים ששולחים בקשות חוזרות ונשנות לתור כדי לקבל משימות להשכרה צריכים לזהות אם הם מנסים להשכיר משימות מהר יותר מהקצב שבו התור יכול לספק אותן. אם הכשל הזה
מתרחש, יכולות להיווצר החריגות הבאות מ-
lease_tasks()
:
- `google.appengine.api.taskqueue.TransientError`
- `google.appengine.runtime.apiproxy_errors.DeadlineExceededError`
הקוד צריך לטפל בחריגים האלה, להפסיק את הקריאה ל-
lease_tasks()
,
ולנסות שוב מאוחר יותר. כדי להימנע מהבעיה הזו, כדאי להגדיר ערך גבוה יותר של זמן קצוב לתפוגה של RPC כשקוראים ל-
lease_tasks()
. כדאי גם להפחית את קצב השליחה אם בקשת השכרה מחזירה רשימה ריקה של משימות.
אם תשלחו יותר מ-10 בקשות LeaseTasks לתור בשנייה, רק 10 הבקשות הראשונות יחזירו תוצאות. אם הבקשות חורגות מהמגבלה הזו, מוחזרת השגיאה OK בלי תוצאות.
מעקב אחרי משימות במסוף Google Cloud
כדי לראות מידע על כל המשימות והתורים באפליקציה:
פותחים את הדף Cloud Tasks במסוף Google Cloud ומחפשים את הערך Pull בעמודה Type.
לוחצים על שם התור הרצוי כדי לפתוח את דף הפרטים שלו. בטבלה מוצגות כל המשימות בתור שנבחר.
מחיקת משימות
אחרי שעובד מסיים משימה, הוא צריך למחוק אותה מהתור. אם אתם רואים שמשימות נשארו בתור אחרי שעובד סיים לעבד אותן, סביר להניח שהעובד נכשל. במקרה כזה, משימות יעובדו על ידי עובד אחר.
אפשר למחוק רשימת משימות, כמו הרשימה שמוחזרת באמצעות
lease_tasks()
,
פשוט על ידי העברתה אל delete_tasks():
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
דוגמה מלאה לתורים מסוג pull
דוגמה פשוטה אבל מלאה מקצה לקצה לשימוש בתורי שליפה ב-Python מופיעה ב-appengine-pullqueue-counter.