סקירה כללית של מנתחי נתונים
במסמך הזה מפורטת סקירה כללית על האופן שבו Google Security Operations מנתח יומנים גולמיים לפורמט של UDM.
Google SecOps יכול לקבל נתוני יומן שמקורם במקורות ההטמעה הבאים:
- Google SecOps forwarder
- פיד Chronicle API
- Chronicle Ingestion API
- שותף טכנולוגי צד-שלישי
בדרך כלל, הלקוחות שולחים נתונים כיומני רישום גולמיים מקוריים. Google SecOps מזהה באופן ייחודי את המכשיר שיצר את היומנים באמצעות LogType. הפרמטר LogType מזהה את שני הדברים הבאים:
- הספק והמכשיר שיצרו את היומן, כמו Cisco Firewall, Linux DHCP Server או Bro DNS.
- איזה מנתח ממיר את יומן הרישום הגולמי ל-UDM מובנה. יש קשר של אחד לאחד בין מנתח לבין LogType. כל מנתח ממיר נתונים שמתקבלים על ידי LogType יחיד.
Google SecOps מספקת קבוצה של מנתחי ברירת מחדל שקוראים יומנים גולמיים מקוריים ויוצרים רשומות UDM מובנות באמצעות נתונים ביומן הגולמי המקורי. Google SecOps מתחזקת את מנתחי הנתונים האלה. לקוחות יכולים גם להגדיר הוראות מיפוי נתונים בהתאמה אישית על ידי יצירת מנתח (parser) ספציפי ללקוח. אם שולחים מטען ייעודי (payload) עם כמה שורות, המערכת מפרשת כל שורה כרשומה נפרדת ביומן.

