收集 Elastic Packet Beats 日志

支持的平台:

本文档介绍了如何使用 Bindplane 将 Elastic Packet Beats 日志注入到 Google Security Operations。解析器首先会为 Elastic Packet Beats 日志中找到的各个字段初始化默认值。然后,它使用 grok 模式和 json 过滤器的组合从日志消息中提取数据,执行数据类型转换,并根据事件数据集类型(例如 flow、dns、http、tls、dhcpv4)将提取的字段映射到统一数据模型 (UDM) 中的相应字段。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • 搭载 systemd 的 Windows 2016 或更高版本或 Linux 主机。
  • 如果通过代理运行,请确保防火墙端口已根据 Bindplane 代理要求打开。
  • 对 Elastic Packet Beats 管理控制台或设备的特权访问权限。
  • Logstash 已安装并配置。

获取 Google SecOps 注入身份验证文件

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 收集代理
  3. 下载注入身份验证文件
    • 将文件安全地保存在将要安装 Bindplane 的系统上。

获取 Google SecOps 客户 ID

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 个人资料
  3. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Windows 安装

  1. 以管理员身份打开命令提示符PowerShell
  2. 运行以下命令:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

其他安装资源

如需了解其他安装选项,请参阅安装指南

配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps

  1. 访问配置文件:

    • 找到 config.yaml 文件。通常,它位于 Linux 上的 /etc/bindplane-agent/ 目录中,或位于 Windows 上的安装目录中。
    • 使用文本编辑器(例如 nanovi 或记事本)打开该文件。
  2. 按如下方式修改 config.yaml 文件:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: YOUR_CUSTOMER_ID
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'ELASTIC_PACKETBEATS'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • 根据基础架构的需要替换端口和 IP 地址。
    • YOUR_CUSTOMER_ID 替换为实际的客户 ID。
    • /path/to/ingestion-authentication-file.json 更新为获取 Google SecOps 提取身份验证文件部分中保存身份验证文件的路径。

重启 Bindplane 代理以应用更改

  • 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart observiq-otel-collector
    
  • 如需在 Windows 中重启 Bindplane 代理,您可以使用服务控制台,也可以输入以下命令:

    net stop observiq-otel-collector && net start observiq-otel-collector
    

在 Elastic Packet Beats 上配置 Syslog 转发

由于 Packetbeat 不支持直接 syslog 输出,您必须使用 Logstash 作为中介。

配置 Packetbeat 以将日志发送到 Logstash

  1. 登录 Elastic Packet Beats 管理控制台
  2. 依次前往设置 > 日志转发
  3. 点击 + 添加启用按钮。
  4. 提供以下配置详细信息:
    • 名称:输入一个描述性名称(例如 Logstash Output)。
    • 主机:输入 Logstash 服务器 IP 地址。
    • 端口:输入 Logstash Beats 输入端口(通常为 5044)。
    • 协议:选择 Beats 协议
    • 格式:选择 JSON
    • 时区:选择世界协调时间 (UTC) 时区,以确保各系统之间的时间一致。
    • 前往事件部分,然后选择相关的日志类型或全部
  5. 保存配置。

    替代方法:直接修改 packetbeat.yml

    # /etc/packetbeat/packetbeat.yml
    packetbeat.protocols:
      - type: dns
        ports: [53]
      - type: http
        ports: [80, 8080, 8000, 5000, 8002]
        send_headers: true
        send_all_headers: true
      - type: tls
        ports: [443, 993, 995, 5223, 8443, 8883, 9243]
      - type: dhcpv4
        ports: [67, 68]
    
    # Enable processors for additional fields
    processors:
      - add_network_direction:
          source: private
          destination: private
          internal_networks:
            - private
      - community_id:
    
    # Send to Logstash using beats protocol
    output.logstash:
      hosts: ["LOGSTASH_IP:5044"]
    

    LOGSTASH_IP 替换为 Logstash 服务器的 IP 地址。

