日志解析概览
本文档概述了 Google Security Operations 如何将原始日志解析为统一数据模型 (UDM) 格式。
Google SecOps 可以接收来自以下注入源的日志数据:
- Google SecOps 转发器
- Chronicle API Feed
- Chronicle Ingestion API
- 第三方技术合作伙伴
一般来说,客户会以原始原始日志的形式发送数据。Google SecOps 会使用 LogType 唯一标识生成日志的设备。LogType 可同时标识以下两种日志:
- 生成日志的供应商和设备,例如 Cisco 防火墙、Linux DHCP 服务器或 Bro DNS。
- 哪个解析器将原始日志转换为结构化 UDM。 解析器与 LogType 之间存在一对一的关系。每个解析器都会转换单个 LogType 接收的数据。
Google SecOps 提供了一组默认解析器,这些解析器可读取原始的原始日志,并使用原始原始日志中的数据生成结构化的 UDM 记录。Google SecOps 会维护这些解析器。客户还可以通过创建特定于客户的解析器来定义自定义数据映射指令。如果您提交多行载荷,系统会将每一行解读为单独的日志条目。

解析器包含数据映射指令。它定义了如何将数据从原始原始日志映射到 UDM 数据结构中的一个或多个字段。
如果没有解析错误,Google SecOps 会使用原始日志中的数据创建 UDM 结构化记录。将原始日志转换为 UDM 记录的过程称为归一化。
默认解析器可能会映射原始日志中的部分核心值。通常,这些核心字段对于在 Google SecOps 中提供安全数据分析至关重要。未映射的值仍会保留在原始日志中,但不会存储在 UDM 记录中。
客户还可以使用 Ingestion API 以结构化 UDM 格式发送数据。
自定义如何解析提取的数据
Google SecOps 提供以下功能,使客户能够自定义对传入的原始日志数据进行的数据解析。
- 客户专用解析器:客户为特定日志类型创建自定义解析器配置,以满足其特定要求。客户专用解析器会替换特定 LogType 的默认解析器。如需了解详情,请参阅管理预构建解析器和自定义解析器。
- 解析器扩展:除了默认解析器配置之外,客户还可以添加自定义映射说明。每位客户都可以创建自己的一组独特的自定义映射说明。这些映射指令定义了如何从原始原始日志中提取其他字段并将其转换为 UDM 字段。解析器扩展程序不会替换默认解析器或客户专用解析器。
使用 Squid 网络代理日志的示例
本部分提供了一个 Squid Web 代理日志示例,并介绍了如何将值映射到 UDM 记录。如需了解 UDM 架构中所有字段的说明,请参阅统一数据模型字段列表。
示例 Squid 网络代理日志包含以空格分隔的值。每条记录代表一个事件,并存储以下数据:时间戳、时长、客户端、结果代码/结果状态、传输的字节数、请求方法、网址、用户、层次结构代码和内容类型。在此示例中,系统会提取以下字段并将其映射到 UDM 记录中:时间、客户端、结果状态、字节数、请求方法和网址。
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
在比较这些结构时,请注意 UDM 记录中仅包含原始日志数据的一部分。某些字段是必需字段,其他字段是可选字段。此外,UDM 记录中只有一部分部分包含数据。如果解析器未将原始日志中的数据映射到 UDM 记录,则您在 Google SecOps 中看不到 UDM 记录的相应部分。
metadata 部分存储着事件时间戳。请注意,该值已从 EPOCH 格式转换为 RFC 3339 格式。此转换是可选的。时间戳可以存储为 EPOCH 格式,并进行预处理,将秒和毫秒部分分离到单独的字段中。
metadata.event_type 字段存储值 NETWORK_HTTP,这是一个用于标识事件类型的枚举值。metadata.event_type 的值决定了哪些额外的 UDM 字段是必需的,哪些是可选的。product_name 和 vendor_name 值包含记录原始日志的设备的易懂说明。
UDM 事件记录中的 metadata.event_type 与使用 Ingestion API 提取数据时定义的 log_type 不同。这两个属性存储的信息不同。
network 部分包含原始日志事件中的值。请注意,在此示例中,原始日志中的状态值在写入 UDM 记录之前,已从“结果代码/状态”字段中解析出来。UDM 记录中仅包含 result_code。
principal 部分存储来自原始日志的客户端信息。target 部分存储完全限定网址和 IP 地址。
security_result 部分存储一个枚举值,用于表示原始日志中记录的操作。
这是以 JSON 格式设置的 UDM 记录。请注意,系统只会包含包含数据的部分。不包括 src、observer、intermediary、about 和 extensions 部分。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
解析器指令中的步骤
解析器中的数据映射指令遵循以下常见模式:
- 解析原始日志并从中提取数据。
- 操控提取的数据。这包括使用条件逻辑来选择性地解析值、转换数据类型、替换值中的子字符串、转换为大写或小写等。
- 为 UDM 字段分配值。
- 将映射的 UDM 记录输出到 @output 键。
解析原始日志并从中提取数据
设置过滤语句
filter 语句是解析指令集中的第一个语句。所有其他解析指令都包含在 filter 语句中。
filter {
}
初始化将存储提取值的变量
在 filter 语句中,初始化解析器将用于存储从日志中提取的值的中间变量。
每次解析单个日志时都会使用这些变量。每个中间变量中的值将在解析说明中稍后设置为一个或多个 UDM 字段。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
从日志中提取各个值
Google SecOps 提供了一组基于 Logstash 的过滤器,用于从原始日志文件中提取字段。根据日志的格式,您可以使用一个或多个提取过滤条件从日志中提取所有数据。如果字符串为:
- 原生 JSON,解析器语法与支持 JSON 格式日志的 JSON 过滤器类似。不支持嵌套 JSON。
- XML 格式,解析器语法与支持 XML 格式日志的 XML 过滤器类似。
- 键值对,解析器语法与支持键值格式化消息的 Kv 过滤器类似。
- CSV 格式,解析器语法与支持 CSV 格式消息的 CSV 过滤器类似。
- 对于所有其他格式,解析器语法与 内置 GROK 模式的 GROK 过滤条件类似。这会使用正则表达式样式的提取指令。
Google SecOps 提供每个过滤器中的部分功能。Google SecOps 还提供过滤器中没有的自定义数据映射语法。如需了解支持的功能和自定义函数,请参阅解析器语法参考文档。
继续使用 Squid Web 代理日志示例,以下数据提取指令包含 Logstash Grok 语法和正则表达式的组合。
以下提取语句将值存储在以下中间变量中:
whensrcipactionreturnCodesizemethodusernameurltgtip
此示例语句还使用 overwrite 关键字将提取的值存储在每个变量中。如果提取过程返回错误,则 on_error 语句会将 not_valid_log 设置为 True。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
处理和转换提取的值
Google SecOps 利用 Logstash mutate 过滤条件插件功能来处理从原始日志中提取的值。Google SecOps 提供的功能是该插件提供的功能的一部分。 如需了解受支持的功能和自定义函数(例如):请参阅解析器语法。
- 将值转换为其他数据类型
- 替换字符串中的值
- 合并两个数组或将字符串附加到数组。字符串值会在合并之前转换为数组。
- 转换为小写或大写
本部分提供了一些数据转换示例,这些示例基于之前介绍的 Squid 网络代理日志。
转换事件时间戳
存储为 UDM 记录的所有事件都必须具有事件时间戳。此示例用于检查是否已从日志中提取数据值。然后,它使用 Grok 日期函数将该值与 UNIX 时间格式进行匹配。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
转换 username 值
以下示例语句将 username 变量中的值转换为小写。
mutate {
lowercase => [ "username"]
}
转换 action 值
以下示例会评估 action 中间变量中的值,并将该值更改为 ALLOW、BLOCK 或 UNKNOWN_ACTION,这些都是 security_result.action UDM 字段的有效值。security_result.action UDM 字段是一种仅存储特定值的枚举类型。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
转换目标 IP 地址
以下示例会检查 tgtip 中间变量中的值。如果找到,则使用预定义的 Grok 模式将该值与 IP 地址模式进行匹配。如果将值与 IP 地址模式匹配时出现错误,on_error 函数会将 not_valid_tgtip 属性设置为 True。如果匹配成功,则不会设置 not_valid_tgtip 属性。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
更改 returnCode 和 size 的数据类型
以下示例将 size 变量中的值强制转换为 uinteger,并将 returnCode 变量中的值强制转换为 integer。这是必需的,因为 size 变量将保存到存储 int64 数据类型的 network.received_bytes UDM 字段中。returnCode 变量将保存到存储 int32 数据类型的 network.http.response_code UDM 字段中。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
为事件中的 UDM 字段分配值
提取并预处理值后,将其分配给 UDM 事件记录中的字段。您可以为 UDM 字段分配提取的值和静态值。
如果您填充 event.disambiguation_key,请确保此字段对于为给定日志生成的每个事件都是唯一的。如果两个不同的事件具有相同的 disambiguation_key,则会导致系统出现意外行为。
本部分中的解析器示例基于之前的 Squid Web 代理日志示例。
保存活动时间戳
每个 UDM 事件记录都必须为 metadata.event_timestamp UDM 字段设置值。以下示例将从日志中提取的事件时间戳保存到内置变量 @timestamp 中。Google Security Operations 默认会将此信息保存到 metadata.event_timestamp UDM 字段。
mutate {
rename => {
"when" => "timestamp"
}
}
设置活动类型
每个 UDM 事件记录都必须为 metadata.event_type UDM 字段设置值。此字段为枚举类型。此字段的值决定了必须填充哪些额外的 UDM 字段才能保存 UDM 记录。如果任何必填字段不包含有效数据,解析和规范化过程将失败。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
使用 replace 语句保存 username 和 method 值
username 和 method 中间字段中的值是字符串。以下示例会检查是否存在有效值,如果存在,则将 username 值存储到 principal.user.userid UDM 字段,并将 method 值存储到 network.http.method UDM 字段。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
将 action 保存到 security_result.action UDM 字段
在上一部分中,系统对 action 中间变量中的值进行了评估,并将其转换为 security_result.action UDM 字段的标准值之一。
security_result 和 action UDM 字段都存储一个项目数组,这意味着您在保存此值时必须采用略有不同的方法。
首先,将转换后的值保存到中间 security_result.action 字段。security_result 字段是 action 字段的父级。
mutate {
merge => {
"security_result.action" => "action"
}
}
接下来,将中间 security_result.action 中介字段保存到 security_result UDM 字段。security_result UDM 字段存储的是一个项目数组,因此该值会附加到此字段。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
使用 merge 语句存储目标 IP 地址和源 IP 地址
将以下值存储到 UDM 事件记录中:
srcip中间变量中的值到principal.ipUDM 字段。tgtip中间变量中的值到target.ipUDM 字段。
principal.ip 和 target.ip UDM 字段都存储着一个项目数组,因此值会附加到每个字段。
以下示例展示了保存这些值的不同方法。
在转换步骤中,tgtip中间变量使用预定义的 Grok 模式与 IP 地址进行了匹配。以下示例语句用于检查 not_valid_tgtip 属性是否为 true,以表明 tgtip 值无法与 IP 地址模式匹配。如果为 false,则将 tgtip 值保存到 target.ip UDM 字段。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
srcip 中间变量未转换。以下语句会检查是否从原始日志中提取了值,如果提取了,则将该值保存到 principal.ip UDM 字段。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
使用 rename 语句保存 url、returnCode 和 size
以下示例语句使用 rename 语句存储值:
url变量已保存到target.urlUDM 字段。- 保存到
network.http.response_codeUDM 字段的returnCode中间变量。 - 保存到
network.received_bytesUDM 字段的size中间变量。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
将 UDM 记录绑定到输出
数据映射指令中的最后一条语句将处理后的数据输出到 UDM 事件记录。
mutate {
merge => {
"@output" => "event"
}
}
完整的解析器代码
这是完整的解析器代码示例。指令的顺序与本文档前几部分中的顺序不同,但会生成相同的输出。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。