将 Apache Kafka 与 Google SecOps 集成

本文档介绍了如何将 Apache Kafka 与 Google Security Operations (Google SecOps) 集成。

使用场景

Apache Kafka 集成可满足以下使用场景:

  • 实时安全日志注入:自动将 Kafka 主题中的安全事件注入到 Google SecOps 中并进行处理。这样一来,您就可以集中管理日志并进行实时分析,从而根据流式数据生成提醒。

  • 事件驱动的自动化:根据从 Kafka 主题流式传输的特定安全事件或消息,在 Google SecOps 中触发自动化剧本。这有助于更快地响应关键事件,例如用户从非常用地点登录。

  • 威胁情报丰富化:从 Kafka 主题拉取自定义威胁情报信息流,以丰富现有提醒和案例。这可为分析师提供有关入侵指标 (IOC) 的最新背景信息,并提高威胁分析的准确性。

准备工作

在 Google SecOps 中配置 Apache Kafka 集成之前,请完成以下前提条件:

  • Apache Kafka 服务器:确保您有权访问正在运行的 Apache Kafka 服务器,并且已配置必要的 Kafka Broker 和主题。
  • 远程代理 Docker 映像:创建远程代理时,您必须使用基于 Debian 的映像。请使用以下图片确保兼容性:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

集成参数

Apache Kafka 集成需要以下参数:

参数 说明
Kafka brokers

必填。

要连接的 Kafka Broker 的英文逗号分隔列表,格式为 hostname:port

Use TLS for connection

可选。

如果选择此项,集成功能将使用 TLS 加密进行身份验证。

此参数需要证书授权机构 (CA) 证书。

默认情况下未启用。

Use SASL PLAIN with TLS for connection

可选。

如果选择此项,集成功能将使用 SASL PLAIN 用户名和密码机制进行身份验证。

此选项仅支持 TLS 加密,并且需要 SASL 用户名和密码以及 CA 证书。

默认情况下未启用。

CA certificate of Kafka server

可选。

用于验证 Kafka 服务器身份的 CA 证书。

如果启用了 SASL,则此参数是必需的。

Client certificate

可选。

用于与 Kafka 服务器进行双向 TLS 身份验证的客户端证书。

如果启用了双向 TLS (mTLS),则此参数是必需的。

Client certificate key

可选。

与客户端证书对应的私钥,用于双向 TLS 身份验证。

如果启用了双向 TLS (mTLS),则此参数是必需的。

Client certificate key password

可选。

用于解密客户端证书私钥的密码。

如果启用了双向 TLS (mTLS),则此参数是必需的。

SASL PLAIN Username

可选。

用于与 Kafka 代理进行 SASL PLAIN 身份验证的用户名。

如果启用了 SASL,则此参数是必需的。

SASL PLAIN Password

可选。

用于通过 SASL PLAIN 身份验证与 Kafka 代理进行通信的密码。

如果启用了 SASL,则此参数是必需的。

如需了解如何在 Google SecOps 中配置集成,请参阅配置集成

如有需要,您可以在稍后阶段进行更改。配置集成实例后,您可以在 playbook 中使用该实例。如需详细了解如何配置和支持多个实例,请参阅支持多个实例

操作

如需详细了解操作,请参阅 在工作台页面中处理待处理的操作执行人工处置措施

Ping

使用 Ping 操作测试与 Apache Kafka 的连接。

此操作不适用于 Google SecOps 实体。

操作输入

无。

操作输出

Ping 操作提供以下输出:

操作输出类型 可用性
案例墙附件 不可用
案例墙链接 不可用
“支持请求墙”表格 不可用
丰富化表 不可用
JSON 结果 不可用
输出消息 可用
脚本结果。 可用
输出消息

Ping 操作可以返回以下输出消息:

输出消息 消息说明

Successfully connected to the Apache Kafka server with the provided connection parameters!

操作成功。
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

操作失败。

检查与服务器的连接、输入参数或凭据。

