将 Apache Kafka 与 Google SecOps 集成
本文档介绍了如何将 Apache Kafka 与 Google Security Operations (Google SecOps) 集成。
使用场景
Apache Kafka 集成可满足以下使用场景:
实时安全日志注入:自动将 Kafka 主题中的安全事件注入到 Google SecOps 中并进行处理。这样一来,您就可以集中管理日志并进行实时分析,从而根据流式数据生成提醒。
事件驱动的自动化:根据从 Kafka 主题流式传输的特定安全事件或消息,在 Google SecOps 中触发自动化剧本。这有助于更快地响应关键事件,例如用户从非常用地点登录。
威胁情报丰富化:从 Kafka 主题拉取自定义威胁情报信息流,以丰富现有提醒和案例。这可为分析师提供有关入侵指标 (IOC) 的最新背景信息,并提高威胁分析的准确性。
准备工作
在 Google SecOps 中配置 Apache Kafka 集成之前,请完成以下前提条件:
- Apache Kafka 服务器:确保您有权访问正在运行的 Apache Kafka 服务器,并且已配置必要的 Kafka Broker 和主题。
远程代理 Docker 映像:创建远程代理时,您必须使用基于 Debian 的映像。请使用以下图片确保兼容性:
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
集成参数
Apache Kafka 集成需要以下参数:
| 参数 | 说明 |
|---|---|
Kafka brokers |
必填。 要连接的 Kafka Broker 的英文逗号分隔列表,格式为 |
Use TLS for connection |
可选。 如果选择此项,集成功能将使用 TLS 加密进行身份验证。 此参数需要证书授权机构 (CA) 证书。 默认情况下未启用。 |
Use SASL PLAIN with TLS for connection |
可选。 如果选择此项,集成功能将使用 SASL PLAIN 用户名和密码机制进行身份验证。 此选项仅支持 TLS 加密,并且需要 SASL 用户名和密码以及 CA 证书。 默认情况下未启用。 |
CA certificate of Kafka server |
可选。 用于验证 Kafka 服务器身份的 CA 证书。 如果启用了 SASL,则此参数是必需的。 |
Client certificate |
可选。 用于与 Kafka 服务器进行双向 TLS 身份验证的客户端证书。 如果启用了双向 TLS (mTLS),则此参数是必需的。 |
Client certificate key |
可选。 与客户端证书对应的私钥,用于双向 TLS 身份验证。 如果启用了双向 TLS (mTLS),则此参数是必需的。 |
Client certificate key password |
可选。 用于解密客户端证书私钥的密码。 如果启用了双向 TLS (mTLS),则此参数是必需的。 |
SASL PLAIN Username |
可选。 用于与 Kafka 代理进行 SASL PLAIN 身份验证的用户名。 如果启用了 SASL,则此参数是必需的。 |
SASL PLAIN Password |
可选。 用于通过 SASL PLAIN 身份验证与 Kafka 代理进行通信的密码。 如果启用了 SASL,则此参数是必需的。 |
如需了解如何在 Google SecOps 中配置集成,请参阅配置集成。
如有需要,您可以在稍后阶段进行更改。配置集成实例后,您可以在 playbook 中使用该实例。如需详细了解如何配置和支持多个实例,请参阅支持多个实例。
操作
如需详细了解操作,请参阅 在工作台页面中处理待处理的操作和执行人工处置措施。
Ping
使用 Ping 操作测试与 Apache Kafka 的连接。
此操作不适用于 Google SecOps 实体。
操作输入
无。
操作输出
Ping 操作提供以下输出:
| 操作输出类型 | 可用性 |
|---|---|
| 案例墙附件 | 不可用 |
| 案例墙链接 | 不可用 |
| “支持请求墙”表格 | 不可用 |
| 丰富化表 | 不可用 |
| JSON 结果 | 不可用 |
| 输出消息 | 可用 |
| 脚本结果。 | 可用 |
输出消息
Ping 操作可以返回以下输出消息:
| 输出消息 | 消息说明 |
|---|---|
|
操作成功。 |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
操作失败。 检查与服务器的连接、输入参数或凭据。 |
脚本结果
下表列出了使用 Ping 操作时脚本结果输出的值:
| 脚本结果名称 | 值 |
|---|---|
is_success |
True 或 False |
连接器
如需详细了解如何在 Google SecOps 中配置连接器,请参阅注入数据(连接器)。
Apache Kafka - Messages 连接器
使用 Apache Kafka - 消息连接器从 Apache Kafka 中检索消息。
连接器从指定的 Kafka 主题检索消息,并可根据消息格式以不同方式处理这些消息。如果消息是有效的 JSON 对象,连接器会提取特定字段以用于创建和映射提醒。如果消息是纯字符串,则会作为原始事件数据被提取。
连接器会根据您提供的参数处理严重程度映射、提醒名称模板化和唯一 ID 生成。
JSON 严重程度映射
如需映射提醒严重程度,您需要指定 Apache Kafka - Messages Connector 使用哪个字段来获取 Severity Mapping JSON 参数中的严重程度值。连接器响应可以包含 integer、float 和 string 等值类型。
Apache Kafka - Messages 连接器会读取 integer 和 float 值,并根据 Google SecOps 设置对其进行映射。下表显示了 integer 值与 Google SecOps 中的严重程度之间的映射关系:
| 整数值 | 映射的严重程度 |
|---|---|
100 |
Critical |
从 80 到 100 |
High |
从 60 到 80 |
Medium |
从 40 到 60 |
Low |
不到 40 |
Informational |
如果响应包含 string 值,则 Pub/Sub - 消息连接器需要进行额外配置。
最初,默认值显示如下:
{
"Default": 60
}
如果映射所需的值位于 event_severity JSON 键中,则这些值可以如下所示:
"Malicious""Benign""Unknown"
如需解析 event_severity JSON 键值并确保 JSON 对象格式正确,请按如下方式配置 Severity Mapping JSON 参数:
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
必须提供 "Default" 值。
如果同一 JSON 对象有多个匹配项,Apache Kafka - 消息连接器会优先处理第一个 JSON 对象键。
如需处理包含 integer 或 float 值的字段,请在 Severity Mapping JSON 参数中配置键和空字符串:
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
连接器输入
Apache Kafka - Messages 连接器需要以下参数:
| 参数 | 说明 |
|---|---|
Product Field Name |
必填。 存储商品名称的字段的名称。 商品名称主要会影响映射。为了简化和改进连接器的映射流程,默认值会解析为从代码引用的回退值。默认情况下,此参数的任何无效输入都会解析为回退值。 默认值为 |
Event Field Name |
必填。 用于确定事件名称(子类型)的字段的名称。 默认值为 |
Environment Field Name |
可选。 存储环境名称的字段的名称。 如果缺少环境字段,连接器会使用默认值。 默认值为 |
Environment Regex Pattern |
可选。 要对在 使用默认值 如果正则表达式模式为 null 或空,或者环境值为 null,则最终环境结果为默认环境。 |
Script Timeout (Seconds) |
必填。 运行当前脚本的 Python 进程的超时时间限制(以秒为单位)。 默认值为 |
Kafka brokers |
必填。 要连接的 Kafka Broker 的英文逗号分隔列表,格式为 |
Use TLS for connection |
可选。 如果选择此项,集成功能将使用 TLS 加密进行身份验证。 此参数需要 CA 证书。 默认情况下未启用。 |
Use SASL PLAIN with TLS for connection |
可选。 如果选择此项,集成功能将使用 SASL PLAIN 用户名和密码机制进行身份验证。 此选项需要提供 SASL 用户名和密码。仅支持 TLS 加密,而 TLS 加密需要 CA 证书。 默认情况下未启用。 |
CA certificate of Kafka server |
可选。 用于验证 Kafka 服务器身份的 CA 证书。 |
Client certificate |
可选。 用于与 Kafka 服务器进行双向 TLS 身份验证的客户端证书。 |
Client certificate key |
可选。 与客户端证书对应的私钥,用于双向 TLS 身份验证。 |
Client certificate key password |
可选。 用于解密客户端证书私钥的密码。 |
SASL PLAIN Username |
可选。 用于与 Kafka 代理进行 SASL PLAIN 身份验证的用户名。 |
SASL PLAIN Password |
可选。 用于通过 SASL PLAIN 身份验证与 Kafka 代理进行通信的密码。 |
Topic |
必填。 从中检索事件的 Kafka 主题。 |
Consumer Group ID |
可选。 检索事件时使用的消费群组的标识符。 如果未提供任何值,系统会生成一个唯一 ID。 |
Partitions |
可选。 要从中提取消息的分区的 CSV 列表。 |
Initial Offset |
可选。 连接器开始从 Kafka 分区提取消息的位置。 您可以指定一个正整数,从特定偏移量开始,也可以使用值 |
Poll Timeout |
可选。 从 Kafka 使用消息的轮询超时时间(以秒为单位)。 |
Case Name Template |
可选。 用于定义自定义事件名称的模板。连接器会向事件添加 您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。 示例: |
Alert Name Template |
必填。 用于定义提醒名称的模板。 您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。 示例: 如果未提供值或模板无效,连接器会使用默认的提醒名称。 |
Rule Generator Template |
必填。 用于定义规则生成器的模板。 您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。 示例: 如果未提供值或模板无效,连接器会使用默认的规则生成器名称。 |
Timestamp Field |
必填。 包含 Google SecOps 提醒时间戳的 Kafka 消息中的字段名称。 如果时间戳不是 Unix 时间戳格式,则必须在 |
Timestamp Format |
可选。 消息时间戳的格式,对于非 Unix 纪元时间戳是必需的。使用标准 Python 如果时间戳不是 Unix 纪元格式,并且未配置此参数,则连接器会失败。 |
Severity Mapping JSON |
必填。 连接器使用的 JSON 对象,用于从消息中提取严重程度级别并将其映射到 Google SecOps 优先级。 默认值为 |
Unique ID Field |
可选。 用作唯一消息标识符的字段的名称。 如果未提供值,连接器会生成消息内容的 SHA-256 哈希值并将其用作消息标识符。 |
Max Messages To Fetch |
必填。 连接器在每次迭代中处理的消息数量上限。 默认值为 |
Disable Overflow |
可选。 如果选中此选项,连接器会忽略 Google SecOps 溢出机制。 默认处于启用状态。 |
Verify SSL |
必填。 如果选择此项,集成会在连接到 Apache Kafka 服务器时验证 SSL 证书。 默认处于启用状态。 |
Proxy Server Address |
可选。 要使用的代理服务器的地址。 |
Proxy Username |
可选。 用于进行身份验证的代理用户名。 |
Proxy Password |
可选。 用于进行身份验证的代理密码。 |
连接器规则
连接器支持代理。
连接器提醒
下表介绍了 Apache Kafka 消息字段与 Google SecOps 提醒字段的映射关系:
| Siemplify 提醒字段 | Apache Kafka 消息字段 |
|---|---|
SourceSystemName |
由框架填充。 |
TicketId |
唯一 ID 字段的值或消息的 SHA-256 哈希。 |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
由 Alert Name Template 生成的值。 |
Reason |
不适用 |
Description |
不适用 |
DeviceVendor |
硬编码:Apache Kafka |
DeviceProduct |
后备值:Message |
Priority |
从 Severity Mapping JSON 参数映射而来。 |
RuleGenerator |
由 Rule Generator Template 生成的值。 |
SourceGroupingIdentifier |
不适用 |
StartTime |
从 Timestamp Field 转换而来。 |
EndTime |
从 Timestamp Field 转换而来。 |
Siemplify Alert - Extensions |
不适用 |
Siemplify Alert - Attachments |
不适用 |
连接器事件
连接器事件的示例如下:
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。