אפליקציית הדוגמה למסחר אלקטרוני מדגימה שיטות מומלצות לשימוש ב-Dataflow כדי להטמיע ניתוח נתונים בסטרימינג ו-AI בזמן אמת. הדוגמה מכילה דפוסי משימות שמראים את הדרך הכי טובה לבצע משימות תכנות ב-Java. המשימות האלה נדרשות בדרך כלל כדי ליצור אפליקציות למסחר אלקטרוני.
האפליקציה מכילה את דפוסי המשימות הבאים של Java:
- שימוש בסכימות של Apache Beam כדי לעבוד עם נתונים מובְנים
- שימוש ב-JsonToRow להמרת נתוני JSON
- שימוש בכלי ליצירת קוד
AutoValueכדי ליצור אובייקטים פשוטים של Java (POJO) - הוספת נתונים שלא ניתן לעבד לתור לצורך ניתוח נוסף
- החלה של טרנספורמציות לאימות נתונים באופן סדרתי
- שימוש ב-
DoFn.StartBundleכדי להפעיל קבוצות קטנות של שיחות לשירותים חיצוניים - שימוש בדפוס מתאים של קלט צדדי
שימוש בסכימות של Apache Beam לעבודה עם נתונים מובְנים
אתם יכולים להשתמש בסכימות של Apache Beam כדי לעבד נתונים מובְנים בצורה קלה יותר.
המרת האובייקטים לשורות מאפשרת ליצור קוד Java נקי מאוד, וכך להקל על תרגיל הבנייה של גרף אציקלי מכוון (DAG). אפשר גם להפנות למאפייני אובייקט כשדות בהצהרות הניתוח שיוצרים, במקום להפעיל שיטות.
דוגמה
שימוש בפונקציה JsonToRow להמרת נתוני JSON
עיבוד מחרוזות JSON ב-Dataflow הוא צורך נפוץ. לדוגמה, מחרוזות JSON עוברות עיבוד כשמשדרים מידע על נתוני קליקים שנאסף מאפליקציות אינטרנט. כדי לעבד מחרוזות JSON, צריך להמיר אותן לשורות או לאובייקטים פשוטים של Java (POJO) במהלך עיבוד צינור הנתונים.
אפשר להשתמש בטרנספורמציה המובנית JsonToRow של Apache Beam כדי להמיר מחרוזות JSON לשורות. עם זאת, אם רוצים תור לעיבוד הודעות שלא הצליחו, צריך ליצור אותו בנפרד. אפשר לעיין במאמר בנושא הוספת נתונים שלא ניתן לעיבוד לתור לצורך ניתוח נוסף.
אם אתם צריכים להמיר מחרוזת JSON ל-POJO באמצעות AutoValue, צריך לרשום סכימה לסוג באמצעות ההערה @DefaultSchema(AutoValueSchema.class), ואז להשתמש בכלי Convert. הקוד שיתקבל יהיה דומה לקוד הבא:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
מידע נוסף, כולל סוגי Java שונים שאפשר להסיק מהם סכימות, זמין במאמר בנושא יצירת סכימות.
אם הפונקציה JsonToRow לא פועלת עם הנתונים שלכם, אפשר להשתמש ב-Gson. ההגדרות של Gson לעיבוד נתונים הן די גמישות, ולכן יכול להיות שתצטרכו להוסיף עוד אימות לתהליך המרת הנתונים.
דוגמאות
שימוש בכלי ליצירת קוד AutoValue ליצירת POJO
סכימות של Apache Beam הן לרוב הדרך הטובה ביותר לייצג אובייקטים בצינור עיבוד נתונים, כי הן מאפשרות לעבוד עם נתונים מובְנים. עם זאת, לפעמים נדרש אובייקט Java רגיל (POJO), למשל כשעובדים עם אובייקטים של זוגות מפתח/ערך או כשמטפלים במצב של אובייקט.
כדי ליצור POJO באופן ידני, צריך לכתוב קוד לביטול של השיטות equals() ו-hashcode(), וזה יכול לקחת הרבה זמן וגם להוביל לשגיאות. שינויים שגויים עלולים לגרום להתנהגות לא עקבית של האפליקציה או לאובדן נתונים.
כדי ליצור POJO, משתמשים בבונה המחלקה AutoValue. האפשרות הזו מבטיחה שייעשה שימוש בשינויים הנדרשים, ועוזרת לכם להימנע משגיאות אפשריות.
נעשה שימוש נרחב ב-AutoValue בבסיס הקוד של Apache Beam, לכן כדאי להכיר את כלי הבנייה הזה אם רוצים לפתח צינורות עיבוד נתונים של Apache Beam ב-Dataflow באמצעות Java.
אפשר גם AutoValue עם סכימות של Apache Beam אם מוסיפים הערה @DefaultSchema(AutoValueSchema.class). מידע נוסף זמין במאמר בנושא יצירת סכימות.
מידע נוסף על AutoValue זמין במאמר למה AutoValue? ובמסמכי AutoValue.
דוגמה
הוספת נתונים שלא ניתן לעבד לתור לצורך ניתוח נוסף
במערכות ייצור, חשוב לטפל בנתונים בעייתיים. אם אפשר, כדאי לאמת ולתקן את הנתונים תוך כדי הזרמה. אם אי אפשר לתקן את הערך, צריך לרשום אותו ביומן של תור הודעות שלא עברו עיבוד, שלפעמים נקרא תור של הודעות שלא נמסרו, כדי לנתח אותו בהמשך. בעיות נפוצות מתרחשות כשממירים נתונים מפורמט אחד לפורמט אחר, למשל כשממירים מחרוזות JSON לשורות.
כדי לפתור את הבעיה הזו, צריך להשתמש בטרנספורמציה עם כמה פלטים כדי להעביר את הרכיבים שמכילים את הנתונים שלא עברו עיבוד לPCollection אחר לצורך ניתוח נוסף. העיבוד הזה הוא פעולה נפוצה שאולי תרצו להשתמש בה במקומות רבים בצינור. כדאי לנסות ליצור טרנספורמציה גנרית מספיק כדי שאפשר יהיה להשתמש בה בכמה מקומות. קודם יוצרים אובייקט שגיאה כדי לעטוף מאפיינים נפוצים, כולל הנתונים המקוריים. בשלב הבא, יוצרים טרנספורמציה של sink עם כמה אפשרויות ליעד.
דוגמאות
החלת טרנספורמציות של אימות נתונים באופן סדרתי
לעתים קרובות צריך לנקות את הנתונים שנאספים ממערכות חיצוניות. כדאי לבנות את צינור הנתונים כך שיוכל לתקן נתונים בעייתיים בזרם, כשזה אפשרי. לשלוח את הנתונים לתור לניתוח נוסף כשצריך.
יכול להיות שיהיו כמה בעיות בהודעה אחת שצריך לתקן, ולכן חשוב לתכנן את הגרף האציקלי המכוון (DAG) שנדרש. אם רכיב מכיל נתונים עם כמה פגמים, צריך לוודא שהרכיב עובר את ההמרות המתאימות.
לדוגמה, נניח שיש אלמנט עם הערכים הבאים, שאף אחד מהם לא צריך להיות null:
{"itemA": null,"itemB": null}
מוודאים שהאלמנט עובר טרנספורמציות שמתקנות את שתי הבעיות האפשריות:
badElements.apply(fixItemA).apply(fixItemB)
יכול להיות שצינור עיבוד הנתונים שלכם יכלול יותר שלבים סדרתיים, אבל המיזוג עוזר לצמצם את התקורה של העיבוד.
דוגמה
שימוש ב-DoFn.StartBundle כדי לבצע קבוצות קטנות של קריאות לשירותים חיצוניים
יכול להיות שתצטרכו להפעיל ממשקי API חיצוניים כחלק מצינור הנתונים. צינור העברת נתונים מחלק את העבודה בין הרבה משאבי מחשוב, ולכן ביצוע קריאה אחת לכל רכיב שמועבר דרך המערכת עלול להעמיס על נקודת קצה של שירות חיצוני. הבעיה הזו נפוצה במיוחד כשלא משתמשים בפונקציות להפחתת הנתונים.
כדי להימנע מהבעיה הזו, כדאי לבצע קריאות אצווה למערכות חיצוניות.
אפשר לאגד קריאות באמצעות טרנספורמציה של GroupByKey או באמצעות Apache Beam Timer API. עם זאת, שתי הגישות האלה דורשות ערבוב, שמוסיף תקורה מסוימת לעיבוד ודורש מספר קסם כדי לקבוע את מרחב המפתחות.
במקום זאת, אפשר להשתמש ברכיבי מחזור החיים StartBundle ו-FinishBundle כדי להוסיף את הנתונים לקבוצות. כשבוחרים באפשרויות האלה, לא צריך לערבב את השירים.
חיסרון קל באפשרות הזו הוא שגודלי החבילות נקבעים באופן דינמי על ידי ההטמעה של הרצת התהליכים, על סמך מה שקורה כרגע בצינור ובסביבות העבודה שלו. במצב 'בתוך הזרם', חבילות הן לרוב קטנות. ההחלטה לגבי אופן האריזה של נתונים ב-Dataflow מושפעת מגורמים שקשורים לשרת העורפי, כמו השימוש בפיצול, נפח הנתונים שזמין למפתח מסוים וקצב העברת הנתונים של צינור הנתונים.
דוגמה
EventItemCorrectionService.java
שימוש בדפוס מתאים של קלט צדדי להעשרת נתונים
באפליקציות לניתוח נתונים בזמן אמת, הנתונים מועשרים לעיתים קרובות במידע נוסף שעשוי להיות שימושי לניתוח נוסף. לדוגמה, אם יש לכם את מזהה החנות של עסקה, יכול להיות שתרצו להוסיף מידע על מיקום החנות. לרוב, המידע הנוסף הזה מתווסף על ידי לקיחת רכיב והוספת מידע מטבלת מיפוי.
במקרה של טבלאות בדיקה שמשתנות לאט וקטנות בגודלן, אפשר להוסיף את הטבלה לצינור כסוג יחיד שמטמיע את הממשק Map<K,V>. האפשרות הזו מאפשרת לכם להימנע מקריאה ל-API לכל רכיב לצורך חיפוש. אחרי שכוללים עותק של טבלה בצינור, צריך לעדכן אותה מדי פעם כדי שהנתונים יהיו עדכניים.
כדי לטפל בקלט צדדי שמתעדכן לאט, אפשר להשתמש בדפוסי קלט צדדי של Apache Beam.
שמירה במטמון
קלט צדדי נטען בזיכרון ולכן הוא נשמר במטמון באופן אוטומטי.
אפשר להגדיר את גודל המטמון באמצעות האפשרות --setWorkerCacheMb.
אפשר לשתף את המטמון בין DoFn מופעים ולהשתמש בטריגרים חיצוניים כדי לרענן את המטמון.
דוגמה
SlowMovingStoreLocationDimension.java