記錄剖析總覽
本文概要說明 Google Security Operations 如何將原始記錄剖析為 Unified Data Model (UDM) 格式。
Google SecOps 可接收來自下列擷取來源的記錄資料:
- Google SecOps 轉送器
- Chronicle API 動態消息
- Chronicle Ingestion API
- 第三方技術合作夥伴
一般來說,客戶會以原始記錄的形式傳送資料。Google SecOps 會使用 LogType 獨一無二地識別產生記錄的裝置。LogType 可識別下列兩項:
- 產生記錄的供應商和裝置,例如 Cisco 防火牆、Linux DHCP 伺服器或 Bro DNS。
- 剖析器會將原始記錄轉換為具結構性的 UDM。剖析器與 LogType 之間是一對一關係。 每個剖析器都會轉換單一 LogType 接收的資料。
Google SecOps 提供一組預設剖析器,可讀取原始記錄,並使用原始記錄中的資料產生結構化 UDM 記錄。Google SecOps 會維護這些剖析器。客戶也可以建立專屬剖析器,定義自訂資料對應指示。如果提交多行酬載,系統會將每一行解讀為個別的記錄項目。

剖析器包含資料對應指示。這個檔案會定義如何將原始記錄中的資料對應至 UDM 資料結構中的一或多個欄位。
如果沒有剖析錯誤,Google SecOps 會使用原始記錄中的資料建立 UDM 結構化記錄。將原始記錄轉換為 UDM 記錄的過程稱為「正規化」。
預設剖析器可能會對應原始記錄中的部分核心值。通常,這些核心欄位對於在 Google SecOps 中提供安全性深入分析資訊最為重要。未對應的值會保留在原始記錄中,但不會儲存在 UDM 記錄中。
客戶也可以使用 Ingestion API,以結構化 UDM 格式傳送資料。
自訂系統剖析擷取資料的方式
Google SecOps 提供下列功能,讓客戶能夠自訂傳入原始記錄資料的資料剖析方式。
- 客戶專屬剖析器:客戶可為特定記錄類型建立自訂剖析器設定,滿足特定需求。客戶專屬剖析器會取代特定 LogType 的預設剖析器。詳情請參閱「管理預先建構和自訂剖析器」。
- 剖析器擴充功能:除了預設剖析器設定,客戶還可新增自訂對應指示。每位顧客都能建立專屬的自訂對應指示。這些對應指示定義如何從原始記錄中擷取其他欄位,並轉換為 UDM 欄位。剖析器擴充功能不會取代預設或客戶專屬的剖析器。
使用 Squid 網路 Proxy 記錄的範例
本節提供 Squid 網路 Proxy 記錄範例,並說明如何將值對應至 UDM 記錄。如要瞭解 UDM 架構中的所有欄位,請參閱「整合式資料模型欄位清單」。
Squid 網頁 Proxy 記錄範例包含以空格分隔的值。每筆記錄代表一個事件,並儲存下列資料:時間戳記、時間長度、用戶端、結果代碼/結果狀態、傳輸的位元組、要求方法、網址、使用者、階層代碼和內容類型。在本範例中,系統會擷取下列欄位,並對應至 UDM 記錄:時間、用戶端、結果狀態、位元組、要求方法和網址。
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
比較這些結構時,請注意 UDM 記錄中只包含原始記錄資料的子集。部分欄位為必填,其餘為選填。此外,只有 UDM 記錄中的部分區段包含資料。如果剖析器未將原始記錄中的資料對應至 UDM 記錄,您就不會在 Google SecOps 中看到 UDM 記錄的該部分。
metadata 區段會儲存事件時間戳記。請注意,該值已從 EPOCH 轉換為 RFC 3339 格式。這項轉換為選用項目。時間戳記可以儲存為 EPOCH 格式,並經過前處理,將秒數和毫秒部分分別儲存至不同欄位。
metadata.event_type 欄位會儲存 NETWORK_HTTP 值,這是可識別事件類型的列舉值。metadata.event_type 的值會決定哪些 UDM 欄位為必填,哪些為選填。product_name 和 vendor_name 值包含記錄原始記錄的裝置說明,方便使用者瞭解。
UDM 事件記錄中的 metadata.event_type 與使用 Ingestion API 擷取資料時定義的 log_type 不同。這兩個屬性會儲存不同資訊。
「network」部分包含原始記錄事件的值。請注意,在本範例中,原始記錄中的狀態值是從「result code/status」欄位剖析而來,然後才寫入 UDM 記錄。UDM 記錄中只包含 result_code。
「principal」部分會儲存原始記錄中的用戶端資訊。「target」部分會儲存完整網址和 IP 位址。
security_result 區段會儲存其中一個列舉值,代表原始記錄中記錄的動作。
這是以 JSON 格式設定的 UDM 記錄。請注意,只有包含資料的區段才會顯示。不包括 src、observer、intermediary、about 和 extensions 區段。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
剖析器指令中的步驟
剖析器中的資料對應指示遵循常見模式,如下所示:
- 剖析並擷取原始記錄中的資料。
- 操控擷取的資料。包括使用條件邏輯選擇性剖析值、轉換資料型別、取代值中的子字串、轉換為大寫或小寫等。
- 為 UDM 欄位指派值。
- 將對應的 UDM 記錄輸出至 @output 鍵。
剖析及擷取原始記錄中的資料
設定篩選器陳述式
filter 陳述式是這組剖析指令中的第一個陳述式。
所有額外的剖析指令都包含在 filter 陳述式中。
filter {
}
初始化將儲存擷取值的變數
在 filter 陳述式中,初始化剖析器將用來儲存從記錄檔擷取值的中間變數。
每次剖析個別記錄時,都會使用這些變數。稍後在剖析指令中,每個中間變數的值都會設為一或多個 UDM 欄位。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
從記錄中擷取個別值
Google SecOps 提供一組以 Logstash 為基礎的篩選器,可從原始記錄檔中擷取欄位。視記錄的格式而定,您可以使用一或多個擷取篩選器,從記錄中擷取所有資料。如果字串為:
- 原生 JSON,剖析器語法與支援 JSON 格式記錄的 JSON 篩選器類似。不支援巢狀 JSON。
- XML 格式,剖析器語法與支援 XML 格式記錄的 XML 篩選器類似。
- 鍵/值組合,剖析器語法與支援鍵/值格式訊息的 Kv 篩選器類似。
- CSV 格式,剖析器語法與 CSV 篩選器類似,支援 CSV 格式的訊息。
- 其他所有格式,剖析器語法與 GROK 內建模式的 GROK 篩選器類似。 這項操作會使用 Regex 樣式的擷取指令。
Google SecOps 提供各篩選器中的部分功能。 Google SecOps 也提供篩選器中沒有的自訂資料對應語法。如要瞭解支援的功能和自訂函式,請參閱剖析器語法參考資料。
延續 Squid 網頁 Proxy 記錄範例,下列資料擷取指令包含 Logstash Grok 語法和規則運算式的組合。
下列擷取陳述式會將值儲存在下列中介變數中:
whensrcipactionreturnCodesizemethodusernameurltgtip
這個範例陳述式也會使用 overwrite 關鍵字,將每個變數中擷取的值儲存起來。如果擷取程序傳回錯誤,on_error 陳述式會將 not_valid_log 設為 True。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
操控及轉換擷取的值
Google SecOps 運用 Logstash mutate 篩選器外掛程式功能,可操控從原始記錄檔擷取的值。Google SecOps 提供外掛程式的部分功能。 如要瞭解支援的功能和自訂函式 (例如):
- 將值轉換為其他資料類型
- 取代字串中的值
- 合併兩個陣列,或在陣列中附加字串。字串值會在合併前轉換為陣列。
- 轉換為小寫或大寫
本節提供資料轉換範例,這些範例是以先前顯示的 Squid 網頁 Proxy 記錄為基礎。
轉換事件時間戳記
所有儲存為 UDM 記錄的事件都必須有事件時間戳記。這個範例會檢查是否從記錄中擷取資料值。然後使用 Grok 日期函式,將值比對至 UNIX 時間格式。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
轉換 username 值
以下範例陳述式會將 username 變數中的值轉換為小寫。
mutate {
lowercase => [ "username"]
}
轉換 action 值
以下範例會評估 action 中間變數中的值,並將值變更為 ALLOW、BLOCK 或 UNKNOWN_ACTION,這些都是 security_result.action UDM 欄位的有效值。security_result.action UDM 欄位是列舉型別,只會儲存特定值。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
轉換目標 IP 位址
以下範例會檢查 tgtip 中間變數中的值。如果找到,系統會使用預先定義的 Grok 模式,將該值與 IP 位址模式比對。如果值與 IP 位址模式相符時發生錯誤,on_error 函式會將 not_valid_tgtip 屬性設為 True。如果比對成功,系統就不會設定 not_valid_tgtip 屬性。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
變更 returnCode 和 size 的資料類型
以下範例會將 size 變數中的值轉換為 uinteger,並將 returnCode 變數中的值轉換為 integer。這是必要步驟,因為 size 變數會儲存至 network.received_bytes UDM 欄位,該欄位會儲存 int64 資料型別。returnCode 變數會儲存至 network.http.response_code UDM 欄位,該欄位會儲存 int32 資料類型。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
在事件中為 UDM 欄位指派值
擷取並預先處理值後,請將這些值指派給 UDM 事件記錄中的欄位。您可以將擷取的值和靜態值指派給 UDM 欄位。
如果填入 event.disambiguation_key,請確保這個欄位是為指定記錄產生的每個事件專屬。如果兩個不同事件的 disambiguation_key 相同,系統就會發生異常行為。
本節的剖析器範例是以先前的 Squid 網頁 Proxy 記錄檔範例為基礎。
儲存事件時間戳記
每個 UDM 事件記錄都必須為 metadata.event_timestamp UDM 欄位設定值。以下範例會將從記錄檔擷取的事件時間戳記儲存至 @timestamp 內建變數。Google Security Operations 預設會將這項資訊儲存至 UDM 欄位。metadata.event_timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
設定活動類型
每個 UDM 事件記錄都必須為 metadata.event_type UDM 欄位設定值。這個欄位是列舉型別。這個欄位的值會決定必須填入哪些額外 UDM 欄位,才能儲存 UDM 記錄。
如果任何必填欄位未包含有效資料,剖析和正規化程序就會失敗。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
使用 replace 陳述式儲存 username 和 method 值
username 和 method 中間欄位的值是字串。以下範例會檢查是否存在有效值,如果存在,則將 username 值儲存至 principal.user.userid UDM 欄位,並將 method 值儲存至 network.http.method UDM 欄位。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
將 action 儲存至 security_result.action UDM 欄位
在上一節中,系統評估了 action 中間變數的值,並將其轉換為 security_result.action UDM 欄位的其中一個標準值。
security_result 和 action UDM 欄位都會儲存項目陣列,因此儲存這個值時,您必須採取略有不同的做法。
首先,請將轉換後的值儲存至中介 security_result.action 欄位。security_result 欄位是 action 欄位的父項。
mutate {
merge => {
"security_result.action" => "action"
}
}
接著,將中介 security_result.action 中介欄位儲存至 security_result UDM 欄位。security_result UDM 欄位會儲存項目陣列,因此值會附加至這個欄位。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
使用 merge 陳述式儲存目標 IP 位址和來源 IP 位址
將下列值儲存至 UDM 事件記錄:
srcip中介變數中的值會傳送至principal.ipUDM 欄位。tgtip中介變數中的值會傳送至target.ipUDM 欄位。
principal.ip 和 target.ip UDM 欄位都會儲存項目陣列,因此值會附加至每個欄位。
以下範例說明儲存這些值的不同方法。
在轉換步驟中,tgtip 中間變數會使用預先定義的 Grok 模式比對 IP 位址。下列範例陳述式會檢查 not_valid_tgtip 屬性是否為 true,指出 tgtip 值無法與 IP 位址模式相符。如果為 false,則會將 tgtip 值儲存至 target.ip UDM 欄位。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
srcip 中間變數未轉換。下列陳述式會檢查是否從原始記錄中擷取值,如果有的話,則將該值儲存至 principal.ip UDM 欄位。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
使用 rename 陳述式儲存 url、returnCode 和 size
下列範例陳述式會使用 rename 陳述式儲存值:
url變數已儲存至target.urlUDM 欄位。returnCode中間變數會儲存至network.http.response_codeUDM 欄位。size中間變數會儲存至network.received_bytesUDM 欄位。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
將 UDM 記錄繫結至輸出內容
資料對應指令中的最後一個陳述式,會將處理後的資料輸出至 UDM 事件記錄。
mutate {
merge => {
"@output" => "event"
}
}
完整的剖析器程式碼
這是完整的剖析器程式碼範例。指令順序與本文先前章節不同,但會產生相同的輸出內容。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。