脚本结果

下表列出了使用 Ping 操作时脚本结果输出的值:

脚本结果名称
is_success TrueFalse

连接器

如需详细了解如何在 Google SecOps 中配置连接器,请参阅注入数据(连接器)

Apache Kafka - Messages 连接器

使用 Apache Kafka - 消息连接器从 Apache Kafka 中检索消息。

连接器从指定的 Kafka 主题检索消息,并可根据消息格式以不同方式处理这些消息。如果消息是有效的 JSON 对象,连接器会提取特定字段以用于创建和映射提醒。如果消息是纯字符串,则会作为原始事件数据被提取。

连接器会根据您提供的参数处理严重程度映射、提醒名称模板化和唯一 ID 生成。

JSON 严重程度映射

如需映射提醒严重程度,您需要指定 Apache Kafka - Messages Connector 使用哪个字段来获取 Severity Mapping JSON 参数中的严重程度值。连接器响应可以包含 integerfloatstring 等值类型。

Apache Kafka - Messages 连接器会读取 integerfloat 值,并根据 Google SecOps 设置对其进行映射。下表显示了 integer 值与 Google SecOps 中的严重程度之间的映射关系:

整数值 映射的严重程度
100 Critical
80100 High
6080 Medium
4060 Low
不到 40 Informational

如果响应包含 string 值,则 Pub/Sub - 消息连接器需要进行额外配置。

最初,默认值显示如下:

{
    "Default": 60
}

如果映射所需的值位于 event_severity JSON 键中,则这些值可以如下所示:

  • "Malicious"
  • "Benign"
  • "Unknown"

如需解析 event_severity JSON 键值并确保 JSON 对象格式正确,请按如下方式配置 Severity Mapping JSON 参数:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

必须提供 "Default" 值。

如果同一 JSON 对象有多个匹配项,Apache Kafka - 消息连接器会优先处理第一个 JSON 对象键。

如需处理包含 integerfloat 值的字段,请在 Severity Mapping JSON 参数中配置键和空字符串:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

连接器输入

Apache Kafka - Messages 连接器需要以下参数:

参数 说明
Product Field Name

必填。

存储商品名称的字段的名称。

商品名称主要会影响映射。为了简化和改进连接器的映射流程,默认值会解析为从代码引用的回退值。默认情况下,此参数的任何无效输入都会解析为回退值。

默认值为 Product Name

Event Field Name

必填。

用于确定事件名称(子类型)的字段的名称。

默认值为 event_type

Environment Field Name

可选。

存储环境名称的字段的名称。

如果缺少环境字段,连接器会使用默认值。

默认值为 ""

Environment Regex Pattern

可选。

要对在 Environment Field Name 字段中找到的值运行的正则表达式模式。此参数可让您使用正则表达式逻辑来操纵环境字段。

使用默认值 .* 可检索所需的原始 Environment Field Name 值。

如果正则表达式模式为 null 或空,或者环境值为 null,则最终环境结果为默认环境。

Script Timeout (Seconds)

必填。

运行当前脚本的 Python 进程的超时时间限制(以秒为单位)。

默认值为 180

Kafka brokers

必填。

要连接的 Kafka Broker 的英文逗号分隔列表,格式为 hostname:port

Use TLS for connection

可选。

如果选择此项,集成功能将使用 TLS 加密进行身份验证。

此参数需要 CA 证书。

默认情况下未启用。

Use SASL PLAIN with TLS for connection

可选。

如果选择此项,集成功能将使用 SASL PLAIN 用户名和密码机制进行身份验证。

此选项需要提供 SASL 用户名和密码。仅支持 TLS 加密,而 TLS 加密需要 CA 证书。

默认情况下未启用。

CA certificate of Kafka server

可选。

用于验证 Kafka 服务器身份的 CA 证书。

Client certificate

可选。

用于与 Kafka 服务器进行双向 TLS 身份验证的客户端证书。

Client certificate key

可选。

