设置和管理数据处理流水线

支持的平台:

借助数据处理功能,您可以对 Google Security Operations 数据注入进行强大的预解析控制。您可以过滤事件、转换字段或编辑敏感值,以优化数据兼容性、降低费用并保护 Google SecOps 中的敏感信息。

本文档将引导您使用 Bindplane 控制台完成完整的数据注入和处理工作流。因此,您可以了解如何:

  • 配置与 Google SecOps 目标实例的连接。
  • 创建新的 Google SecOps 流水线。
  • 设置数据处理,包括数据流和处理器。
  • 推出流水线以启动数据注入和处理。
  • 在 Google SecOps 控制台中监控流水线数据流和处理器。

您可以使用 Bindplane 管理控制台或直接使用公共 Google SecOps Data Pipeline API 来配置本地数据流和云数据流的数据处理。

数据处理包含以下要素:

  • 数据流:一个或多个数据流将数据馈送到数据处理流水线中。每个数据流都针对特定数据流类型进行了配置。
  • 处理器节点:数据处理有一个包含一个或多个处理器的处理器节点。每个处理器都会指定在数据流经流水线时要对数据执行的操作(例如过滤、转换和编辑)。
  • 目标:Google SecOps 目标实例是处理后的数据发送到的位置。

使用场景

示例使用场景包括:

  • 从原始日志中移除空键值对。
  • 隐去敏感数据。
  • 从原始日志内容添加注入标签。
  • 在多实例环境中,将提取标签应用于直接提取的日志数据,以识别源数据流实例(例如Google Cloud Workspace)。
  • 按字段值过滤 Palo Alto Cortex 数据。
  • 按类别减少 SentinelOne 数据。
  • 从 Feed 和直接提取日志中提取主机信息,并将其映射到 Cloud Monitoring 的 ingestion_source 字段。

前提条件

如果您打算使用 Bindplane 控制台来管理 Google SecOps 数据处理,请执行以下步骤:

  1. 在 Google Security Operations 控制台中,向安装程序授予所需的预定义管理员角色。 有关详情,请参阅在专用项目中分配 Project IAM Admin 角色
  2. 分配角色下,选择以下预定义的 Identity and Access Management 角色:

    • Chronicle API Admin (roles/chronicle.admin)
  3. 安装 Bindplane 服务器控制台。对于 SaaS 或本地,请参阅安装 Bindplane Server 控制台

  4. 在 Bindplane 控制台中,将 Google SecOps 目标实例连接到您的 Bindplane 项目。如需了解详情,请参阅连接到 Google SecOps 实例

管理低容量 SecOps 数据确认延迟

自行配置代理的 Ingestion API 用户可能会发现,在数据处理流水线中,低流量 SecOps 流水线的确认时间可能会增加。

平均延迟时间可能会从 700 毫秒上升到 2 秒。根据需要增加超时时间和内存。当数据吞吐量超过 4 MB 时,确认时间会缩短。

连接到 Google SecOps 实例

在开始之前,请确认您拥有 Bindplane 项目管理员权限,可以访问项目集成页面。

Google SecOps 实例用作数据输出的目标位置。

如需使用 Bindplane 控制台连接到 Google SecOps 实例,请执行以下操作:

  1. Bindplane 控制台中,前往管理项目页面。
  2. 前往集成卡片,然后点击关联到 Google SecOps 以打开修改集成窗口。
  3. 输入 Google SecOps 目标实例的详细信息。
    此实例会接收处理后的数据(数据处理的输出),如下所示:

    字段 说明
    区域 Google SecOps 实例的区域。
    如需在 Google Cloud 控制台中查找实例,请依次前往安全性 > 检测和控制 > Google Security Operations > 实例详情
    客户 ID 您的 Google SecOps 实例的客户 ID。

    在 Google SecOps 控制台中,依次前往 SIEM 设置 > 个人资料 > 组织详细信息
    Google Cloud 项目编号 Google Cloud 您的 Google SecOps 实例的项目编号。

    如需在 Google SecOps 控制台中查找项目编号,请依次前往 SIEM 设置 > 个人资料 > 组织详细信息
    凭据 服务账号凭据是用于验证身份并访问 Google SecOps Data Pipeline API 所需的 JSON 值。从 Google 服务账号凭据文件中获取此 JSON 值。

    服务账号必须位于与 Google SecOps 实例相同的 Google Cloud 项目中,并且需要 Chronicle API 管理员角色 (roles/chronicle.admin) 权限。

    如需了解如何创建服务账号并下载 JSON 文件,请参阅创建和删除服务账号密钥
  4. 点击连接。如果您的连接详细信息正确无误,并且您已成功连接到 Google SecOps,则可以预期会发生以下情况:

    • 系统会打开与 Google SecOps 实例的连接。
    • 首次连接时,SecOps 流水线会显示在 Bindplane 控制台中。
    • Bindplane 控制台会显示您之前使用 API 为此实例设置的所有已处理数据。系统会将您使用该 API 配置的部分处理器转换为 Bindplane 处理器,并以原始 OpenTelemetry 转换语言 (OTTL) 格式显示其他处理器。您可以使用 Bindplane 控制台修改之前使用 API 设置的流水线和处理器。
  5. 成功创建与 Google SecOps 实例的连接后,您可以创建 SecOps 流水线,并使用 Bindplane 控制台设置数据处理

