Dataflow תומך בעיבוד של רשומות בדיוק פעם אחת. בדף הזה מוסבר איך Dataflow מטמיע עיבוד בדיוק פעם אחת, וגם מבטיח חביון נמוך.
סקירה כללית
צינורות עיבוד באצווה תמיד משתמשים בעיבוד של פעם אחת בדיוק. צינורות להזרמת נתונים משתמשים כברירת מחדל בעיבוד בדיוק פעם אחת, אבל יכולים להשתמש גם בעיבוד לפחות פעם אחת.
עיבוד בדיוק פעם אחת מספק ערבויות לגבי תוצאות העיבוד של רשומות, כולל התוצאות מכל שלב בצינור. באופן ספציפי, לכל רשומה שמגיעה לצנרת ממקור, או שמגיעה לשלב משלב קודם, Dataflow מבטיח את הדברים הבאים:
- הרשומה מעובדת ולא אבדה.
- כל תוצאות העיבוד שנשארות בצינור העיבוד משתקפות לכל היותר פעם אחת.
במילים אחרות, הרשומות מעובדות לפחות פעם אחת, והתוצאות נשמרות בדיוק פעם אחת.
העיבוד בדיוק פעם אחת מבטיח שהתוצאות יהיו מדויקות, בלי רשומות כפולות בפלט. Dataflow עובר אופטימיזציה כדי למזער את זמן האחזור תוך שמירה על סמנטיקה של בדיוק פעם אחת. עם זאת, עיבוד של פעם אחת בדיוק עדיין כרוך בעלות על ביצוע ביטול כפילויות. בתרחישי שימוש שבהם אפשר לסבול רשומות כפולות, אפשר בדרך כלל להפחית את העלות ולשפר את זמן האחזור על ידי הפעלת מצב 'לפחות פעם אחת'. מידע נוסף על בחירה בין סטרימינג של בדיוק פעם אחת לבין סטרימינג של לפחות פעם אחת זמין במאמר בנושא הגדרת מצב הסטרימינג של צינור הנתונים.
נתונים מאוחרים
עיבוד של כל רשומה בדיוק פעם אחת מבטיח את הדיוק של צינור עיבוד הנתונים: אם צינור עיבוד הנתונים מעבד רשומה, מערכת Dataflow מוודאת שהרשומה משתקפת בפלט, ושהרשומה לא משוכפלת.
עם זאת, בצינור עיבוד נתונים בסטרימינג, עיבוד בדיוק פעם אחת לא יכול להבטיח שהתוצאות יהיו מלאות, כי יכול להיות שהרשומות יגיעו באיחור. לדוגמה,
נניח שצינור הנתונים מבצע צבירה על חלון זמן, כמו
Count. בעיבוד בדיוק פעם אחת, התוצאה מדויקת לגבי הרשומות שמגיעות בזמן במהלך חלון הזמן, אבל יכול להיות שרשומות שמגיעות באיחור ייפסלו.
באופן כללי, אין דרך להבטיח שלמות בצינור נתונים של סטרימינג, כי באופן תיאורטי, רשומות יכולות להגיע באיחור. במקרה הקיצוני, תצטרכו לחכות לנצח כדי לקבל תוצאה. מבחינה מעשית, Apache Beam מאפשרת להגדיר את הסף להסרת נתונים מאוחרים ואת הזמן להפקת תוצאות מצטברות. מידע נוסף זמין במאמר בנושא סימני מים ונתונים מאוחרים במסמכי התיעוד של Apache Beam.
תופעות לוואי
אין ערובה לכך שתופעות הלוואי יפעלו בדיוק פעם אחת. חשוב לציין שהדבר כולל כתיבת פלט למאגר חיצוני, אלא אם יעד הכתיבה מיישם גם סמנטיקה של בדיוק פעם אחת.
במילים אחרות, Dataflow לא מבטיח שכל רשומה תעבור כל טרנספורמציה בדיוק פעם אחת. בגלל ניסיונות חוזרים או כשלים של עובדים, יכול להיות ש-Dataflow ישלח רשומה דרך טרנספורמציה כמה פעמים, או אפילו בו-זמנית בכמה עובדים.
כחלק מעיבוד בדיוק פעם אחת, Dataflow מסיר כפילויות מהפלט. עם זאת, אם לקוד בטרנספורמציה יש תופעות לוואי, יכול להיות שהתופעות האלה יתרחשו כמה פעמים. לדוגמה, אם טרנספורמציה מבצעת קריאה לשירות מרוחק, יכול להיות שהקריאה הזו תתבצע כמה פעמים לאותה רשומה. תופעות לוואי יכולות אפילו לגרום לאובדן נתונים במצבים מסוימים. לדוגמה, נניח שטרנספורמציה קוראת קובץ כדי ליצור פלט, ואז מוחקת את הקובץ באופן מיידי בלי לחכות שהפלט יאושר. אם מתרחשת שגיאה כשמבצעים commit לתוצאה, Dataflow מנסה שוב את הטרנספורמציה, אבל עכשיו הטרנספורמציה לא יכולה לקרוא את הקובץ שנמחק.
רישום ביומן
פלט היומן מהעיבוד מציין שהעיבוד התרחש, אבל לא מציין אם הנתונים נשמרו. לכן, יכול להיות שבקובצי היומן יצוין שהנתונים עובדו כמה פעמים, למרות שהתוצאות של הנתונים שעובדו נשמרות באחסון קבוע רק פעם אחת. בנוסף, היומנים לא תמיד משקפים נתונים שעברו עיבוד ונשמרו. יכול להיות שיוסרו יומנים בגלל הגבלת קצב העברת הנתונים או שהם יאבדו בגלל בעיות אחרות בשירות רישום היומנים.
סטרימינג של נתונים בדיוק פעם אחת
בקטע הזה מוסבר איך Dataflow מיישם עיבוד מדויק של נתונים בסטרימינג, כולל איך Dataflow מטפל במורכבויות כמו עיבוד לא דטרמיניסטי, נתונים מאוחרים וקוד בהתאמה אישית.
ארגון נתונים בזמן אמת של Dataflow
משימות Dataflow להזרמת נתונים פועלות במקביל בהרבה עובדים שונים, על ידי הקצאת טווחי עבודה לכל עובד. למרות שההקצאות עשויות להשתנות לאורך זמן בתגובה לכשלים של עובדים, להתאמה אוטומטית לעומס או לאירועים אחרים, אחרי כל GroupByKey טרנסנספורמציה, כל הרשומות עם אותו מפתח מעובדות באותו עובד. הטרנספורמציה GroupByKey משמשת לעיתים קרובות בטרנספורמציות מורכבות, כמו Count, FileIO וכן הלאה. כדי להבטיח שרשומות של מפתח נתון יגיעו לאותו תהליך עובד, תהליכי העובד של Dataflow מבצעים ערבוב של הנתונים ביניהם באמצעות קריאות לפרוצדורה מרוחקת (RPC).
כדי להבטיח שהרשומות לא יאבדו במהלך הערבוב, Dataflow משתמש בגיבוי במעלה הזרם. בגיבוי של נתונים שנשלחים לשרת, תהליך העבודה ששולח את הרשומות מנסה שוב את קריאות ה-RPC עד שהוא מקבל אישור חיובי שהרשומה התקבלה. תופעות הלוואי של עיבוד הרשומה נשמרות באחסון קבוע בהמשך השרשרת. אם העובד ששולח את הרשומות לא זמין, Dataflow ממשיך לנסות לשלוח בקשות RPC, וכך מוודא שכל רשומה נשלחת לפחות פעם אחת.
ניסיונות השליחה החוזרים האלה עלולים ליצור כפילויות, ולכן כל הודעה מתויגת במזהה ייחודי. כל מקבל שומר קטלוג של כל המזהים שכבר נראו ועברו עיבוד. כשמתקבלת רשומה, Dataflow מחפש את המזהה שלה בקטלוג. אם המזהה נמצא, הרשומה כבר התקבלה והועברה, והיא נפסלת כרשומה כפולה. כדי לוודא שמזהי הרשומות יציבים, כל פלט משלב לשלב נשמר באחסון כנקודת ביקורת. כתוצאה מכך, אם אותה הודעה נשלחת כמה פעמים בגלל קריאות חוזרות ל-RPC, ההודעה נשמרת באחסון רק פעם אחת.
איך מבטיחים זמן אחזור קצר
כדי שהעיבוד בדיוק פעם אחת יהיה אפשרי, צריך לצמצם את פעולות הקלט/פלט, ובפרט למנוע פעולות קלט/פלט בכל רשומה. כדי להשיג את המטרה הזו, Dataflow משתמשת במסנני Bloom וב-garbage collection.
פילטרים של Bloom
מסנני Bloom הם מבני נתונים קומפקטיים שמאפשרים לבצע במהירות בדיקות של חברות בקבוצה. ב-Dataflow, כל עובד שומר מסנן בלום של כל מזהה שהוא רואה. כשמגיע מזהה רשומה חדש, העובד מחפש את המזהה במסנן. אם המסנן מחזיר ערך false, הרשומה הזו לא כפולה, וה-worker לא מחפש את המזהה באחסון יציב.
Dataflow שומרת קבוצה של מסנני בלום מתגלגלים, שמחולקים לקטגוריות לפי זמן. כשמגיעה רשומה, Dataflow בוחר את המסנן המתאים לבדיקה על סמך חותמת הזמן של המערכת. השלב הזה מונע את הרוויה של מסנני Bloom כשהמערכת אוספת את המסננים, וגם מגביל את כמות הנתונים שצריך לסרוק בהפעלה.
איסוף אשפה
כדי למנוע מילוי של האחסון במזהי רשומות, Dataflow משתמשת באיסוף כדי להסיר רשומות ישנות. Dataflow משתמש בחותמת הזמן של המערכת כדי לחשב את סימן המים של איסוף האשפה.
סימן המים הזה מבוסס על משך הזמן הפיזי שבו ההזמנה נמצאת בשלב מסוים. לכן, הוא מספק גם מידע על החלקים האיטיים בצינור. המטא-נתונים האלה הם הבסיס למדד של השהיית המערכת שמוצג בממשק המעקב של Dataflow.
אם רשומה מגיעה עם חותמת זמן ישנה יותר מסימן המים, והמזהים של הזמן הזה כבר עברו איסוף, המערכת מתעלמת מהרשומה. הסיבה לכך היא שהסימן הנמוך שמפעיל garbage collection לא מתקדם עד שמתקבל אישור על מסירת הרשומות, ולכן הרשומות שמגיעות באיחור הן כפילויות.
מקורות לא דטרמיניסטיים
Dataflow משתמש ב-Apache Beam SDK כדי לקרוא נתונים לצינורות עיבוד נתונים. אם העיבוד נכשל, יכול להיות ש-Dataflow ינסה שוב לקרוא ממקור. במצב כזה, Dataflow צריך לוודא שכל רשומה ייחודית שנוצרת על ידי מקור מתועדת בדיוק פעם אחת. במקורות דטרמיניסטיים, כמו Pub/Sub Lite או Kafka, הרשומות נקראות על סמך היסט רשום, ולכן אין צורך לבצע את השלב הזה.
מכיוון ש-Dataflow לא יכול להקצות מזהי רשומות באופן אוטומטי, מקורות לא דטרמיניסטיים צריכים לציין ל-Dataflow מהם מזהי הרשומות כדי למנוע כפילויות. אם מקור מספק מזהים ייחודיים לכל רשומה, המחבר משתמש בערבוב בצינור כדי להסיר כפילויות. רשומות עם אותו מזהה מסוננות. דוגמה לאופן שבו Dataflow מטמיע עיבוד בדיוק פעם אחת כשמשתמשים ב-Pub/Sub כמקור מופיעה בקטע עיבוד בדיוק פעם אחת בדף בנושא סטרימינג עם Pub/Sub.
כשמריצים פונקציות מותאמות אישית של DoFn כחלק מצינור הנתונים, מערכת Dataflow לא מבטיחה שהקוד הזה יפעל רק פעם אחת לכל רשומה. כדי להבטיח עיבוד של לפחות פעם אחת במקרה של כשלים ב-worker, יכול להיות ש-Dataflow יפעיל רשומה נתונה דרך טרנספורמציה כמה פעמים, או שהוא יפעיל את אותה רשומה בו-זמנית בכמה workers. אם תכללו בצינור עיבוד הנתונים קוד שמבצע פעולות כמו יצירת קשר עם שירות חיצוני, יכול להיות שהפעולות יופעלו יותר מפעם אחת עבור רשומה מסוימת.
כדי להפוך עיבוד לא דטרמיניסטי לדטרמיניסטי, משתמשים ביצירת נקודות ביקורת. כשמשתמשים בנקודות ביקורת, כל פלט מהטרנספורמציה עובר לנקודת ביקורת באחסון יציב עם המזהה הייחודי שלו לפני שהוא מועבר לשלב הבא. ניסיונות חוזרים ב-shuffle delivery של Dataflow מעבירים את הפלט שנוצר בנקודת הבדיקה. הקוד שלכם יכול לפעול כמה פעמים, אבל Dataflow מוודא שרק הפלט של אחת מהפעולות האלה יישמר. Dataflow משתמש בחנות עקבית שמונעת כתיבה של כפילויות לאחסון יציב.
שליחת פלט בדיוק פעם אחת
ערכת ה-SDK של Apache Beam כוללת יעדי נתונים מובנים שנועדו להבטיח שלא ייווצרו כפילויות. כשאפשר, כדאי להשתמש באחד מהיעדים המובנים האלה.
אם אתם צריכים לכתוב יעד משלכם, הגישה הכי טובה היא להפוך את אובייקט הפונקציה לאידמפוטנטי, כדי שאפשר יהיה לנסות אותו שוב כמה פעמים שצריך בלי לגרום לתופעות לוואי לא רצויות. עם זאת, לעיתים קרובות רכיב מסוים של הטרנספורמציה שמיישם את הפונקציונליות של יעד הנתונים הוא לא דטרמיניסטי, ויכול להיות שהוא ישתנה אם תנסו שוב.
לדוגמה, בצבירה של חלון, יכול להיות שקבוצת הרשומות בחלון לא תהיה דטרמיניסטית. באופן ספציפי, יכול להיות שהחלון ינסה להפעיל את הרכיבים e0, e1, e2. יכול להיות שה-worker יקרוס לפני שהוא יבצע את העיבוד של החלון, אבל לא לפני שהאלמנטים האלה יישלחו כתוצאת לוואי. כשמפעילים מחדש את ה-worker, החלון מופעל שוב, ורכיב מאוחר e3 מגיע. מכיוון שהרכיב הזה מגיע לפני שהחלון מתבצע, הוא לא נספר כנתונים שהגיעו באיחור, ולכן הפונקציה DoFn נקראת שוב עם הרכיבים e0, e1, e2 ו-e3. האלמנטים האלה נשלחים לפעולת תופעת הלוואי. אידמפוטנטיות לא עוזרת בתרחיש הזה, כי בכל פעם נשלחות קבוצות שונות של רשומות לוגיות.
כדי לטפל בבעיות של אי-דטרמיניזם ב-Dataflow, משתמשים בטרנספורמציה המובנית Reshuffle. כש-Dataflow מבצע ערבוב של נתונים, הוא כותב את הנתונים בצורה עמידה, כך שאם מתבצע ניסיון חוזר של פעולות אחרי הערבוב, כל הרכיבים שנוצרו בצורה לא דטרמיניסטית יהיו יציבים. השימוש בטרנספורמציה Reshuffle מבטיח שרק גרסה אחת של הפלט של DoFn תוכל לעבור את הגבול של פעולת הערבוב.
התבנית הבאה מבטיחה שפעולת תופעת הלוואי תמיד תקבל רשומה דטרמיניסטית לפלט:
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(GroupByKey.<..>.create())
.apply(new PrepareOutputData())
.apply(Reshuffle.<..>of())
.apply(WriteToSideEffect());
כדי לוודא שהרכיב Dataflow runner יודע שהאלמנטים צריכים להיות יציבים לפני שמבצעים DoFn, מוסיפים את ההערה RequiresStableInput ל-DoFn.
מידע נוסף
- הגדרת מצב הסטרימינג של צינור עיבוד הנתונים
- Streaming with Pub/Sub
- מנוע סטרימינג: מודל ביצוע לעיבוד נתונים עם יכולת הרחבה גבוהה וזמן אחזור נמוך
- מידע נוסף על מודל ההרצה של Apache Beam
- After Lambda: Exactly-once processing in Dataflow, Part 1
- אחרי Lambda: עיבוד בדיוק פעם אחת ב-Dataflow, חלק 2 (הבטחת זמן אחזור קצר)
- אחרי Lambda: עיבוד בדיוק פעם אחת ב-Dataflow, חלק 3 (מקורות ויעדים)