העשרת נתונים בסטרימינג

‫Apache Beam מפשט את תהליך העבודה של העשרת הנתונים באמצעות טרנספורמציה מוכנה מראש להעשרה שאפשר להוסיף לצינור הנתונים. בדף הזה מוסבר איך להשתמש בטרנספורמציית ההעשרה של Apache Beam כדי להעשיר את נתוני הסטרימינג.

כשמעשירים נתונים, מוסיפים לנתונים הגולמיים ממקור אחד נתונים קשורים ממקור שני. הנתונים הנוספים יכולים להגיע ממגוון מקורות, כמו Bigtable או BigQuery. הטרנספורמציה להעשרת נתונים ב-Apache Beam משתמשת בחיפוש של זוגות מפתח/ערך כדי לקשר את הנתונים הנוספים לנתונים הגולמיים.

בדוגמאות הבאות מפורטים כמה מקרים שבהם העשרת נתונים יכולה להיות שימושית:

  • אתם רוצים ליצור צינור למסחר אלקטרוני שמתעד פעילויות של משתמשים באתר או באפליקציה ומספק המלצות מותאמות אישית. הטרנספורמציה משלבת את הפעילויות בנתוני צינור המכירות כדי שתוכלו לספק את ההמלצות המותאמות אישית.
  • יש לכם נתוני משתמשים שאתם רוצים לצרף לנתונים גיאוגרפיים כדי לבצע ניתוח גיאוגרפי.
  • אתם רוצים ליצור צינור להעברת נתונים שמקבל נתונים ממכשירי אינטרנט של הדברים (IOT) ששולחים אירועי טלמטריה.

יתרונות

היתרונות של טרנספורמציית ההעשרה:

  • הכלי משנה את הנתונים בלי שתצטרכו לכתוב קוד מורכב או לנהל ספריות בסיסיות.
  • מספקת רכיבי handler מובנים של מקורות.
  • משתמשים בוויסות נתונים בצד הלקוח כדי לנהל את הגבלת קצב יצירת הבקשות. הבקשות מושהות באופן מעריכי לפני ניסיון חוזר, עם אסטרטגיית ניסיון חוזר שמוגדרת כברירת מחדל. אתם יכולים להגדיר הגבלת קצב של יצירת בקשות בהתאם לתרחיש השימוש שלכם.

תמיכה ומגבלות

הדרישות לגבי טרנספורמציית העשרה הן:

  • האפשרות הזו זמינה לצינורות עיבוד נתונים באצווה ובסטרימינג.
  • ה-handler‏ BigTableEnrichmentHandler זמין בגרסאות 2.54.0 ואילך של Apache Beam Python SDK.
  • ה-handler‏ BigQueryEnrichmentHandler זמין בגרסאות 2.57.0 ואילך של Apache Beam Python SDK.
  • הפונקציה VertexAIFeatureStoreEnrichmentHandler זמינה בגרסאות 2.55.0 ואילך של Apache Beam Python SDK.
  • כשמשתמשים ב-Apache Beam Python SDK בגרסה 2.55.0 ואילך, צריך גם להתקין את לקוח Python ל-Redis.
  • משימות Dataflow חייבות להשתמש ב-Runner v2.

שימוש בטרנספורמציה של העשרה

כדי להשתמש בטרנספורמציה של העשרה, צריך לכלול את הקוד הבא בצינור:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

כברירת מחדל, טרנספורמציית ההעשרה מבצעת שאילתת איחוד (cross join), ולכן צריך לתכנן את האיחוד המותאם אישית כדי להעשיר את נתוני הקלט. העיצוב הזה מבטיח שההצטרפות תכלול רק את השדות שצוינו.

בדוגמה הבאה, left הוא רכיב הקלט של טרנספורמציית ההעשרה, ו-right הם נתונים שנשלפו משירות חיצוני עבור רכיב הקלט הזה.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

פרמטרים

כדי להשתמש בטרנספורמציה להעשרה, צריך להשתמש בפרמטר EnrichmentHandler.

אפשר גם להשתמש בפרמטר הגדרה כדי לציין פונקציית lambda עבור פונקציית join, זמן קצוב לתפוגה, מגביל או חוזר (אסטרטגיית ניסיון חוזר). הפרמטרים הבאים של ההגדרה זמינים:

  • join_fn: פונקציה lambda שמקבלת מילונים כקלט ומחזירה שורה מועשרת (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). השורה המועשרת מציינת איך לצרף את הנתונים שאוחזרו מה-API. ברירת המחדל היא cross join.
  • timeout: מספר השניות להמתנה עד שה-API ישלים את הבקשה לפני שיפוג הזמן. ברירת המחדל היא 30 שניות.
  • throttler: מציין את מנגנון ויסות הנתונים. האפשרות היחידה שנתמכת היא הגבלת קצב העברת נתונים דינמית מצד הלקוח שמוגדרת כברירת מחדל.
  • repeater: מציין את אסטרטגיית הניסיון החוזר כשמתרחשות שגיאות כמו TooManyRequests ו-TimeoutException. ברירת המחדל היא ExponentialBackOffRepeater.

המאמרים הבאים