使用 Bindplane 控制台设置数据处理

您可以使用 Bindplane 控制台管理 Google SecOps 处理的数据,包括使用 API 设置的流水线。

准备工作

在开始之前,我们建议您先阅读以下重要建议:

  • 调用 Backstory API 的基于推送的流已被弃用,不再支持数据处理。将集成迁移为使用 Chronicle Ingestion API。
  • 如需首次安装 Bindplane 控制台或将 Google SecOps 目标实例连接到您的 Bindplane 项目,请参阅前提条件
  • 除了使用 Bindplane 控制台之外,您还可以直接调用 Google SecOps API 来设置和管理数据处理。如需了解详情,请参阅使用 Google SecOps Data Pipeline API
  • 从转发器和 Bindplane 提取的数据会标记一个与直接提取的数据流不同的 collectorID。为了支持完整的日志可见性,您必须在查询数据源时选择所有提取方法,或者在与 API 互动时明确引用相关的 collectorID

请按照以下步骤在 Google SecOps 中预配和部署新的日志处理流水线,通常使用 Bindplane 控制台:

  1. 创建新的 SecOps 流水线
  2. 配置数据处理
    1. 配置直播
    2. 配置处理器
  3. 推出数据处理流水线

创建新的 Google SecOps 流水线

Google SecOps 流水线是一个容器,您可以在其中配置一个数据处理容器。如需创建新的 Google SecOps 流水线容器,请执行以下操作:

  1. Bindplane 控制台中,点击 SecOps 流水线标签页以打开 SecOps 流水线页面。
  2. 点击创建 SecOps 流水线
  3. 创建新的 SecOps 流水线窗口中,将 SecOps 流水线类型设置为 Google SecOps(默认)。
  4. 输入 SecOps 流水线名称说明

  5. 点击创建。您可以在 SecOps 流水线页面上看到新的流水线容器。

  6. 配置数据处理容器中的数据流和处理器。

配置数据处理容器

数据处理容器用于指定要注入的数据和用于在数据流向 Google SecOps 目标实例时操控数据的处理器(例如,过滤、转换或遮盖)。

流水线配置卡片是数据处理流水线的可视化表示形式,您可以在其中配置数据处理器节点

  • 会根据其配置的规范提取数据,并将其馈送到容器中。一个数据处理容器可以包含一个或多个流,每个流都配置为不同的流。
  • 处理方节点由处理器组成,这些处理器会在数据流向 Google SecOps 目标实例时对数据进行处理。

如需配置数据处理容器,请执行以下操作:

  1. 创建新的 SecOps 流水线
  2. Bindplane 控制台中,点击 SecOps 流水线标签页以打开 SecOps 流水线页面。
  3. 选择要配置新数据处理容器的 SecOps 流水线。
  4. 流水线配置卡片上:

    1. 添加数据流
    2. 配置处理器节点。如需使用 Bindplane 控制台添加处理器,请参阅配置处理器了解详情。
  5. 完成这些配置后,请参阅推出数据处理以开始处理数据。

添加数据流

