在 YARA-L 中创建多阶段查询
本文档介绍了 YARA-L 中的多阶段查询如何让您将一个查询阶段的输出直接馈送到后续阶段的输入中。与单个整体式查询相比,此流程可让您更好地控制数据转换。
将多阶段查询与现有功能集成
多阶段查询可与 Google Security Operations 中的以下现有功能搭配使用:
复合检测规则:多阶段查询可补充复合检测规则。 与复合规则不同,使用搜索功能的多阶段查询可以实时返回结果。
时间范围和多事件规则:您可以使用多阶段查询,通过比较数据中的不同时间窗口来检测异常情况。例如,您可以使用初始查询阶段在较长时间内建立基准,然后使用后续阶段根据该基准评估近期活动。您还可以使用多事件规则创建类似的比较。
信息中心和搜索均支持 YARA-L 中的多阶段查询。
联接有助于关联来自多个来源的数据,从而为调查提供更多背景信息。通过关联相关事件、实体和其他数据,您可以调查复杂的攻击场景。如需了解详情,请参阅在搜索中使用联接。
定义多阶段 YARA-L 语法
配置多阶段查询时,请注意以下事项:
- 限制阶段:多阶段查询必须包含 1 到 4 个命名阶段,外加根阶段。
- 顺序语法:请务必先定义命名阶段语法,然后再定义根阶段语法。
创建多阶段 YARA-L 查询
如需创建多阶段 YARA-L 查询,请完成以下步骤。
阶段结构和语法
依次前往调查 > 搜索。定义查询阶段时,请遵循以下结构要求:
语法:使用以下语法来命名每个阶段,并将其与其他阶段分开:
stage <stage name> { }
大括号:将所有阶段语法放在大括号 {} 内。
顺序:在定义根阶段之前,先定义所有命名阶段的语法。
引用:每个阶段都可以引用查询中之前定义的阶段。
根阶段:查询必须具有根阶段,该阶段在所有命名阶段之后处理。
以下示例阶段 daily_stats
会收集每日网络统计信息:
stage daily_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
$total_bytes = cast.as_int(network.sent_bytes + network.received_bytes)
match:
$source, $target by day
outcome:
$exchanged_bytes = sum($total_bytes)
}
访问阶段输出
后续阶段可以使用阶段字段访问已命名阶段的输出。阶段字段与阶段的 match
和 outcome
变量相对应,并且可以像统一数据模型 (UDM) 字段一样使用。
使用以下语法访问阶段字段:
$<stage name>.<variable name>
访问时段时间戳(可选)
如果命名阶段使用跳跃窗口、滑动窗口或滚动窗口,请使用以下预留字段访问每个输出行的窗口开始时间和窗口结束时间:
$<stage name>.window_start
$<stage name>.window_end
window_start
和 window_end
是整数字段,以自 Unix 纪元以来的秒数表示。不同阶段的窗口大小可能有所不同。
限制
多阶段查询具有以下功能和结构限制:
结构和阶段限制
根阶段:每个查询只允许有一个根阶段。
命名阶段:最多支持四个命名阶段。
阶段引用:阶段只能引用同一查询中在逻辑上位于其之前的阶段。
联接:所有阶段最多允许四个非数据表联接。
结果要求:每个命名阶段(根阶段除外)都必须包含
match
部分或outcome
部分。outcome
部分不需要进行汇总。
窗口和兼容性限制
功能支持:搜索和信息中心支持多阶段查询,但规则不支持。
窗口类型:避免在单个查询中混合使用不同的窗口类型。
窗口依赖项:使用跃点窗口或滑动窗口的阶段不能依赖于也使用跃点窗口或滑动窗口的另一个阶段。
滚动窗口大小:虽然不同阶段的滚动窗口大小可能有所不同,但大小差异必须小于 720 倍。
示例:阶段聚合差异
不允许使用以下窗口配置示例:
stage monthly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
$total_bytes = cast.as_int(network.sent_bytes + network.received_bytes)
match:
$source, $target by month
outcome:
$exchanged_bytes = sum($total_bytes)
}
$source = $monthly_stats.source
$target = $monthly_stats.target
match:
$source, $target by minute
如果 monthly_stats
阶段按月汇总数据,而根阶段按分钟汇总 monthly_stats
的输出,那么 monthly_stats
中的每一行都会映射到根阶段中的 43,200 行(因为一个月有 43,200 分钟)。
阶段和查询限制
多阶段查询中的每个阶段都有以下限制:
适用于单阶段查询的大部分限制也适用于每个单独的阶段:
输出要求:每个阶段都必须输出至少一个匹配变量或结果变量(阶段字段)。
联接中的窗口:联接中使用的最大窗口大小(跳跃、滚动或滑动)为 2 天。
结果变量数量上限:
对于未选择允许更大结果变量限制的客户,此值为 20
50(如果客户已选择启用更大的结果变量限制)
多阶段查询与统计信息查询受相同的限制约束:
统计信息查询:120 QPH(API 和界面)
Google SecOps 中的搜索视图:每分钟 100 个视图
界面和
EventService.UDMSearch
API 支持多阶段联接,但SearchService.UDMSearch
API 不支持。界面中还支持不含联接的多阶段查询。
活动和全局限制
事件数上限:
多阶段查询可同时处理的事件数量受到严格限制:
UDM 事件:最多允许 2 个 UDM 事件。
实体上下文图 (ECG) 事件:最多允许一个 ECG 事件。
全局查询限制:
这些限制是平台级限制,用于控制多阶段查询可以返回多长时间之前的数据以及多少数据。
对于查询时间范围,标准查询的最长时间范围为 30 天。
结果集总大小上限为 10,000 条结果。
多阶段查询示例
本部分中的示例有助于说明如何创建完整的多阶段 YARA-L 查询。
示例:搜索异常活跃的网络连接(小时)
此多阶段 YARA-L 示例可识别网络活动高于正常水平的 IP 地址对,并针对持续高活动状态超过 3 小时的 IP 地址对。该查询包含两个必需的组件:命名阶段 hourly_stats
和 root
阶段。
hourly_stats
阶段会搜索具有高网络活动级别的 principal.ip
和 target.ip
对。
此阶段会针对以下字段返回单个小时值:
来源 IP(字符串)的统计信息:
$hourly_stats.src_ip
目标 IP(字符串)的统计信息:
$hourly_stats.dst_ip
事件数量(整数)的统计信息:
$hourly_stats.count
接收字节数的标准差(浮点数):
$hourly_stats.std_recd_bytes
平均接收字节数(浮点数):
$hourly_stats.avg_recd_bytes
以秒为单位的小时段开始时间(自 Unix 纪元起算,整数):
$hourly_stats.window_start
以秒为单位的小时桶结束时间(整数),从 Unix 纪元开始计算:
$hourly_stats.window_end
根阶段会处理 hourly_stats
阶段的输出。它会计算 principal.ip
和 target.ip
对的统计信息,其中 activity 超过了 $hourly_stats
指定的阈值。然后,它会过滤掉高活动时间超过 3 小时的配对。
stage hourly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$src_ip = principal.ip
$dst_ip = target.ip
$src_ip != ""
$dst_ip != ""
match:
$src_ip, $dst_ip by hour
outcome:
$count = count(metadata.id)
$avg_recd_bytes = avg(network.received_bytes)
$std_recd_bytes = stddev(network.received_bytes)
condition:
$avg_recd_bytes > 100 and $std_recd_bytes > 50
}
$src_ip = $hourly_stats.src_ip
$dst_ip = $hourly_stats.dst_ip
$time_bucket_count = strings.concat(timestamp.get_timestamp($hourly_stats.window_start), "|", $hourly_stats.count)
match:
$src_ip, $dst_ip
outcome:
$list = array_distinct($time_bucket_count)
$count = count_distinct($hourly_stats.window_start)
condition:
$count > 3
如果您按如下方式更改根阶段中的匹配条件,则可以为多阶段查询引入按天进行的窗口化聚合。
match:
$src_ip, $dst_ip by day
示例:搜索异常活跃的网络连接(使用 Z 分数)
此多阶段查询使用 Z 得分计算(衡量与平均值的标准差数)将每日平均网络活动与今天的活动进行比较。此查询可有效搜索内部资产与外部系统之间异常高的网络活动。
前提条件:查询时间窗口必须大于或等于 2 天,并且包含当前日期,计算出的 Z 分数才能有效。
此多阶段查询包含 daily_stats
阶段和 root
阶段,这两个阶段共同计算网络活动的 Z 得分:
daily_stats
阶段执行初始的每日聚合。它会计算每个 IP 对(source
和target
)每天交换的总字节数,并返回以下阶段字段(与输出行中的列相对应):$daily_stats.source
:单数,字符串$daily_stats.target
:单数,字符串$daily_stats.exchanged_bytes
:单数,整数$daily_stats.window_start
:单数,整数$daily_stats.window_end
:单数,整数
根阶段会聚合每个 IP 对的
daily_stats
阶段输出。 它会计算整个搜索范围内的每日字节交换量的平均值和标准差,以及今天的字节交换量。它会使用这三个计算出的值来确定 Z 得分。输出会列出今天所有 IP 对的 Z 分数,并按降序排序。
// Calculate the total bytes exchanged per day by source and target
stage daily_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
$total_bytes = cast.as_int(network.sent_bytes + network.received_bytes)
match:
$source, $target by day
outcome:
$exchanged_bytes = sum($total_bytes)
}
// Calculate the average per day over the time window and compare with the bytes
exchanged today
$source = $daily_stats.source
$target = $daily_stats.target
$date = timestamp.get_date($daily_stats.window_start)
match:
$source, $target
outcome:
$today_bytes = sum(if($date = timestamp.get_date(timestamp.current_seconds()), $daily_stats.exchanged_bytes, 0))
$average_bytes = window.avg($daily_stats.exchanged_bytes)
$stddev_bytes = window.stddev($daily_stats.exchanged_bytes)
$zscore = ($today_bytes - $average_bytes) / $stddev_bytes
order:
$zscore desc
从阶段导出未汇总的变量
命名阶段可以包含未汇总的 outcome
部分。这意味着,在 outcome
部分中定义的变量会直接从阶段输出,让后续阶段可以将其作为阶段字段进行访问,而无需进行分组汇总。
示例:导出未汇总的变量
此示例演示了如何导出未汇总的变量。请注意以下逻辑:
top_5_bytes_sent
针对网络活动最多的 5 个事件进行分阶段搜索。top_5_bytes_sent
阶段会输出与输出行中的列对应的以下阶段字段:$top_5_bytes_sent.bytes_sent
:单数,整数$top_5_bytes_sent.timestamp_seconds
:单数,整数
root
阶段会计算网络活动最高的 5 个事件的最新和最早时间戳。
stage top_5_bytes_sent {
metadata.event_type = "NETWORK_CONNECTION"
network.sent_bytes > 0
outcome:
$bytes_sent = cast.as_int(network.sent_bytes)
$timestamp_seconds = metadata.event_timestamp.seconds
order:
$bytes_sent desc
limit:
5
}
outcome:
$latest_timestamp = timestamp.get_timestamp(max($top_5_bytes_sent.timestamp_seconds))
$earliest_timestamp = timestamp.get_timestamp(min($top_5_bytes_sent.timestamp_seconds))
在多阶段查询中实现窗口化
多阶段查询支持在命名阶段中进行所有类型的窗口化(跳跃、滑动和滚动)。如果命名阶段包含窗口,则可以使用以下预留字段访问每个输出行的窗口开始时间和窗口结束时间:
$<stage name>.window_start
$<stage name>.window_end
示例:跳跃窗口
以下示例说明了如何在多阶段查询中使用跳跃窗口:
hourly_stats
阶段会搜索在同一小时内具有高网络活动的 IP 对。hourly_stats
输出与输出行中的列对应的以下阶段字段:$hourly_stats.src_ip
:单数,字符串$hourly_stats.dst_ip
:单数,字符串$hourly_stats.count
:单数,整数$hourly_stats.std_recd_bytes
:单数,浮点数$hourly_stats.avg_recd_bytes
:单数,浮点数$hourly_stats.window_start
:单数,整数$hourly_stats.window_end
:单数,整数
根阶段会过滤掉高活动时间超过 3 小时的 IP 对。由于在
hourly_stats
阶段使用了跳跃窗口,因此小时数可能会重叠。
stage hourly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$src_ip = principal.ip
$dst_ip = target.ip
$src_ip != ""
$dst_ip != ""
match:
$src_ip, $dst_ip over 1h
outcome:
$count = count(metadata.id)
$avg_recd_bytes = avg(network.received_bytes)
$std_recd_bytes = stddev(network.received_bytes)
condition:
$avg_recd_bytes > 100 and $std_recd_bytes > 50
}
$src_ip = $hourly_stats.src_ip
$dst_ip = $hourly_stats.dst_ip
$time_bucket_count = strings.concat(timestamp.get_timestamp($hourly_stats.window_start), "|", $hourly_stats.count)
match:
$src_ip, $dst_ip
outcome:
$list = array_distinct($time_bucket_count)
$count = count_distinct($hourly_stats.window_start)
condition:
$count > 3
已知问题
在实现多阶段查询时,建议您查看以下限制和推荐的解决方法:
所有多阶段查询的行为都类似于统计信息搜索查询(输出由汇总统计信息组成,而不是未汇总的事件或数据表行)。
由于 UDM 和实体事件数据集的大小,以 UDM 和实体事件为一侧的联接可能会出现性能低下的情况。我们强烈建议尽可能多地过滤联接的 UDM 和实体事件(例如,按事件类型过滤)。
如需有关推荐实践的一般指导,请参阅 Yara-L 最佳实践;如需了解有关联接的具体信息,请参阅最佳实践。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。