חלק מתבניות Dataflow ש-Google מספקת תומכות בפונקציות בהגדרת המשתמש (UDF). פונקציות UDF מאפשרות להרחיב את הפונקציונליות של תבנית בלי לשנות את קוד התבנית.
סקירה כללית
כדי ליצור UDF, כותבים פונקציית JavaScript או פונקציית Python, בהתאם לתבנית. מאחסנים את קובץ הקוד של ה-UDF ב-Cloud Storage ומציינים את המיקום כפרמטר של תבנית. עבור כל רכיב קלט, התבנית קוראת לפונקציה. הפונקציה משנה את הרכיב או מבצעת לוגיקה מותאמת אישית אחרת ומחזירה את התוצאה לתבנית.
לדוגמה, אפשר להשתמש בפונקציה מוגדרת על ידי המשתמש כדי:
- שינוי הפורמט של נתוני הקלט כך שיתאימו לסכימת יעד.
- צנזור מידע אישי רגיש.
- לסנן חלק מהרכיבים מהפלט.
הקלט לפונקציית ה-UDF הוא רכיב נתונים יחיד, שעבר סריאליזציה כמחרוזת JSON. הפונקציה מחזירה מחרוזת JSON מסודרת כפלט. פורמט הנתונים תלוי בתבנית. לדוגמה, בתבנית Pub/Sub Subscription to BigQuery, הקלט הוא נתוני ההודעה ב-Pub/Sub שעברו סריאליזציה כאובייקט JSON, והפלט הוא אובייקט JSON שעבר סריאליזציה ומייצג שורה בטבלה ב-BigQuery. מידע נוסף זמין במסמכי התיעוד של כל תבנית.
הפעלת תבנית עם פונקציית UDF
כדי להריץ תבנית עם UDF, מציינים את המיקום ב-Cloud Storage של קובץ ה-JavaScript ואת שם הפונקציה כפרמטרים של התבנית.
בחלק מהתבניות ש-Google מספקת, אפשר גם ליצור את ה-UDF ישירות בGoogle Cloud מסוף, באופן הבא:
נכנסים לדף Dataflow במסוף Google Cloud .
לוחצים על add_boxCreate job from template.
בוחרים את התבנית שסופקה על ידי Google שרוצים להפעיל.
מרחיבים את הקטע פרמטרים אופציונליים. אם התבנית תומכת בפונקציות UDF, היא כוללת פרמטר למיקום של ה-UDF ב-Cloud Storage ופרמטר נוסף לשם הפונקציה.
לצד פרמטר התבנית, לוחצים על יצירת UDF.
בחלונית בחירה או יצירה של פונקציה בהגדרת המשתמש (UDF):
- מזינים שם קובץ. דוגמה:
my_udf.js - בוחרים תיקייה ב-Cloud Storage.
דוגמה:
gs://your-bucket/your-folder - משתמשים בעורך הקוד המוטבע כדי לכתוב את הפונקציה. העורך מאוכלס מראש בקוד שחוזר על עצמו (boilerplate) שאפשר להשתמש בו כנקודת התחלה.
לוחצים על יצירת פונקציה מוגדרת על ידי המשתמש.
המסוף שומר את קובץ ה-UDF ומאכלס את המיקום ב-Cloud Storage. Google Cloud
מזינים את שם הפונקציה בשדה המתאים.
- מזינים שם קובץ. דוגמה:
כתיבת פונקציה מוגדרת על ידי המשתמש (UDF) ב-JavaScript
הקוד הבא מציג פונקציה בהגדרת המשתמש (UDF) ב-JavaScript שאין לה פעולה (no-op), שאפשר להתחיל ממנה:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
קוד ה-JavaScript פועל ב-Nashorn JavaScript engine. מומלץ לבדוק את הפונקציה המוגדרת על ידי המשתמש במנוע Nashorn לפני הפריסה שלה. מנוע Nashorn
לא זהה בדיוק להטמעה של JavaScript ב-Node.js. בעיה נפוצה היא שימוש ב-console.log() או ב-Number.isNaN(), שאף אחד מהם לא מוגדר במנוע Nashorn.
אפשר לבדוק את ה-UDF במנוע Nashorn באמצעות Cloud Shell, שבו מותקן מראש JDK 11. מפעילים את Nashorn במצב אינטראקטיבי באופן הבא:
jjs --language=es6
במעטפת האינטראקטיבית של Nashorn, מבצעים את השלבים הבאים:
- מתקשרים אל
loadכדי לטעון את קובץ ה-JavaScript של ה-UDF. - מגדירים אובייקט JSON של קלט בהתאם להודעות הצפויות של צינור הנתונים.
- משתמשים בפונקציה
JSON.stringifyכדי לבצע סריאליזציה של הקלט למחרוזת JSON. - קוראים לפונקציית ה-UDF כדי לעבד את מחרוזת ה-JSON.
- מבצעים קריאה ל-
JSON.parseכדי לבטל את הסריאליזציה של הפלט. - מאמתים את התוצאה.
דוגמה:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
כתיבת פונקציה מוגדרת על ידי המשתמש (UDF) ב-Python
הקוד הבא מציג פונקציית UDF ב-Python שלא מבצעת פעולה, שאפשר להתחיל ממנה:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
פונקציות UDF של Python תומכות בחבילות של יחסי תלות שהן סטנדרטיות ב-Python וב-Apache Beam. הם לא יכולים להשתמש בחבילות של צד שלישי.
טיפול בשגיאות
בדרך כלל, כשמתרחשת שגיאה במהלך הביצוע של UDF, השגיאה נכתבת במיקום של הודעות שלא נמסרו. הפרטים משתנים בהתאם לתבנית. לדוגמה, התבנית Pub/Sub Subscription to BigQuery יוצרת טבלה _error_records וכותבת בה שגיאות. שגיאות של פונקציות UDF בזמן ריצה יכולות להתרחש בגלל שגיאות תחביר או חריגים שלא נתפסו. כדי לבדוק אם יש שגיאות תחביר, בודקים את הפונקציה המוגדרת על ידי המשתמש באופן מקומי.
אפשר להגדיר באופן פרוגרמטי חריגה לרכיב שלא אמור לעבור עיבוד. במקרה כזה, האלמנט נכתב למיקום של הודעות שלא נמסרו, אם התבנית תומכת בכך. דוגמה שממחישה את הגישה הזו מופיעה במאמר בנושא ניתוב אירועים.
תרחישים לדוגמה
בקטע הזה מתוארים כמה דפוסים נפוצים של פונקציות UDF, על סמך תרחישי שימוש בפועל.
העשרת אירועים
אפשר להשתמש בפונקציה מוגדרת על ידי המשתמש כדי להוסיף שדות חדשים לאירועים, וכך לקבל מידע נוסף על ההקשר.
דוגמה:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
טרנספורמציה של אירועים
משתמשים בפונקציה מוגדרת על ידי המשתמש כדי לשנות את הפורמט של כל האירוע בהתאם למה שנדרש ביעד.
בדוגמה הבאה מוצגת רשומה ביומן של Cloud Logging (LogEntry) שמשוחזרת למחרוזת היומן המקורית, אם היא זמינה. (בהתאם למקור היומן, המחרוזת המקורית של היומן מאוכלסת לפעמים בשדה textPayload). אפשר להשתמש בדפוס הזה כדי לשלוח את היומנים הגולמיים בפורמט המקורי שלהם, במקום לשלוח את כל LogEntry מ-Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
צנזורה או הסרה של נתוני אירועים
משתמשים בפונקציה מוגדרת על ידי המשתמש כדי לצנזר או להסיר חלק מהאירוע.
בדוגמה הבאה, שם השדה sensitiveField מצונזר על ידי החלפת הערך שלו, והשדה בשם redundantField מוסר לחלוטין.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
אירועים במסלול
אפשר להשתמש בפונקציה מוגדרת על ידי המשתמש כדי לנתב אירועים ליעדים נפרדים במאגר הנתונים במורד הזרם.
בדוגמה הבאה, שמבוססת על תבנית Pub/Sub to Splunk, כל אירוע מנותב לאינדקס הנכון ב-Splunk. הוא קורא לפונקציה מקומית שהוגדרה על ידי המשתמש כדי למפות אירועים לאינדקסים.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
בדוגמה הבאה, אירועים שלא מזוהים מנותבים לתור של הודעות שלא ניתן להעביר, בהנחה שהתבנית תומכת בתור כזה. (לדוגמה, אפשר לעיין בתבנית Pub/Sub to JDBC). אפשר להשתמש בתבנית הזו כדי לסנן רשומות לא צפויות לפני הכתיבה ליעד.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
סינון אירועים
אפשר להשתמש בפונקציה מוגדרת על ידי המשתמש כדי לסנן מהפלט אירועים לא רצויים או לא מזוהים.
בדוגמה הבאה, אירועים שבהם data.severity שווה ל-"DEBUG" מושמטים.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
המאמרים הבאים
- תבניות שסופקו על ידי Google
- הרחבת תבנית Dataflow באמצעות פונקציות מוגדרות על ידי המשתמש (UDF) (פוסט בבלוג)
- דוגמאות לפונקציות UDF (GitHub)