配置 Logstash 以使用 Syslog 转发到 BindPlane

  1. 创建 Logstash 流水线配置文件:

    sudo nano /etc/logstash/conf.d/packetbeat-to-bindplane.conf
    
  2. 添加以下配置:

    # Receive from Packetbeat
    input {
      beats {
        port => 5044
      }
    }
    
    # Optional: Add filters for data enrichment
    filter {
      # Preserve original message structure
      mutate {
        copy => { "@metadata" => "[@metadata_backup]" }
      }
    }
    
    # Send to BindPlane via syslog
    output {
      syslog {
        host => "BINDPLANE_IP"
        port => 514
        protocol => "udp"
        rfc => "rfc5424"
        facility => "local0"
        severity => "informational"
        sourcehost => "%{[agent][hostname]}"
        appname => "packetbeat"
        procid => "%{[agent][id]}"
        msgid => "ELASTIC_PACKETBEATS"
        structured_data => "packetbeat@32473"
        message => "%{message}"
      }
    }
    

    BINDPLANE_IP 替换为 BindPlane 代理的 IP 地址。

  • 重启 Logstash 以应用配置:

    sudo systemctl restart logstash
    

UDM 映射表

日志字段 UDM 映射 逻辑
@timestamp metadata.event_timestamp 直接从原始日志字段 @timestamp 映射。
agent.hostname observer.hostname 直接从原始日志字段 agent.hostname 映射。
agent.id observer.asset_id agent.type 连接以形成 observer.asset_id 字段。
agent.type observer.application 直接从原始日志字段 agent.type 映射。
agent.version observer.platform_version 直接从原始日志字段 agent.version 映射。
audit_category security_result.category_details 直接从原始日志字段 audit_category 映射。
audit_cluster_name additional.fields.audit_cluster_name.value.string_value 直接从原始日志字段 audit_cluster_name 映射。
audit_node_host_address observer.ip 直接从原始日志字段 audit_node_host_address 映射。
audit_node_id additional.fields.audit_node_id.value.string_value 直接从原始日志字段 audit_node_id 映射。
audit_node_name additional.fields.audit_node_name.value.string_value 直接从原始日志字段 audit_node_name 映射。
audit_request_effective_user observer.user.userid 直接从原始日志字段 audit_request_effective_user 映射。
audit_request_initiating_user additional.fields.audit_request_initiating_user.value.string_value 直接从原始日志字段 audit_request_initiating_user 映射。
audit_request_remote_address observer.ip 如果与 audit_node_host_address 不同,则直接从原始日志字段 audit_request_remote_address 映射。
client.bytes network.received_bytes(入站)/ network.sent_bytes(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 network.received_bytes。如果是 OUTBOUND,则映射到 network.sent_bytes
client.ip target.ip(入站)/ principal.ip(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 target.ip。如果是 OUTBOUND,则映射到 principal.ip
client.port target.port(入站)/ principal.port(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 target.port。如果是 OUTBOUND,则映射到 principal.port
cluster.uuid additional.fields.uuid.value.string_value 直接从原始日志字段 cluster.uuid 映射。
组件 additional.fields.component.value.string_value 直接从原始日志字段 component 映射。
destination.bytes network.sent_bytes 直接从原始日志字段 destination.bytes 映射(针对 FLOW 事件)。
destination.ip target.ip 如果 network.direction 不是 INBOUND 或 OUTBOUND,则直接从原始日志字段 destination.ip 映射。
destination.mac target.mac 直接从原始日志字段 destination.mac 映射(针对 FLOW 事件)。
destination.port target.port 直接从原始日志字段 destination.port 映射(针对 FLOW 事件)。
dhcpv4.assigned_ip network.dhcp.requested_address 直接从原始日志字段 dhcpv4.assigned_ip 映射。
dhcpv4.client_ip network.dhcp.yiaddr (ACK) / network.dhcp.ciaddr (REQUEST) / source.ip (REQUEST,如果 dhcpv4.client_ip 为空) 根据 network.dhcp.type 字段进行映射。如果是 ACK,则映射到 network.dhcp.yiaddr。如果是 REQUEST,则映射到 network.dhcp.ciaddr。如果为 REQUEST 且 dhcpv4.client_ip 为空,则映射到 source.ip
dhcpv4.client_mac network.dhcp.client_identifier 直接从原始日志字段 dhcpv4.client_mac 映射,并将其转换为字节。
dhcpv4.op_code network.dhcp.opcode 根据 dhcpv4.op_code 的值映射到 network.dhcp.opcode。如果 dhcpv4.op_codeBOOTREPLYBOOTREQUEST,则直接映射该值。否则,它会映射到 UNKNOWN_OPCODE
dhcpv4.option.hostname network.dhcp.client_hostname 直接从原始日志字段 dhcpv4.option.hostname 映射。
dhcpv4.option.ip_address_lease_time_sec network.dhcp.lease_time_seconds 直接从原始日志字段 dhcpv4.option.ip_address_lease_time_sec 映射,并将其转换为无符号整数。
dhcpv4.option.message_type network.dhcp.type 根据 dhcpv4.option.message_type 的值映射到 network.dhcp.type。映射关系如下:ack -> ACKnack -> NAKdiscover -> DISCOVERoffer -> OFFERrequest -> REQUESTdecline -> DECLINErelease -> RELEASEinfo -> INFORM。如果该值不是上述值之一,则会映射到 UNKNOWN_MESSAGE_TYPE
dhcpv4.option.server_identifier network.dhcp.sname 直接从原始日志字段 dhcpv4.option.server_identifier 映射。
dns.answers.data network.dns.answers.data 直接从原始日志字段 dns.answers.data 映射。
dns.answers.class network.dns.answers.class 根据 dns.answers.class 的值映射到 network.dns.answers.class。映射关系如下:IN -> 1,NONE -> 254,ANY -> 255。
dns.answers.name network.dns.answers.name 直接从原始日志字段 dns.answers.name 映射。
dns.answers.ttl network.dns.answers.ttl 直接从原始日志字段 dns.answers.ttl 映射,并将其转换为无符号整数。
dns.answers.type network.dns.answers.type 根据 dns.answers.type 的值映射到 network.dns.answers.type。映射关系如下:A -> 1、NS -> 2、CNAME -> 5、SOA -> 6、PTR -> 12、MX -> 15、TXT -> 16、AAAA -> 28、SRV -> 33、NAPTR -> 35、DS -> 43、DNSKEY -> 48、IXFR -> 251、AXFR -> 252、TYPE99 -> 99、TKEY -> 249、ANY -> 255、ALL -> 255、URI -> 256、NULL -> 0。
dns.flags.authoritative network.dns.authoritative 如果原始日志字段 dns.flags.authoritative 为 true,则直接映射。
dns.flags.recursion_available network.dns.recursion_available 如果原始日志字段 dns.flags.recursion_available 为 true,则直接映射。
dns.flags.recursion_desired network.dns.recursion_desired 如果原始日志字段 dns.flags.recursion_desired 为 true,则直接映射。
dns.flags.truncated_response network.dns.truncated 如果原始日志字段 dns.flags.truncated_response 为 true,则直接映射。
dns.id network.dns.id 直接从原始日志字段 dns.id 映射,并将其转换为无符号整数。
dns.question.class network.dns.questions.class 根据 dns.question.class 的值映射到 network.dns.questions.class。映射关系如下:IN -> 1,NONE -> 254,ANY -> 255。
dns.question.name network.dns.questions.name 直接从原始日志字段 dns.question.name 映射。
dns.question.type network.dns.questions.type 根据 dns.question.type 的值映射到 network.dns.questions.type。映射关系如下:A -> 1、NS -> 2、CNAME -> 5、SOA -> 6、PTR -> 12、MX -> 15、TXT -> 16、AAAA -> 28、SRV -> 33、NAPTR -> 35、DS -> 43、DNSKEY -> 48、IXFR -> 251、AXFR -> 252、TYPE99 -> 99、TKEY -> 249、ANY -> 255、ALL -> 255、URI -> 256、NULL -> 0。
dns.resolved_ip network.dns.additional.data 系统会处理 dns.resolved_ip 数组中的每个元素,并将其映射到 network.dns.additional.data 字段。
dns.response_code network.dns.response_code 根据 dns.response_code 的值映射到 network.dns.response_code。映射关系如下:NOERROR -> 0、FORMERR -> 1、SERVFAIL -> 2、NXDOMAIN -> 3、NOTIMP -> 4、REFUSED -> 5、YXDOMAIN -> 6、YXRRSET -> 7、NXRRSET -> 8、NOTAUTH -> 9、NOTZONE -> 10。
error.message security_result.summary status 连接以形成 HTTP 事件的 security_result.summary 字段。
event.dataset metadata.product_event_type 直接从原始日志字段 event.dataset 映射。
flow.final 用于确定流程是否已完成。如果不是,则丢弃该事件。
flow.id network.session_id 直接从原始日志字段 flow.id 映射(针对 FLOW 事件)。
headers.accept_encoding security_result.about.labels.Accept-Encoding 直接从原始日志字段 headers.accept_encoding 映射。
headers.content_length additional.fields.content_length.value.string_value 直接从原始日志字段 headers.content_length 映射。
headers.content_type additional.fields.content_type.value.string_value 直接从原始日志字段 headers.content_type 映射。
headers.http_accept additional.fields.http_accept.value.string_value 直接从原始日志字段 headers.http_accept 映射。
headers.http_host principal.hostname、principal.asset.hostname 直接从原始日志字段 headers.http_host 映射。
headers.http_user_agent network.http.user_agent 直接从原始日志字段 headers.http_user_agent 映射。
headers.request_method network.http.method 直接从原始日志字段 headers.request_method 映射。
headers.x_b3_parentspanid additional.fields.x_b3_parentspanid.value.string_value 直接从原始日志字段 headers.x_b3_parentspanid 映射。
headers.x_b3_sampled additional.fields.x_b3_sampled.value.string_value 直接从原始日志字段 headers.x_b3_sampled 映射。
headers.x_envoy_attempt_count security_result.about.labels.x_envoy_attempt_count 直接从原始日志字段 headers.x_envoy_attempt_count 映射。
headers.x_envoy_original_path additional.fields.x_envoy_original_path.value.string_value 直接从原始日志字段 headers.x_envoy_original_path 映射。
headers.x_forwarded_client_cert additional.fields.client_cert.value.string_value 直接从原始日志字段 headers.x_forwarded_client_cert 映射。
headers.x_forwarded_for principal.ip、principal.asset.ip 使用 grok 提取 IP 地址后,直接从原始日志字段 headers.x_forwarded_for 映射。
headers.x_forwarded_proto additional.fields.x_forwarded_proto.value.string_value 直接从原始日志字段 headers.x_forwarded_proto 映射。
headers.x_request_id additional.fields.x_request_id.value.string_value 直接从原始日志字段 headers.x_request_id 映射。
主机 principal.ip、principal.asset.ip 使用 grok 提取 IP 地址后,直接从原始日志字段 host 映射。
http.request.method network.http.method 直接从原始日志字段 http.request.method 映射。
level security_result.severity 根据 level 的值映射到 security_result.severity。映射关系如下:INFO -> INFORMATIONALERROR -> ERRORWARNING -> LOW
logger additional.fields.logger.value.string_value 直接从原始日志字段 logger 映射。
方法 用于确定事件是否为 DNS 事件。
msg security_result.description 直接从原始日志字段 msg 映射,并移除双引号。
network.community_id network.community_id 直接从原始日志字段 network.community_id 映射。
network.direction network.direction 直接从原始日志字段 network.direction 映射,并将其转换为大写。如果值为 INGRESSINBOUND,则映射到 INBOUND。如果值为 EGRESSOUTBOUND,则映射到 OUTBOUND
network.protocol network.application_protocol 直接从原始日志字段 network.protocol 映射。
network.transport network.ip_protocol 直接从原始日志字段 network.transport 映射(针对 TLS 事件)。
server.bytes network.sent_bytes(入站)/ network.received_bytes(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 network.sent_bytes。如果是 OUTBOUND,则映射到 network.received_bytes
server.domain principal.hostname、principal.asset.hostname(入站)/ target.hostname、target.asset.hostname(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.hostname。如果是 OUTBOUND,则映射到 target.hostname
server.ip principal.ip、principal.asset.ip(入站)/ target.ip、target.asset.ip(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.ip。如果是 OUTBOUND,则映射到 target.ip
server.port principal.port(入站)/ target.port(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.port。如果是 OUTBOUND,则映射到 target.port
source.bytes network.received_bytes 直接从原始日志字段 source.bytes 映射(针对 FLOW 事件)。
source.ip principal.ip、principal.asset.ip 直接从原始日志字段 source.ip 映射(针对 FLOW 事件)。
source.mac principal.mac 直接从原始日志字段 source.mac 映射(针对 FLOW 事件)。
source.port principal.port 直接从原始日志字段 source.port 映射(针对 FLOW 事件)。
状态 metadata.description、security_result.summary 如果 level 为空,则映射到 metadata.description。与 error.message 连接以形成 HTTP 和 TLS 事件的 security_result.summary 字段。
tls.client.ja3 network.tls.client.ja3 直接从原始日志字段 tls.client.ja3 映射。
tls.client.server_name network.tls.client.server_name 直接从原始日志字段 tls.client.server_name 映射。
tls.client.supported_ciphers network.tls.client.supported_ciphers 系统会处理 tls.client.supported_ciphers 数组中的每个元素,并将其映射到 network.tls.client.supported_ciphers 数组。
tls.cipher network.tls.cipher 直接从原始日志字段 tls.cipher 映射。
tls.detailed.server_certificate.not_after network.tls.server.certificate.not_after 直接从原始日志字段 tls.detailed.server_certificate.not_after 映射,并将其转换为时间戳。
tls.detailed.server_certificate.not_before network.tls.server.certificate.not_before 直接从原始日志字段 tls.detailed.server_certificate.not_before 映射,并将其转换为时间戳。
tls.detailed.server_certificate.serial_number network.tls.server.certificate.serial 直接从原始日志字段 tls.detailed.server_certificate.serial_number 映射。
tls.detailed.server_certificate.version network.tls.server.certificate.version 直接从原始日志字段 tls.detailed.server_certificate.version 映射,并将其转换为字符串。
tls.established network.tls.established 直接从原始日志字段 tls.established 映射。
tls.next_protocol network.tls.next_protocol 直接从原始日志字段 tls.next_protocol 映射。
tls.resumed network.tls.resumed 直接从原始日志字段 tls.resumed 映射。
tls.server.hash.sha1 network.tls.server.certificate.sha1 直接从原始日志字段 tls.server.hash.sha1 映射,并转换为小写。
tls.server.issuer network.tls.server.certificate.issuer 直接从原始日志字段 tls.server.issuer 映射。
tls.server.subject network.tls.server.certificate.subject 直接从原始日志字段 tls.server.subject 映射。
tls.version network.tls.version 直接从原始日志字段 tls.version 映射。
tls.version_protocol network.tls.version_protocol 直接从原始日志字段 tls.version_protocol 映射。
类型 用于确定事件是否为 DNS 事件。
url.full principal.url(入站)/ target.url(出站) 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.url。如果是 OUTBOUND,则映射到 target.url
user_id target.user.userid 直接从原始日志字段 user_id 映射。
user_name target.user.user_display_name 直接从原始日志字段 user_name 映射。
metadata.event_type 默认设置为 GENERIC_EVENT。根据日志来源和事件数据更改为特定事件类型。
metadata.vendor_name 默认设置为 Elastic
metadata.product_name 默认设置为 PacketBeat
security_result.action 默认设置为 ALLOW
metadata.log_type 默认设置为 ELASTIC_PACKETBEATS

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。