与客户端证书对应的私钥,用于双向 TLS 身份验证。

Client certificate key password

可选。

用于解密客户端证书私钥的密码。

SASL PLAIN Username

可选。

用于与 Kafka 代理进行 SASL PLAIN 身份验证的用户名。

SASL PLAIN Password

可选。

用于通过 SASL PLAIN 身份验证与 Kafka 代理进行通信的密码。

Topic

必填。

从中检索事件的 Kafka 主题。

Consumer Group ID

可选。

检索事件时使用的消费群组的标识符。

如果未提供任何值,系统会生成一个唯一 ID。

Partitions

可选。

要从中提取消息的分区的 CSV 列表。

Initial Offset

可选。

连接器开始从 Kafka 分区提取消息的位置。

您可以指定一个正整数,从特定偏移量开始,也可以使用值 earliestlatest 从分区开头或结尾开始提取。

Poll Timeout

可选。

从 Kafka 使用消息的轮询超时时间(以秒为单位)。

Case Name Template

可选。

用于定义自定义事件名称的模板。连接器会向事件添加 custom_case_name 键。

您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。

示例:Phishing - EVENT_MAILBOX

Alert Name Template

必填。

用于定义提醒名称的模板。

您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。

示例:Phishing - EVENT_MAILBOX

如果未提供值或模板无效,连接器会使用默认的提醒名称。

Rule Generator Template

必填。

用于定义规则生成器的模板。

您可以使用格式为 FIELD_NAME 的占位符,这些占位符会根据第一个事件的字符串值进行填充。

示例:Phishing - EVENT_MAILBOX

如果未提供值或模板无效,连接器会使用默认的规则生成器名称。

Timestamp Field

必填。

包含 Google SecOps 提醒时间戳的 Kafka 消息中的字段名称。

如果时间戳不是 Unix 时间戳格式,则必须在 Timestamp Format 参数中定义其格式。

Timestamp Format

可选。

消息时间戳的格式,对于非 Unix 纪元时间戳是必需的。使用标准 Python strftime 格式代码。

如果时间戳不是 Unix 纪元格式,并且未配置此参数,则连接器会失败。

Severity Mapping JSON

必填。

连接器使用的 JSON 对象,用于从消息中提取严重程度级别并将其映射到 Google SecOps 优先级。

默认值为 {"Default": "60"}

Unique ID Field

可选。

用作唯一消息标识符的字段的名称。

如果未提供值,连接器会生成消息内容的 SHA-256 哈希值并将其用作消息标识符。

Max Messages To Fetch

必填。

连接器在每次迭代中处理的消息数量上限。

默认值为 100

Disable Overflow

可选。

如果选中此选项,连接器会忽略 Google SecOps 溢出机制。

默认处于启用状态。

Verify SSL

必填。

如果选择此项,集成会在连接到 Apache Kafka 服务器时验证 SSL 证书。

默认处于启用状态。

Proxy Server Address

可选。

要使用的代理服务器的地址。

Proxy Username

可选。

用于进行身份验证的代理用户名。

Proxy Password

可选。

用于进行身份验证的代理密码。

连接器规则

连接器支持代理。

连接器提醒

下表介绍了 Apache Kafka 消息字段与 Google SecOps 提醒字段的映射关系:

Siemplify 提醒字段 Apache Kafka 消息字段
SourceSystemName 由框架填充。
TicketId 唯一 ID 字段的值或消息的 SHA-256 哈希。
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Alert Name Template 生成的值。
Reason 不适用
Description 不适用
DeviceVendor 硬编码:Apache Kafka
DeviceProduct 后备值:Message
Priority Severity Mapping JSON 参数映射而来。
RuleGenerator Rule Generator Template 生成的值。
SourceGroupingIdentifier 不适用
StartTime Timestamp Field 转换而来。
EndTime Timestamp Field 转换而来。
Siemplify Alert - Extensions 不适用
Siemplify Alert - Attachments 不适用

连接器事件

连接器事件的示例如下:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。