如需添加直播,请执行以下操作:

  1. 流水线配置卡片中,点击 添加 图标 添加信息流,打开创建信息流窗口。
  2. 创建 SecOps 数据流窗口中,输入以下字段的详细信息:

    字段 说明
    日志类型 选择要注入的数据的日志类型。例如 CrowdStrike Falcon (CS_EDR)
    注意 警告 图标表示相应日志类型已在另一个数据流(在此流水线中或 Google SecOps 实例中的另一个流水线中)中配置。

    如需使用不可用的日志类型,您必须先从其他数据流配置中将其删除。

    如需了解如何查找配置了日志类型的流配置,请参阅过滤 SecOps 流水线配置
    注入方法 选择用于注入所选日志类型的数据的注入方法。这些提取方法之前已为您的 Google SecOps 实例定义。

    您必须选择以下选项之一:

    • 所有注入方法:包含所选日志类型的所有注入方法。选择此选项后,您将无法为同一日志类型添加使用特定提取方法的后续数据流。例外情况:您可以在其他数据流中为此日志类型选择其他未配置的特定提取方法。
    • 特定提取方法,例如 Cloud Native IngestionFeedIngestion APIWorkspace
    Feed 如果您选择 Feed 作为注入方法,系统随后会显示一个字段,其中包含所选日志类型的可用 Feed 名称(在 Google SecOps 实例中预配置)。您必须选择相关 Feed 才能完成配置。如需查看和管理可用的 Feed,请依次前往 SIEM 设置 >“Feed”表格

  3. 点击添加数据流以保存新数据流。新数据流会立即显示在流水线配置卡片上。该数据流会自动连接到处理器节点和 Google SecOps 目标位置

过滤 SecOps 流水线配置

借助 SecOps 流水线页面上的搜索栏,您可以根据多个配置元素过滤和查找 SecOps 流水线(数据处理容器)。您可以搜索特定条件(例如日志类型、提取方法或 Feed 名称)来过滤流水线。

使用以下语法进行过滤:

  • logtype:value
  • ingestionmethod:value
  • feed:value

例如,如需识别包含特定日志类型的流配置,请在搜索栏中输入 logtype:,然后从结果列表中选择相应日志类型。

配置处理器

数据处理容器具有一个处理器节点,该节点包含一个或多个处理器。每个处理器会按顺序处理流数据:

  1. 第一个处理器处理原始流数据。
  2. 第一个处理器的输出结果会立即成为序列中下一个处理器的输入。
  3. 此序列会按处理器窗格中显示的顺序,继续处理所有后续处理器,其中一个处理器的输出会成为下一个处理器的输入。

下表列出了处理器:

处理器类型 能力
过滤 按条件过滤
过滤 按 HTTP 状态过滤
过滤 按指标名称过滤
过滤 按正则表达式过滤
过滤 按严重程度过滤
隐去 隐去敏感数据
转换 添加字段
转换 合并
转换 Concat
转换 复制字段
转换 删除字段
转换 Marshal
转换 移动字段
转换 解析 CSV
转换 解析 JSON
转换 解析键值对
转换 解析严重程度字段
转换 解析时间戳
转换 使用正则表达式进行解析
转换 解析 XML
转换 重命名字段
转换 重写时间戳
转换 拆分
转换 转换
  1. 通过添加、移除或更改一个或多个处理器的顺序来配置处理器节点。

添加处理器

如需添加处理器,请按以下步骤操作:

  1. 流水线配置卡片中,点击处理器节点以打开修改处理器窗口。 修改处理器窗口分为以下窗格,按数据流排列:

    • 输入(或源数据):最近传入的流日志数据(处理前)
    • 配置(或处理器列表):处理器及其配置
    • 输出(或结果):最近的传出结果日志数据(处理后)

    如果流水线之前已推出,系统会在窗格中显示最近的传入日志数据(处理前)和最近的传出日志数据(处理后)。

  2. 点击添加处理器以显示处理器列表。 为方便起见,处理器列表按处理器类型分组。如需整理列表并添加您自己的处理器组合,请选择一个或多个处理器,然后点击添加新的处理器组合

  3. 在处理器列表中,选择要添加的处理器

  4. 根据需要配置处理器。

  5. 点击保存,以将处理器配置保存在处理器节点中。

系统会立即处理来自输入窗格的传入流数据的新鲜样本,从而测试新配置,并在输出窗格中显示生成的传出数据。

推出数据处理流水线

完成数据流和处理器配置后,您必须推出流水线才能开始处理数据,具体操作如下:

  1. 点击开始发布。这会立即激活数据处理,并让 Google 的安全基础架构开始根据您的配置处理数据。

  2. 如果发布成功,数据处理容器的版本号会递增,并显示在容器的名称旁边。

