Apache Beam מפשט את תהליך העבודה של העשרת הנתונים באמצעות טרנספורמציה מוכנה מראש להעשרה שאפשר להוסיף לצינור הנתונים. בדף הזה מוסבר איך להשתמש בטרנספורמציית ההעשרה של Apache Beam כדי להעשיר את נתוני הסטרימינג.
כשמעשירים נתונים, מוסיפים לנתונים הגולמיים ממקור אחד נתונים קשורים ממקור שני. הנתונים הנוספים יכולים להגיע ממגוון מקורות, כמו Bigtable או BigQuery. הטרנספורמציה להעשרת נתונים ב-Apache Beam משתמשת בחיפוש של זוגות מפתח/ערך כדי לקשר את הנתונים הנוספים לנתונים הגולמיים.
בדוגמאות הבאות מפורטים כמה מקרים שבהם העשרת נתונים יכולה להיות שימושית:
- אתם רוצים ליצור צינור למסחר אלקטרוני שמתעד פעילויות של משתמשים באתר או באפליקציה ומספק המלצות מותאמות אישית. הטרנספורמציה משלבת את הפעילויות בנתוני צינור המכירות כדי שתוכלו לספק את ההמלצות המותאמות אישית.
- יש לכם נתוני משתמשים שאתם רוצים לצרף לנתונים גיאוגרפיים כדי לבצע ניתוח גיאוגרפי.
- אתם רוצים ליצור צינור להעברת נתונים שמקבל נתונים ממכשירי אינטרנט של הדברים (IOT) ששולחים אירועי טלמטריה.
יתרונות
היתרונות של טרנספורמציית ההעשרה:
- הכלי משנה את הנתונים בלי שתצטרכו לכתוב קוד מורכב או לנהל ספריות בסיסיות.
- מספקת רכיבי handler מובנים של מקורות.
- אפשר להשתמש ב-handler
BigTableEnrichmentHandlerכדי להעשיר את הנתונים באמצעות מקור Bigtable בלי להעביר פרטי הגדרה. - אפשר להשתמש ב-handler
BigQueryEnrichmentHandlerכדי להעשיר את הנתונים באמצעות מקור BigQuery בלי להעביר פרטי הגדרה. - שימוש ב-handler
VertexAIFeatureStoreEnrichmentHandlerעם Vertex AI Feature Store ומילוי בקשה באופן מיידי של Bigtable.
- אפשר להשתמש ב-handler
- משתמשים בוויסות נתונים בצד הלקוח כדי לנהל את הגבלת קצב יצירת הבקשות. הבקשות מושהות באופן מעריכי לפני ניסיון חוזר, עם אסטרטגיית ניסיון חוזר שמוגדרת כברירת מחדל. אתם יכולים להגדיר הגבלת קצב של יצירת בקשות בהתאם לתרחיש השימוש שלכם.
תמיכה ומגבלות
הדרישות לגבי טרנספורמציית העשרה הן:
- האפשרות הזו זמינה לצינורות עיבוד נתונים באצווה ובסטרימינג.
- ה-handler
BigTableEnrichmentHandlerזמין בגרסאות 2.54.0 ואילך של Apache Beam Python SDK. - ה-handler
BigQueryEnrichmentHandlerזמין בגרסאות 2.57.0 ואילך של Apache Beam Python SDK. - הפונקציה
VertexAIFeatureStoreEnrichmentHandlerזמינה בגרסאות 2.55.0 ואילך של Apache Beam Python SDK. - כשמשתמשים ב-Apache Beam Python SDK בגרסה 2.55.0 ואילך, צריך גם להתקין את לקוח Python ל-Redis.
- משימות Dataflow חייבות להשתמש ב-Runner v2.
שימוש בטרנספורמציה של העשרה
כדי להשתמש בטרנספורמציה של העשרה, צריך לכלול את הקוד הבא בצינור:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
כברירת מחדל, טרנספורמציית ההעשרה מבצעת שאילתת איחוד (cross join), ולכן צריך לתכנן את האיחוד המותאם אישית כדי להעשיר את נתוני הקלט. העיצוב הזה מבטיח שההצטרפות תכלול רק את השדות שצוינו.
בדוגמה הבאה, left הוא רכיב הקלט של טרנספורמציית ההעשרה, ו-right הם נתונים שנשלפו משירות חיצוני עבור רכיב הקלט הזה.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
פרמטרים
כדי להשתמש בטרנספורמציה להעשרה, צריך להשתמש בפרמטר EnrichmentHandler.
אפשר גם להשתמש בפרמטר הגדרה כדי לציין פונקציית lambda עבור פונקציית join, זמן קצוב לתפוגה, מגביל או חוזר (אסטרטגיית ניסיון חוזר). הפרמטרים הבאים של ההגדרה זמינים:
-
join_fn: פונקציהlambdaשמקבלת מילונים כקלט ומחזירה שורה מועשרת (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). השורה המועשרת מציינת איך לצרף את הנתונים שאוחזרו מה-API. ברירת המחדל היא cross join. -
timeout: מספר השניות להמתנה עד שה-API ישלים את הבקשה לפני שיפוג הזמן. ברירת המחדל היא 30 שניות. -
throttler: מציין את מנגנון ויסות הנתונים. האפשרות היחידה שנתמכת היא הגבלת קצב העברת נתונים דינמית מצד הלקוח שמוגדרת כברירת מחדל. -
repeater: מציין את אסטרטגיית הניסיון החוזר כשמתרחשות שגיאות כמוTooManyRequestsו-TimeoutException. ברירת המחדל היאExponentialBackOffRepeater.
המאמרים הבאים
- דוגמאות נוספות זמינות במאמר Enrichment transform בקטלוג הטרנספורמציות של Apache Beam.
- שימוש ב-Apache Beam וב-Bigtable כדי להעשיר נתונים.
- שימוש ב-Apache Beam וב-BigQuery כדי להעשיר נתונים
- שימוש ב-Apache Beam וב-Vertex AI Feature Store כדי להעשיר נתונים.