הכלי לניתוח מכיל הוראות למיפוי נתונים. הוא מגדיר איך הנתונים ממופים מיומן הגולמי המקורי לשדה אחד או יותר במבנה הנתונים של UDM.
אם אין שגיאות בניתוח, Google SecOps יוצר רשומה מובנית של UDM באמצעות נתונים מיומן הגולמי. תהליך ההמרה של יומן גולמי לרשומה ב-UDM נקרא נירמול.
מנתח ברירת מחדל עשוי למפות קבוצת משנה של ערכי ליבה מתוך יומן הגולמי. בדרך כלל, השדות האלה הם החשובים ביותר כדי לספק תובנות אבטחה ב-Google SecOps. ערכים לא ממופים נשארים ביומן הגולמי, אבל לא נשמרים ברשומת UDM.
לקוחות יכולים גם להשתמש ב-Ingestion API כדי לשלוח נתונים בפורמט UDM מובנה.
התאמה אישית של האופן שבו נתונים מוזנים מנותחים
Google SecOps מספקת את היכולות הבאות שמאפשרות ללקוחות להתאים אישית את ניתוח הנתונים בנתוני יומן מקוריים נכנסים. פרטים נוספים על ניהול מנתחי הנתונים האלה זמינים במאמר ניהול מנתחי נתונים מוגדרים מראש ומנתחי נתונים בהתאמה אישית.
- מנתחי נתונים ספציפיים ללקוח: לקוחות יוצרים הגדרת תצורה של מנתח נתונים מותאם אישית לסוג יומן ספציפי שעונה על הדרישות הספציפיות שלהם. מנתח (parser) ספציפי ללקוח מחליף את מנתח ברירת המחדל עבור LogType ספציפי. מידע נוסף מופיע במאמר בנושא ניהול מנתחים מוגדרים מראש ומנתחים בהתאמה אישית.
- תוספים של מנתח (Parser): לקוחות יכולים להוסיף הוראות מיפוי בהתאמה אישית בנוסף להגדרת ברירת המחדל של מנתח הנתונים. כל לקוח יכול ליצור קבוצה ייחודית משלו של הוראות מיפוי בהתאמה אישית. ההוראות האלה למיפוי מגדירות איך לחלץ ולשנות שדות נוספים מיומנים גולמיים מקוריים לשדות UDM. תוסף ניתוח לא מחליף את מנתח ברירת המחדל או מנתח ספציפי ללקוח.
דוגמה לשימוש ביומן של שרת proxy לאינטרנט Squid
בקטע הזה מופיעה דוגמה ליומן של שרת proxy לאינטרנט של Squid, ומוסבר איך הערכים ממופים לרשומה ב-UDM. תיאור של כל השדות בסכימת UDM מופיע במאמר רשימת השדות של מודל הנתונים המאוחד.
יומן הרישום של פרוקסי האינטרנט Squid לדוגמה מכיל ערכים מופרדים ברווחים. כל רשומה מייצגת אירוע אחד ומאחסנת את הנתונים הבאים: חותמת זמן, משך, לקוח, קוד תוצאה/סטטוס תוצאה, בייטים שהועברו, שיטת בקשה, כתובת URL, משתמש, קוד היררכי וסוג תוכן. בדוגמה הזו, השדות הבאים מחולצים וממופים לרשומה ב-UDM: time, client, result status, bytes, request method ו-URL.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
כשמשווים בין המבנים האלה, אפשר לראות שרק חלק מנתוני היומן המקוריים נכללים ברשומת ה-UDM. יש שדות חובה ויש שדות אופציונליים. בנוסף, רק קבוצת משנה של הסעיפים ברשומה של UDM מכילה נתונים. אם מנתח התקשורת לא ממפה נתונים מהיומן המקורי לרשומה של UDM, הקטע הזה של רשומת UDM לא יופיע ב-Google SecOps.
בקטע metadata מאוחסנת חותמת הזמן של האירוע. שימו לב שהערך הומר מפורמט Epoch לפורמט RFC 3339. ההמרה הזו היא אופציונלית. אפשר לאחסן את חותמת הזמן בפורמט Epoch, עם עיבוד מקדים כדי להפריד את החלקים של השניות והמילי-שניות לשדות נפרדים.
בשדה metadata.event_type מאוחסן הערך NETWORK_HTTP, שהוא ערך ממוספר שמזהה את סוג האירוע. הערך של metadata.event_type קובע אילו שדות נוספים של UDM הם חובה ואילו הם אופציונליים. הערכים של product_name ו-vendor_name מכילים תיאורים ידידותיים למשתמש של המכשיר שבו נרשם היומן המקורי.
הערך metadata.event_type ברשומה של אירוע UDM לא זהה ל-log_type שמוגדר כשמבצעים המרה של נתונים באמצעות Ingestion API. שני המאפיינים האלה משמשים לאחסון מידע שונה.
הקטע network מכיל ערכים מאירוע היומן המקורי. שימו לב שבמקרה הזה, ערך הסטטוס מהיומן המקורי עבר ניתוח מהשדה 'result
code/status' לפני שנכתב ברשומת UDM. רק result_code
נכלל ברשומת ה-UDM.
בקטע principal נשמר מידע הלקוח מיומן המקור. בקטע target מאוחסנות גם כתובת ה-URL המוגדרת במלואה וגם כתובת ה-IP.
בקטע security_result מאוחסן אחד מערכי ה-enum שמייצג את הפעולה שתועדה ביומן המקורי.
זו רשומת UDM בפורמט JSON. שימו לב שרק קטעים שמכילים נתונים נכללים. החלקים src, observer, intermediary, about ו-extensions לא נכללים.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
שלבים בהוראות של כלי הניתוח
ההוראות למיפוי נתונים בתוך מנתח פועלות לפי דפוס משותף, כפי שמוצג בהמשך:
- ניתוח וחילוץ נתונים מהיומן המקורי.
- לבצע שינויים בנתונים שחולצו. זה כולל שימוש בלוגיקה מותנית כדי לנתח ערכים באופן סלקטיבי, להמיר סוגי נתונים, להחליף מחרוזות משנה בערך, להמיר לאותיות רישיות או לאותיות קטנות וכו'.
- הקצאת ערכים לשדות UDM.
- פלט של רשומת UDM ממופה למפתח @output.
ניתוח וחילוץ נתונים מהיומן המקורי
הגדרת משפט הסינון
ההצהרה filter היא ההצהרה הראשונה בקבוצת הוראות הניתוח.
כל הוראות הניתוח הנוספות כלולות בהצהרה filter.
filter {
}
הפעלת משתנים שיאחסנו ערכים שחולצו
בתוך הצהרת filter, מאתחלים משתני ביניים שהכלי לניתוח ישתמש בהם כדי לאחסן ערכים שחולצו מהיומן.
המשתנים האלה משמשים בכל פעם שמנתחים יומן נפרד. הערך בכל משתנה ביניים יוגדר לשדה אחד או יותר של UDM בהמשך הוראות הניתוח.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
חילוץ ערכים ספציפיים מהיומן
Google SecOps מספקת קבוצה של מסננים, שמבוססים על Logstash, כדי לחלץ שדות מקובצי יומן מקוריים. בהתאם לפורמט של היומן, משתמשים במסנן חילוץ אחד או בכמה מסנני חילוץ כדי לחלץ את כל הנתונים מהיומן. אם המחרוזת היא:
- JSON מקורי, תחביר המנתח דומה למסנן JSON שתומך ביומנים בפורמט JSON. אין תמיכה ב-JSON מוטמע.
- פורמט XML, תחביר המנתח דומה למסנן XML שתומך ביומנים בפורמט XML.
- צמדי מפתח/ערך, תחביר המנתח דומה למסנן Kv שתומך בהודעות בפורמט של צמדי מפתח/ערך.
- פורמט CSV, תחביר המנתח דומה למסנן CSV שתומך בהודעות בפורמט CSV.
- בכל הפורמטים האחרים, תחביר המנתח דומה למסנן GROK עם דפוסי GROK מובנים . ההוראות האלה מבוססות על חילוץ בסגנון Regex.
Google SecOps מספקת קבוצת משנה של היכולות שזמינות בכל מסנן. Google SecOps מספקת גם תחביר למיפוי נתונים בהתאמה אישית שלא זמין במסננים. בחומר העזר בנושא תחביר של כלי הניתוח מפורטות התכונות הנתמכות והפונקציות המותאמות אישית.
בהמשך לדוגמה של יומן שרת ה-proxy של Squid, הוראת חילוץ הנתונים הבאה כוללת שילוב של תחביר Logstash Grok וביטויים רגולריים.
ההצהרה הבאה לגבי חילוץ מאחסנת ערכים במשתני הביניים הבאים:
whensrcipactionreturnCodesizemethodusernameurltgtip
ההצהרה לדוגמה הזו משתמשת גם במילת המפתח overwrite כדי לאחסן את הערכים שחולצו בכל משתנה. אם תהליך החילוץ מחזיר שגיאה, ההצהרה on_error
מגדירה את not_valid_log ל-true.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
שינוי והמרה של הערכים שחולצו
Google SecOps משתמש ביכולות של תוסף המסנן mutate של Logstash כדי לאפשר שינוי של ערכים שחולצו מהיומן המקורי. Google SecOps מספק קבוצת משנה של היכולות שזמינות בתוסף. בתחביר של כלי הניתוח מפורטות תכונות נתמכות ופונקציות בהתאמה אישית, כמו:
- המרת ערכים לסוג נתונים אחר
- החלפת ערכים במחרוזת
- למזג שני מערכים או לצרף מחרוזת למערך. ערכי מחרוזות מומרים למערך לפני המיזוג.
- להמיר לאותיות קטנות או לאותיות רישיות
בקטע הזה מופיעות דוגמאות לשינוי נתונים שמבוססות על יומן הרישום של פרוקסי האינטרנט Squid שהוצג קודם.
שינוי חותמת הזמן של האירוע
לכל האירועים שמאוחסנים כרשומה ב-UDM חייבת להיות חותמת זמן של האירוע. בדוגמה הזו
נבדק אם ערך הנתונים חולץ מהיומן. לאחר מכן, הוא משתמש בפונקציית התאריך של Grok כדי להתאים את הערך לפורמט הזמן UNIX.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
שינוי הערך של username
ההצהרה הבאה ממירה את הערך במשתנה username לאותיות קטנות.
mutate {
lowercase => [ "username"]
}
שינוי הערך של action
בדוגמה הבאה, הערך בaction משתנה הביניים
מוערך והערך משתנה ל-ALLOW, BLOCK או UNKNOWN_ACTION, שהם ערכים תקינים בשדה security_result.action UDM. השדה security_result.action UDM הוא סוג ממוספר שמאחסן רק ערכים ספציפיים.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
שינוי כתובת ה-IP של היעד
בדוגמה הבאה נבדק ערך במשתנה הביניים tgtip.
אם נמצא ערך, הוא מושווה לתבנית של כתובת IP באמצעות תבנית Grok מוגדרת מראש. אם יש שגיאה בהתאמת הערך לתבנית של כתובת IP, הפונקציה on_error מגדירה את המאפיין not_valid_tgtip לערך true. אם ההתאמה מצליחה, המאפיין not_valid_tgtip לא מוגדר.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
שינוי סוג הנתונים של returnCode והגודל
בדוגמה הבאה, הערך במשתנה size מומר ל-uinteger והערך במשתנה returnCode מומר ל-uinteger. הפעולה הזו נדרשת כי המשתנה size יישמר בשדה network.received_bytes UDM שבו מאוחסן סוג הנתונים int64. המשתנה returnCode יישמר בשדה network.http.response_code UDM שבו מאוחסן סוג הנתונים int32.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
הקצאת ערכים לשדות של UDM באירוע
אחרי שערכים מחולצים ועוברים עיבוד מקדים, הם מוקצים לשדות ברשומת אירוע של UDM. אפשר להקצות ערכים שחולצו וערכים סטטיים לשדה UDM.
אם מאכלסים את השדה event.disambiguation_key, צריך לוודא שהערך שלו ייחודי לכל אירוע שנוצר עבור היומן הנתון. אם לשני אירועים שונים יש את אותו disambiguation_key, המערכת תתנהג בצורה לא צפויה.
דוגמאות המנתח בקטע הזה מבוססות על דוגמת יומן הרישום של שרת ה-proxy הקודם של Squid.
שמירת חותמת הזמן של האירוע
לכל רשומה של אירוע UDM צריך להיות ערך מוגדר בשדה metadata.event_timestampUDM. בדוגמה הבאה, חותמת הזמן של האירוע שחולצה מהיומן נשמרת במשתנה המובנה @timestamp. Google Security Operations שומרת את הנתונים האלה בשדה metadata.event_timestamp UDM כברירת מחדל.
mutate {
rename => {
"when" => "timestamp"
}
}
הגדרת סוג האירוע
לכל רשומה של אירוע UDM צריך להיות ערך שמוגדר לשדה metadata.event_type UDM. השדה הזה הוא מסוג מנומר. הערך בשדה הזה קובע אילו שדות נוספים של UDM צריך למלא כדי שהרשומה של UDM תישמר.
תהליך הניתוח והנורמליזציה ייכשל אם אחד משדות החובה לא יכיל נתונים תקינים.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
שומרים את הערכים username ו-method באמצעות ההצהרה replace
הערכים בשדות הביניים username ו-method הם מחרוזות. בדוגמה הבאה נבדק אם קיים ערך תקין. אם כן, הערך username מאוחסן בשדה principal.user.userid UDM והערך method מאוחסן בשדה network.http.method UDM.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
שמירת action בשדה security_result.action UDM
בקטע הקודם, הערך במשתנה הביניים action עבר הערכה והמרה לאחד מהערכים הסטנדרטיים בשדה security_result.action UDM.
בשדות של UDM security_result ו-action מאוחסן מערך של פריטים, ולכן צריך להשתמש בגישה שונה כדי לשמור את הערך הזה.
קודם כל, שומרים את הערך שעבר טרנספורמציה בשדה ביניים security_result.action. השדה security_result הוא שדה הורה של השדה action.
mutate {
merge => {
"security_result.action" => "action"
}
}
לאחר מכן, שומרים את שדה הביניים security_result.action בשדה security_result UDM. השדה security_result UDM מאחסן מערך של פריטים, ולכן הערך מצורף לשדה הזה.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
אחסון כתובת ה-IP של היעד וכתובת ה-IP של המקור באמצעות ההצהרה merge
מאחסנים את הערכים הבאים ברשומת האירוע של UDM:
- הערך במשתנה הביניים
srcipלשדה UDMprincipal.ip. - הערך במשתנה הביניים
tgtipלשדה UDMtarget.ip.
גם השדה principal.ip וגם השדה target.ip של UDM מאחסנים מערך של פריטים, ולכן הערכים מצורפים לכל שדה.
בדוגמאות הבאות מוצגות גישות שונות לשמירת הערכים האלה.
במהלך שלב הטרנספורמציה, המשתנה tgtipהמשני הותאם לכתובת IP באמצעות דפוס Grok מוגדר מראש. ההצהרה הבאה לדוגמה בודקת אם המאפיין not_valid_tgtip הוא true, כלומר הערך tgtip לא תאם לתבנית של כתובת IP. אם הערך הוא false, הערך של tgtip נשמר בשדה target.ip של UDM.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
המשתנה הזמני srcip לא עבר טרנספורמציה. ההצהרה הבאה
בודקת אם ערך חולץ מהיומן המקורי, ואם כן, שומרת
את הערך בשדה principal.ip UDM.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
שמירה של url, returnCode ו-size באמצעות ההצהרה rename
בדוגמה הבאה, הערכים מאוחסנים באמצעות המשפט rename:
- המשתנה
urlנשמר בשדהtarget.urlשל UDM. - המשתנה הביניים
returnCodeנשמר בשדהnetwork.http.response_codeUDM. - המשתנה הביניים
sizeנשמר בשדהnetwork.received_bytesUDM.
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
קישור רשומת ה-UDM לפלט
ההצהרה האחרונה בהוראות מיפוי הנתונים מוציאה את הנתונים המעובדים לרשומה של אירוע UDM.
mutate {
merge => {
"@output" => "event"
}
}
הקוד המלא של כלי הניתוח
זו דוגמה מלאה לקוד של כלי הניתוח. סדר ההוראות לא זהה לסדר בקטעים הקודמים של המסמך הזה, אבל התוצאה זהה.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
הבעיה עדיין לא נפתרה? קבלת תשובות מחברי הקהילה וממומחי Google SecOps.