在 YARA-L 中创建多阶段查询
本文档介绍了 YARA-L 中的多阶段查询如何让您将一个查询阶段的输出直接馈送到后续阶段的输入中。与单个整体式查询相比,此流程可让您更好地控制数据转换。
将多阶段查询与现有功能集成
多阶段查询可与 Google Security Operations 中的以下现有功能搭配使用:
复合检测规则:多阶段查询可补充复合检测规则。 与复合规则不同,使用搜索功能的多阶段查询可以实时返回结果。
时间范围和多事件规则:您可以使用多阶段查询,通过比较数据中的不同时间窗口来检测异常情况。例如,您可以使用初始查询阶段在较长时间内建立基准,然后使用后续阶段根据该基准评估近期活动。您还可以使用多事件规则来创建类似的比较。
信息中心和搜索均支持 YARA-L 中的多阶段查询。
联接有助于关联来自多个来源的数据,从而为调查提供更多背景信息。通过关联相关事件、实体和其他数据,您可以调查复杂的攻击场景。如需了解详情,请参阅在搜索中使用联接。
定义多阶段 YARA-L 语法
配置多阶段查询时,请注意以下事项:
- 限制阶段:多阶段查询必须包含 1 到 4 个命名阶段,外加根阶段。
- 顺序语法:请务必先定义命名阶段语法,然后再定义根阶段语法。
创建多阶段 YARA-L 查询
如需创建多阶段 YARA-L 查询,请完成以下步骤。
阶段结构和语法
依次前往调查 > 搜索。定义查询阶段时,请遵循以下结构要求:
语法:使用以下语法为每个阶段命名,并将其与其他阶段分开:
stage <stage name> { }
大括号:将所有阶段语法都放在大括号 {} 内。
顺序:在定义根阶段之前,先定义所有命名阶段的语法。
引用:每个阶段都可以引用查询中之前定义的阶段。
根阶段:查询必须具有根阶段,该阶段在所有命名阶段之后处理。
以下示例阶段 daily_stats 会收集每日网络统计信息:
stage daily_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
match:
$source, $target by day
outcome:
$exchanged_bytes = sum(network.sent_bytes + network.received_bytes)
}
访问阶段输出
后续阶段可以使用阶段字段访问已命名阶段的输出。阶段字段与阶段的 match 和 outcome 变量相对应,并且可以像统一数据模型 (UDM) 字段一样使用。
使用以下语法访问阶段字段:
$<stage name>.<variable name>
访问时段时间戳(可选)
如果命名阶段使用跳跃窗口、滑动窗口或滚动窗口,请使用以下预留字段访问每个输出行的窗口开始时间和窗口结束时间:
$<stage name>.window_start$<stage name>.window_end
window_start 和 window_end 是整数字段,以自 Unix 纪元以来的秒数表示。不同阶段的窗口大小可能有所不同。
限制
多阶段查询具有以下功能和结构限制:
结构和阶段限制
根阶段:每个查询只允许有一个根阶段。
命名阶段:最多支持四个命名阶段。
阶段引用:阶段只能引用同一查询中在逻辑上位于其之前的阶段。
联接:所有阶段最多允许 4 个非数据表联接。
结果要求:每个命名阶段(根阶段除外)都必须包含
match部分或outcome部分。outcome部分不需要进行聚合。
窗口和兼容性限制
功能支持:搜索和信息中心支持多阶段查询,但规则不支持。
窗口类型:避免在单个查询中混用不同的窗口类型。
窗口依赖项:使用跃点窗口或滑动窗口的阶段不能依赖于也使用跃点窗口或滑动窗口的另一个阶段。
滚动窗口大小:虽然不同阶段的滚动窗口大小可能有所不同,但大小差异必须小于 720 倍。
示例:阶段聚合差异
不允许使用以下窗口配置示例:
stage monthly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
match:
$source, $target by month
outcome:
$exchanged_bytes = sum(network.sent_bytes + network.received_bytes)
}
$source = $monthly_stats.source
$target = $monthly_stats.target
match:
$source, $target by minute
如果 monthly_stats 阶段按月汇总数据,而根阶段按分钟汇总 monthly_stats 的输出,那么 monthly_stats 中的每一行都会映射到根阶段中的 43,200 行(因为一个月有 43,200 分钟)。
阶段和查询限制
多阶段查询中的每个阶段都有以下限制:
适用于单阶段查询的大部分限制也适用于每个单独的阶段:
输出要求:每个阶段都必须输出至少一个匹配变量或结果变量(阶段字段)。
联接中的窗口:联接中使用的最大窗口大小(跳跃、滚动或滑动)为 2 天。
结果变量数量上限:
20(对于未选择允许更大结果变量限制的客户)
50(对于选择允许更大结果变量限制的客户)
多阶段查询与统计信息查询受相同的限制约束:
统计信息查询:120 QPH(API 和界面)
Google SecOps 的搜索视图:每分钟 100 个视图
界面和
EventService.UDMSearchAPI 支持多阶段联接,但SearchService.UDMSearchAPI 不支持。界面中还支持不含联接的多阶段查询。
活动和全局限制
事件数上限:
多阶段查询可同时处理的事件数量受到严格限制:
UDM 事件:最多允许 2 个 UDM 事件。
实体上下文图 (ECG) 事件:最多允许一个 ECG 事件。
全局查询限制:
这些限制是平台级限制,用于控制多阶段查询可以返回的数据的时间范围和数据量。
对于查询时间范围,标准查询的最长时间范围为 30 天。
结果集总大小上限为 10,000 条结果。
多阶段查询示例
本部分中的示例有助于说明如何创建完整的多阶段 YARA-L 查询。
示例:搜索异常活跃的网络连接(小时)
此多阶段 YARA-L 示例可识别网络活动高于正常水平的 IP 地址对,目标是持续高活动时间超过 3 小时的地址对。该查询包含两个必需的组件:命名阶段 hourly_stats 和 root 阶段。
hourly_stats 阶段会搜索具有高网络活动级别的 principal.ip 和 target.ip 对。
此阶段会针对以下字段返回单个每小时值:
来源 IP(字符串)的统计信息:
$hourly_stats.src_ip目标 IP(字符串)的统计信息:
$hourly_stats.dst_ip事件数量(整数)的统计信息:
$hourly_stats.count接收字节数的标准差(浮点数):
$hourly_stats.std_recd_bytes平均接收字节数(浮点数):
$hourly_stats.avg_recd_bytes以秒为单位的小时桶开始时间(整数),从 Unix 纪元开始计算:
$hourly_stats.window_start以秒为单位的小时桶结束时间(整数),从 Unix 纪元开始计算:
$hourly_stats.window_end
根阶段会处理 hourly_stats 阶段的输出。它会计算 principal.ip 和 target.ip 对的统计信息,其中 activity 超过了 $hourly_stats 指定的阈值。然后,它会过滤出高活动时间超过 3 小时的配对。
stage hourly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$src_ip = principal.ip
$dst_ip = target.ip
$src_ip != ""
$dst_ip != ""
match:
$src_ip, $dst_ip by hour
outcome:
$count = count(metadata.id)
$avg_recd_bytes = avg(network.received_bytes)
$std_recd_bytes = stddev(network.received_bytes)
condition:
$avg_recd_bytes > 100 and $std_recd_bytes > 50
}
$src_ip = $hourly_stats.src_ip
$dst_ip = $hourly_stats.dst_ip
$time_bucket_count = strings.concat(timestamp.get_timestamp($hourly_stats.window_start), "|", $hourly_stats.count)
match:
$src_ip, $dst_ip
outcome:
$list = array_distinct($time_bucket_count)
$count = count_distinct($hourly_stats.window_start)
condition:
$count > 3
如果您按如下方式更改根阶段中的匹配条件,则可以为多阶段查询引入按天进行的窗口化聚合。
match:
$src_ip, $dst_ip by day
示例:搜索异常活跃的网络连接(使用 Z 得分)
此多阶段查询使用 Z 得分计算(衡量与平均值的标准差数)将每日平均网络活动与今天的活动进行比较。此查询可有效搜索内部资产与外部系统之间异常高的网络活动。
前提条件:查询时间窗口必须大于或等于 2 天,并且包含当前日期,计算出的 Z 分数才能有效。
此多阶段查询包含 daily_stats 阶段和 root 阶段,这两个阶段共同计算网络活动的 Z 分数:
daily_stats阶段执行初始的每日汇总。它会计算每个 IP 对(source和target)每天交换的总字节数,并返回以下阶段字段(与输出行中的列相对应):$daily_stats.source:单数,字符串$daily_stats.target:单数,字符串$daily_stats.exchanged_bytes:单数,整数$daily_stats.window_start:单数,整数$daily_stats.window_end:单数,整数
根阶段会聚合每个 IP 对的
daily_stats阶段输出。 它会计算整个搜索范围内的每日字节交换量的平均值和标准差,以及今天的字节交换量。然后,它会使用这三个计算出的值来确定 Z 分数。输出会列出今天所有 IP 对的 Z 得分,并按降序排序。
// Calculate the total bytes exchanged per day by source and target
stage daily_stats {
metadata.event_type = "NETWORK_CONNECTION"
$source = principal.hostname
$target = target.ip
$source != ""
$target != ""
match:
$source, $target by day
outcome:
$exchanged_bytes = sum(network.sent_bytes + network.received_bytes)
}
// Calculate the average per day over the time window and compare with the bytes exchanged today
$source = $daily_stats.source
$target = $daily_stats.target
$date = timestamp.get_date($daily_stats.window_start)
match:
$source, $target
outcome:
$today_bytes = sum(if($date = timestamp.get_date(timestamp.current_seconds()), cast.as_int($daily_stats.exchanged_bytes), 0))
$average_bytes = window.avg($daily_stats.exchanged_bytes)
$stddev_bytes = window.stddev($daily_stats.exchanged_bytes)
$zscore = ($today_bytes - $average_bytes) / $stddev_bytes
order:
$zscore desc
从阶段导出未汇总的变量
命名阶段可以包含未汇总的 outcome 部分。这意味着,在 outcome 部分中定义的变量直接从阶段输出,让后续阶段可以将其作为阶段字段进行访问,而无需进行分组聚合。
示例:导出未汇总的变量
此示例演示了如何导出未汇总的变量。请注意以下逻辑:
top_5_bytes_sent针对网络活动最多的 5 个事件进行分阶段搜索。top_5_bytes_sent阶段输出与输出行中的列对应的以下阶段字段:$top_5_bytes_sent.bytes_sent:单数,整数$top_5_bytes_sent.timestamp_seconds:单数,整数
root阶段会计算网络活动最高的 5 个事件的最新和最早时间戳。
stage top_5_bytes_sent {
metadata.event_type = "NETWORK_CONNECTION"
network.sent_bytes > 0
outcome:
$bytes_sent = network.sent_bytes
$timestamp_seconds = metadata.event_timestamp.seconds
order:
$bytes_sent desc
limit:
5
}
outcome:
$latest_timestamp = timestamp.get_timestamp(max($top_5_bytes_sent.timestamp_seconds))
$earliest_timestamp = timestamp.get_timestamp(min($top_5_bytes_sent.timestamp_seconds))
在多阶段查询中实现窗口化
多阶段查询支持命名阶段中的所有类型的窗口(跳跃、滑动和滚动)。如果命名阶段包含窗口,则可以使用以下预留字段访问每个输出行的窗口开始时间和窗口结束时间:
$<stage name>.window_start$<stage name>.window_end
示例:跳跃窗口
以下示例说明了如何在多阶段查询中使用跳跃窗口:
hourly_stats阶段会搜索在同一小时内具有高网络活动的 IP 对。hourly_stats输出与输出行中的列对应的以下阶段字段:$hourly_stats.src_ip:单数,字符串$hourly_stats.dst_ip:单数,字符串$hourly_stats.count:单数,整数$hourly_stats.std_recd_bytes:单数,浮点数$hourly_stats.avg_recd_bytes:单数,浮点数$hourly_stats.window_start:单数,整数$hourly_stats.window_end:单数,整数
根阶段会过滤掉高活动时间超过 3 小时的 IP 对。由于在
hourly_stats阶段使用了跳跃窗口,因此小时数可能会重叠。
stage hourly_stats {
metadata.event_type = "NETWORK_CONNECTION"
$src_ip = principal.ip
$dst_ip = target.ip
$src_ip != ""
$dst_ip != ""
match:
$src_ip, $dst_ip over 1h
outcome:
$count = count(metadata.id)
$avg_recd_bytes = avg(network.received_bytes)
$std_recd_bytes = stddev(network.received_bytes)
condition:
$avg_recd_bytes > 100 and $std_recd_bytes > 50
}
$src_ip = $hourly_stats.src_ip
$dst_ip = $hourly_stats.dst_ip
$time_bucket_count = strings.concat(timestamp.get_timestamp($hourly_stats.window_start), "|", $hourly_stats.count)
match:
$src_ip, $dst_ip
outcome:
$list = array_distinct($time_bucket_count)
$count = count_distinct($hourly_stats.window_start)
condition:
$count > 3
多阶段查询中的联接
多阶段查询的各个阶段内以及阶段之间都支持内部联接。内连接功能支持以下类型:
- UDM 和 UDM
- UDM 和 ECG
- UDM 和 DataTable
在关联的上下文中,窗口化阶段是指包含窗口的匹配部分。相比之下,表阶段不会输出窗口。
以下示例展示了如何在多阶段查询中配置 UDM 事件与表阶段之间的无匹配联接。
median阶段计算每个源主机和目标 IP 对的发送字节数中位数median阶段输出与输出行中的列对应的以下阶段字段:$median.host:单数,字符串$median.target:单数,字符串$median.median:单数,浮点数
absolute_deviations阶段将每个 UDM 事件与median中具有相同源主机和目标 IP 对的行联接起来。对于每个 UDM 事件,它会计算发送的字节数的绝对值。absolute_deviations输出与输出行中的列对应的以下阶段字段:$absolute_deviations.host:单数,字符串$absolute_deviations.target:单数,字符串$absolute_deviations.absolute_deviation:单数,浮点数
- 根阶段计算所有 UDM 事件中发送的字节数的平均绝对偏差
stage median {
metadata.event_type = "NETWORK_CONNECTION"
$host = principal.hostname
$target = target.ip
match:
$host, $target
outcome:
$median = window.median(network.sent_bytes, true)
}
stage absolute_deviations {
metadata.event_type = "NETWORK_CONNECTION"
$join_host = principal.hostname
$join_host = $median.host
$join_target = target.ip[0]
$join_target = $median.target
outcome:
$host = $join_host
$target = $join_target
$absolute_deviation = math.abs(network.sent_bytes - $median.median)
}
$host = $absolute_deviations.host
$target = $absolute_deviations.target
match:
$host, $target
outcome:
$mean_absolute_deviation = avg($absolute_deviations.absolute_deviation)
示例:窗口阶段与表阶段之间的无匹配联接
以下示例说明了如何在多阶段查询中配置窗口阶段与表阶段之间的无匹配联接。
hourly_stats阶段会计算每个源主机和目标主机对以及小时桶的总发送字节数。hourly_stats阶段输出与输出行中的列对应的以下阶段字段:$hourly_stats.source_host:单数,字符串$hourly_stats.dst_host:单数,字符串$hourly_stats.total_bytes_sent:单数,浮点数$hourly_stats.window_start:单数,整数$hourly_stats.window_end:单数,整数
agg_stats阶段会计算每个源主机和目标主机对每小时的平均字节数和标准差。agg_stats输出与输出行中的列对应的以下阶段字段:$agg_stats.source_host:单数,字符串$agg_stats.dst_host:单数,字符串$agg_stats.avg_bytes_sent:单数,浮点数$agg_stats.stddev_bytes_sent:单数,浮点数
根阶段将
hourly_stats中的每一行与agg_stats中具有相同来源和目标主机对的行联接起来。对于每个源主机和目标主机对,它会使用相应主机对存储桶的总发送字节数和汇总统计信息来计算 z 得分。
stage hourly_stats {
$source_host = principal.hostname
$dst_host = target.hostname
principal.hostname != ""
target.hostname != ""
match:
$source_host, $dst_host by hour
outcome:
$total_bytes_sent = sum(network.sent_bytes)
}
stage agg_stats {
$source_host = $hourly_stats.source_host
$dst_host = $hourly_stats.dst_host
match:
$source_host, $dst_host
outcome:
$avg_bytes_sent = avg($hourly_stats.total_bytes_sent)
$stddev_bytes_sent = stddev($hourly_stats.total_bytes_sent)
}
$source_host = $agg_stats.source_host
$source_host = $hourly_stats.source_host
$dst_host = $agg_stats.dst_host
$dst_host = $hourly_stats.dst_host
outcome:
$hour_bucket = timestamp.get_timestamp($hourly_stats.window_start)
$z_score = ($hourly_stats.total_bytes_sent - $agg_stats.avg_bytes_sent)/$agg_stats.stddev_bytes_sent
多阶段查询中的交叉联接
使用 Google SecOps 搜索或信息中心时,多阶段查询中的交叉联接可让您将各个 UDM 事件数据与在其他 YARA-L 阶段计算的汇总统计信息进行比较。
在 YARA-L 中,cross join 关键字适用于限制为 1 的阶段。这只会返回一行。
如果在限制为 1 的阶段与另一个数据集(例如 UDM 事件)之间使用交叉联接,则该阶段的单行输出会附加到另一个数据集的每一行。这会使用总体统计信息来丰富事件数据。
示例:查找异常登录活动
以下示例用于识别登录频率高于正常水平的用户。它会通过比较每个用户的登录次数(使用 user_login_counts 阶段)与所有用户的平均登录次数(使用 total_users 阶段)来计算此值。在搜索结果中,登录次数异常多的用户可以按登录次数排序。
然后,您可以使用交叉联接关键字将 total_users 阶段的结果与 user_login_counts 阶段的结果相关联。
stage user_login_counts {
$user = principal.user.userid
metadata.event_type = "USER_LOGIN"
security_result.action = "ALLOW"
match:
$user
outcome:
$login_count = count(metadata.id)
}
stage total_users {
outcome:
$count = count($user_login_counts.user)
limit:
1
}
cross join $total_users, $user_login_counts
$login_count = $user_login_counts.login_count
$user = $user_login_counts.user
$tot_users = $total_users.count
// all users who logged in the same number of times are grouped together.
match:
$login_count
outcome:
$num_users = count($user)
$frequency_percent = (count($user) / max($tot_users) ) * 100
已知问题
在实现多阶段查询时,建议您查看以下限制和推荐的解决方法:
所有多阶段查询的行为都类似于统计信息搜索查询(输出包含的是汇总统计信息,而不是未汇总的事件或数据表行)。
由于 UDM 和实体事件数据集的大小,以 UDM 和实体事件为一侧的联接可能会出现性能低下的情况。我们强烈建议尽可能多地过滤联接的 UDM 和实体事件(例如,按事件类型过滤)。
如需有关推荐做法的一般指导,请参阅 Yara-L 最佳实践;如需了解有关联接的具体信息,请参阅最佳实践。
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。