在 Google SecOps 控制台中查看数据处理详情

以下部分介绍了如何通过 Google SecOps 控制台查看数据处理详情:

查看所有数据处理配置

  1. 在 Google SecOps 控制台中,前往 SIEM 设置 > 数据处理,您可以在其中查看所有已配置的流水线。
  2. 入站数据流水线搜索栏中,搜索您构建的任何流水线。您可以按元素(例如流水线名称组件)进行搜索。搜索结果会显示流水线的处理器及其配置摘要。
  3. 在流水线摘要中,您可以执行以下任一操作:
    • 检查处理器配置。
    • 复制配置详细信息。
    • 点击 Open in Bindplane(在 Bindplane 中打开),即可直接在 Bindplane 控制台中访问和管理流水线。

查看已配置的 Feed

如需查看系统中的已配置 Feed,请执行以下操作:

  1. 在 Google SecOps 控制台中,依次前往 SIEM 设置 > FeedFeed 页面会显示您在系统中配置的所有 Feed。
  2. 将指针悬停在每行上,以显示 ⋮ 更多菜单,您可以在其中查看 Feed 详细信息、修改、停用或删除 Feed。
  3. 点击查看详情以查看详情窗口。
  4. 点击 Open in Bindplane(在 Bindplane 中打开),在 Bindplane 控制台中打开相应 Feed 的数据流配置。

查看数据处理详情(可用的日志类型)

如需在可用日志类型页面上查看数据处理详情(您可以在该页面上查看所有可用的日志类型),请执行以下操作:

  1. 在 Google SecOps 控制台中,前往 SIEM 设置 > 可用日志类型。主页面会显示您的所有日志类型。
  2. 将指针悬停在每个 Feed 行上,以显示 more_vert 更多菜单。您可以通过此菜单查看、修改、停用或删除 Feed 详细信息。
  3. 点击查看数据处理以查看 Feed 的配置。
  4. 点击 Open in Bindplane(在 Bindplane 中打开),在 Bindplane 控制台中打开相应处理器的处理器配置。

使用 Google SecOps 数据流水线方法

Google SecOps 数据流水线方法提供了全面的工具来管理处理后的数据。这些数据流水线方法包括创建、更新、删除和列出流水线,以及将 Feed 和日志类型与流水线相关联。

使用场景

本部分包含与数据流水线方法相关的典型用例示例。

如需使用本部分中的示例,请执行以下操作:

  • 将特定于客户的参数(例如网址和 Feed ID)替换为适合您自己环境的参数。
  • 插入您自己的身份验证。在本部分中,不记名令牌已使用 ****** 进行编辑。

列出特定 Google SecOps 实例中的所有流水线

以下命令会列出特定 Google SecOps 实例中的所有流水线:

curl --location 'https://abc.def.googleapis.com/v123/projects/projectabc-byop/locations/us/instances/aaaa1aa1-111a-11a1-1111-11111a1aa1aa/logProcessingPipelines' \
--header 'Authorization: Bearer ******'

使用一个处理器创建基本流水线

如需创建包含转换处理器的基本流水线并将其与三个来源相关联,请执行以下操作:

  1. 运行以下命令:

    curl --location 'https://abc.def.googleapis.com/v123/projects/projectabc-byop/locations/us/instances/aaaa1aa1-111a-11a1-1111-11111a1aa1aa/logProcessingPipelines' \
    --header 'Content-Type: application/json' \
    --header 'Authorization: Bearer ******' \
    --data '{
        "displayName": "Example Pipeline",
        "description": "My description",
        "processors": [
            {
                "transformProcessor": {
                    "statements": [
                        "set(attributes[\"myKey1\"], \"myVal1\")",
                        "set(attributes[\"myKey2\"], \"myVal2\")"
                    ]
                }
            }
        ]
    }'
    
  2. 从响应中复制 displayName 字段的值。

  3. 运行以下命令,将三个数据流(日志类型、带有收集器 ID 的日志类型和 Feed)与流水线相关联。使用 displayName 字段的值作为 {pipelineName} 值。

    curl --location 'https://abc.def.googleapis.com/v123/{pipelineName}:associateStreams' \
        --header 'Content-Type: application/json' \
    --header 'Authorization: Bearer ******' \
    --data '{
        "streams": [
            {
                "logType": "MICROSOFT_SENTINEL"
            },
            {
                "logType": "A10_LOAD_BALANCER",
                "collectorId": "dddddddd-dddd-dddd-dddd-dddddddddddd"
            },
            {
                "feed": "1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a"
            }
        ]
    }'
    

