אחרי שהמשימות נכנסות לתור משיכות, עובד יכול לשכור אותן. אחרי שהמשימות עוברות עיבוד, העובד חייב למחוק אותן.
לפני שמתחילים
- יוצרים תור משיכה.
- יצירת משימות והוספה שלהן לתור משיכה.
הקשר חשוב
- השיטה הזו רלוונטית רק ל-workers שפועלים בשירות בסביבה הרגילה.
- כשמשתמשים בתורים מסוג pull, אתם אחראים להרחבת מספר העובדים בהתאם לנפח העיבוד.
משימות שקשורות לליסינג
אחרי שהמשימות נכנסות לתור, אפשר להשכיר אחת או יותר מהן באמצעות השיטה
lease_tasks()
. יכול להיות שיחול עיכוב קצר עד שמשימות שנוספו לאחרונה באמצעותadd() יהיו זמינות דרךlease_tasks().
כשמבקשים חכירה, מציינים את מספר המשימות לחכירה (עד 1,000 משימות) ואת משך החכירה בשניות (עד שבוע). משך השכירות צריך להיות ארוך מספיק כדי להבטיח שהמשימה הכי איטית תסתיים לפני שתקופת השכירות תסתיים. אפשר לשנות את ההקצאה של משימה באמצעות
modify_task_lease()
.
השכרת משימה הופכת אותה ללא זמינה לעיבוד על ידי עובד אחר, והיא נשארת לא זמינה עד שתוקף ההשכרה פג.
השיטה
lease_tasks()
מחזירה אובייקט Task שמכיל רשימה של משימות שהושכרו מהתור.
דוגמת הקוד הבאה שוכרת 100 משימות מהתור pull-queue למשך שעה אחת:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
שימוש בתגי משימות כדי לבצע פעולות בקבוצות
לא כל המשימות זהות. הקוד יכול 'לתייג' משימות ואז לבחור משימות להשכרה לפי תג. התג פועל כמסנן.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
הסדרת קצב העברת הנתונים
תהליך העבודה שבודק את התור כדי להקצות משימות צריך לזהות אם הוא מנסה להקצות משימות מהר יותר מהקצב שבו התור יכול לספק אותן. אם הכשל הזה מתרחש, יכולות להיווצר החריגות הבאות מ-
lease_tasks()
:
- `google.appengine.api.taskqueue.TransientError`
- `google.appengine.runtime.apiproxy_errors.DeadlineExceededError`
הקוד שלכם צריך לזהות את החריגים האלה, להפסיק את הקריאה ל-
lease_tasks()
,
ולנסות שוב מאוחר יותר. כדי להימנע מהבעיה הזו, כדאי להגדיר ערך גבוה יותר של RPC
deadline כשקוראים ל-
lease_tasks()
. בנוסף, צריך להפסיק את הקריאה כשבקשת חכירה מחזירה רשימה ריקה של משימות.
אם יוצרים יותר מ-10 בקשות LeaseTasks לכל תור בשנייה, רק 10 הבקשות הראשונות יחזירו תוצאות. אם מספר הבקשות חורג מהמגבלה הזו, מוחזרת השגיאה OK עם אפס תוצאות.
מעקב אחרי משימות במסוף Google Cloud
כדי לראות מידע על כל המשימות והתורים באפליקציה:
פותחים את הדף Cloud Tasks במסוף Google Cloud ומחפשים את הערך Pull בעמודה Type.
לוחצים על שם התור שבו אתם מעוניינים, פעולה הפותחת את דף הפרטים של התור. בטבלה מוצגות כל המשימות בתור שנבחר.
מחיקת משימות
אחרי שעובד משלים משימה, הוא צריך למחוק אותה מהתור. אם אתם רואים משימות שנשארו בתור אחרי שעובד סיים לעבד אותן, סביר להניח שהעובד נכשל. במקרה כזה, משימות יעובדו על ידי עובד אחר.
כדי למחוק רשימת משימות, כמו הרשימה שמוחזרת באמצעות
lease_tasks()
,
פשוט מעבירים אותה אל delete_tasks():
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
דוגמה מלאה לתורי משיכה
דוגמה פשוטה אבל מלאה מקצה לקצה לשימוש בתורי שליפה ב-Python מופיעה במאמר appengine-pullqueue-counter.