创建包含三个处理器的流水线

该流水线使用以下处理器:

  • 转换:将日志正文转换为 JSON 并进行解析。
  • 过滤:根据已解析的正文过滤出符合条件的日志。
  • 隐去:隐去与 Bindplane 的预设敏感值匹配的数据。

如需创建流水线并将其与三个来源相关联,请执行以下操作:

  1. 运行以下命令:

    curl --location 'https://abc.def.googleapis.com/v123/projects/projectabc-byop/locations/us/instances/aaaa1aa1-111a-11a1-1111-11111a1aa1aa/logProcessingPipelines' \
    --header 'Content-Type: application/json' \
    --header 'Authorization: Bearer ******' \
    --data-raw '{
        "displayName": "My Pipeline 2",
        "description": "My description 2",
        "processors": [
            {
                "transformProcessor": {
                    "statements": [
                        "merge_maps(\n  body,\n  ParseJSON(\n    body\n  ),\n  \"upsert\"\n) where IsMap(body) and true\n",
                        "set(\n  body,\n  ParseJSON(\n    body\n  )\n) where not IsMap(body) and true\n"
                    ],
                    "errorMode": "IGNORE"
                }
            },
            {
                "filterProcessor": {
                    "logConditions": [
                        "true and severity_number != 0 and severity_number < 9"
                    ]
                }
            },
            {
                "redactProcessor": {
                    "blockedValues": [
                        "\\b[A-Z]{2}\\d{2}(?: ?[A-Z0-9]){11,31}(?:\\s[A-Z0-9])*\\b",
                        "\\b([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}\\b",
                        "\\b(?:(?:(?:\\d{4}[- ]?){3}\\d{4}|\\d{15,16}))\\b",
                        "\\b(?:(?:19|20)?\\d{2}[-/])?(?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\\d|3[01])(?:[-/](?:19|20)?\\d{2})?\\b",
                        "\\b[a-zA-Z0-9._/\\+\\-—|]+@[A-Za-z0-9\\-—|]+\\.[a-zA-Z|]{2,6}\\b",
                        "\\b(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\b",
                        "\\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\\b",
                        "\\b((\\+|\\b)[1l][\\-\\. ])?\\(?\\b[\\dOlZSB]{3,5}([\\-\\. ]|\\) ?)[\\dOlZSB]{3}[\\-\\. ][\\dOlZSB]{4}\\b",
                        "\\+[1-9]\\d{0,2}(?:[-.\\s]?\\(?\\d+\\)?(?:[-.\\s]?\\d+)*)\\b",
                        "\\b\\d{3}[- ]\\d{2}[- ]\\d{4}\\b",
                        "\\b[A-Z][A-Za-z\\s\\.]+,\\s{0,1}[A-Z]{2}\\b",
                        "\\b\\d+\\s[A-z]+\\s[A-z]+(\\s[A-z]+)?\\s*\\d*\\b",
                        "\\b\\d{5}(?:[-\\s]\\d{4})?\\b",
                        "\\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\\b"
                    ],
                    "allowAllKeys": true,
                    "allowedKeys": [
                        "__bindplane_id__"
                    ],
                    "ignoredKeys": [
                        "__bindplane_id__"
                    ],
                    "redactAllTypes": true
                }
            }
        ]
    }'
    
  2. 从响应中复制 displayName 字段的值。

  3. 运行以下命令,将三个数据流(日志类型、带有收集器 ID 的日志类型和 Feed)与流水线相关联。使用 displayName 字段的值作为 {pipelineName} 值。

    curl --location 'https://abc.def.googleapis.com/v123/{pipelineName}:associateStreams' \
        --header 'Content-Type: application/json' \
    --header 'Authorization: Bearer ******' \
    --data '{
        "streams": [
            {
                "logType": "MICROSOFT_SENTINEL"
            },
            {
                "logType": "A10_LOAD_BALANCER",
                "collectorId": "dddddddd-dddd-dddd-dddd-dddddddddddd"
            },
            {
                "feed": "1a1a1a1a-1a1a-1a1a-1a1a-1a1a1a1a1a1a"
            }
        ]
    }'
    

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。