收集 Elastic Packet Beats 日志
本文档介绍了如何使用 Bindplane 将 Elastic Packet Beats 日志注入到 Google Security Operations。解析器首先会为 Elastic Packet Beats 日志中找到的各个字段初始化默认值。然后,它使用 grok
模式和 json
过滤器的组合从日志消息中提取数据,执行数据类型转换,并根据事件数据集类型(例如 flow、dns、http、tls、dhcpv4)将提取的字段映射到统一数据模型 (UDM) 中的相应字段。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 搭载
systemd
的 Windows 2016 或更高版本或 Linux 主机。 - 如果通过代理运行,请确保防火墙端口已根据 Bindplane 代理要求打开。
- 对 Elastic Packet Beats 管理控制台或设备的特权访问权限。
- Logstash 已安装并配置。
获取 Google SecOps 注入身份验证文件
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 收集代理。
- 下载注入身份验证文件。
- 将文件安全地保存在将要安装 Bindplane 的系统上。
获取 Google SecOps 客户 ID
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 个人资料。
- 复制并保存组织详细信息部分中的客户 ID。
安装 Bindplane 代理
按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。
Windows 安装
- 以管理员身份打开命令提示符或 PowerShell。
运行以下命令:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux 安装
- 打开具有 root 或 sudo 权限的终端。
运行以下命令:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
其他安装资源
如需了解其他安装选项,请参阅安装指南。
配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps
访问配置文件:
- 找到
config.yaml
文件。通常,它位于 Linux 上的/etc/bindplane-agent/
目录中,或位于 Windows 上的安装目录中。 - 使用文本编辑器(例如
nano
、vi
或记事本)打开该文件。
- 找到
按如下方式修改
config.yaml
文件:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: YOUR_CUSTOMER_ID endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'ELASTIC_PACKETBEATS' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
- 根据基础架构的需要替换端口和 IP 地址。
- 将
YOUR_CUSTOMER_ID
替换为实际的客户 ID。 - 将
/path/to/ingestion-authentication-file.json
更新为获取 Google SecOps 提取身份验证文件部分中保存身份验证文件的路径。
重启 Bindplane 代理以应用更改
如需在 Linux 中重启 Bindplane 代理,请运行以下命令:
sudo systemctl restart observiq-otel-collector
如需在 Windows 中重启 Bindplane 代理,您可以使用服务控制台,也可以输入以下命令:
net stop observiq-otel-collector && net start observiq-otel-collector
在 Elastic Packet Beats 上配置 Syslog 转发
由于 Packetbeat 不支持直接 syslog 输出,您必须使用 Logstash 作为中介。
配置 Packetbeat 以将日志发送到 Logstash
- 登录 Elastic Packet Beats 管理控制台。
- 依次前往设置 > 日志转发。
- 点击 + 添加或启用按钮。
- 提供以下配置详细信息:
- 名称:输入一个描述性名称(例如
Logstash Output
)。 - 主机:输入 Logstash 服务器 IP 地址。
- 端口:输入 Logstash Beats 输入端口(通常为 5044)。
- 协议:选择 Beats 协议。
- 格式:选择 JSON。
- 时区:选择世界协调时间 (UTC) 时区,以确保各系统之间的时间一致。
- 前往事件部分,然后选择相关的日志类型或全部。
- 名称:输入一个描述性名称(例如
保存配置。
替代方法:直接修改 packetbeat.yml:
# /etc/packetbeat/packetbeat.yml packetbeat.protocols: - type: dns ports: [53] - type: http ports: [80, 8080, 8000, 5000, 8002] send_headers: true send_all_headers: true - type: tls ports: [443, 993, 995, 5223, 8443, 8883, 9243] - type: dhcpv4 ports: [67, 68] # Enable processors for additional fields processors: - add_network_direction: source: private destination: private internal_networks: - private - community_id: # Send to Logstash using beats protocol output.logstash: hosts: ["LOGSTASH_IP:5044"]
将
LOGSTASH_IP
替换为 Logstash 服务器的 IP 地址。
配置 Logstash 以使用 Syslog 转发到 BindPlane
创建 Logstash 流水线配置文件:
sudo nano /etc/logstash/conf.d/packetbeat-to-bindplane.conf
添加以下配置:
# Receive from Packetbeat input { beats { port => 5044 } } # Optional: Add filters for data enrichment filter { # Preserve original message structure mutate { copy => { "@metadata" => "[@metadata_backup]" } } } # Send to BindPlane via syslog output { syslog { host => "BINDPLANE_IP" port => 514 protocol => "udp" rfc => "rfc5424" facility => "local0" severity => "informational" sourcehost => "%{[agent][hostname]}" appname => "packetbeat" procid => "%{[agent][id]}" msgid => "ELASTIC_PACKETBEATS" structured_data => "packetbeat@32473" message => "%{message}" } }
将
BINDPLANE_IP
替换为 BindPlane 代理的 IP 地址。
重启 Logstash 以应用配置:
sudo systemctl restart logstash
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
@timestamp | metadata.event_timestamp | 直接从原始日志字段 @timestamp 映射。 |
agent.hostname | observer.hostname | 直接从原始日志字段 agent.hostname 映射。 |
agent.id | observer.asset_id | 与 agent.type 连接以形成 observer.asset_id 字段。 |
agent.type | observer.application | 直接从原始日志字段 agent.type 映射。 |
agent.version | observer.platform_version | 直接从原始日志字段 agent.version 映射。 |
audit_category | security_result.category_details | 直接从原始日志字段 audit_category 映射。 |
audit_cluster_name | additional.fields.audit_cluster_name.value.string_value | 直接从原始日志字段 audit_cluster_name 映射。 |
audit_node_host_address | observer.ip | 直接从原始日志字段 audit_node_host_address 映射。 |
audit_node_id | additional.fields.audit_node_id.value.string_value | 直接从原始日志字段 audit_node_id 映射。 |
audit_node_name | additional.fields.audit_node_name.value.string_value | 直接从原始日志字段 audit_node_name 映射。 |
audit_request_effective_user | observer.user.userid | 直接从原始日志字段 audit_request_effective_user 映射。 |
audit_request_initiating_user | additional.fields.audit_request_initiating_user.value.string_value | 直接从原始日志字段 audit_request_initiating_user 映射。 |
audit_request_remote_address | observer.ip | 如果与 audit_node_host_address 不同,则直接从原始日志字段 audit_request_remote_address 映射。 |
client.bytes | network.received_bytes(入站)/ network.sent_bytes(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 network.received_bytes 。如果是 OUTBOUND,则映射到 network.sent_bytes 。 |
client.ip | target.ip(入站)/ principal.ip(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 target.ip 。如果是 OUTBOUND,则映射到 principal.ip 。 |
client.port | target.port(入站)/ principal.port(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 target.port 。如果是 OUTBOUND,则映射到 principal.port 。 |
cluster.uuid | additional.fields.uuid.value.string_value | 直接从原始日志字段 cluster.uuid 映射。 |
组件 | additional.fields.component.value.string_value | 直接从原始日志字段 component 映射。 |
destination.bytes | network.sent_bytes | 直接从原始日志字段 destination.bytes 映射(针对 FLOW 事件)。 |
destination.ip | target.ip | 如果 network.direction 不是 INBOUND 或 OUTBOUND,则直接从原始日志字段 destination.ip 映射。 |
destination.mac | target.mac | 直接从原始日志字段 destination.mac 映射(针对 FLOW 事件)。 |
destination.port | target.port | 直接从原始日志字段 destination.port 映射(针对 FLOW 事件)。 |
dhcpv4.assigned_ip | network.dhcp.requested_address | 直接从原始日志字段 dhcpv4.assigned_ip 映射。 |
dhcpv4.client_ip | network.dhcp.yiaddr (ACK) / network.dhcp.ciaddr (REQUEST) / source.ip (REQUEST,如果 dhcpv4.client_ip 为空) | 根据 network.dhcp.type 字段进行映射。如果是 ACK,则映射到 network.dhcp.yiaddr 。如果是 REQUEST,则映射到 network.dhcp.ciaddr 。如果为 REQUEST 且 dhcpv4.client_ip 为空,则映射到 source.ip 。 |
dhcpv4.client_mac | network.dhcp.client_identifier | 直接从原始日志字段 dhcpv4.client_mac 映射,并将其转换为字节。 |
dhcpv4.op_code | network.dhcp.opcode | 根据 dhcpv4.op_code 的值映射到 network.dhcp.opcode 。如果 dhcpv4.op_code 为 BOOTREPLY 或 BOOTREQUEST ,则直接映射该值。否则,它会映射到 UNKNOWN_OPCODE 。 |
dhcpv4.option.hostname | network.dhcp.client_hostname | 直接从原始日志字段 dhcpv4.option.hostname 映射。 |
dhcpv4.option.ip_address_lease_time_sec | network.dhcp.lease_time_seconds | 直接从原始日志字段 dhcpv4.option.ip_address_lease_time_sec 映射,并将其转换为无符号整数。 |
dhcpv4.option.message_type | network.dhcp.type | 根据 dhcpv4.option.message_type 的值映射到 network.dhcp.type 。映射关系如下:ack -> ACK 、nack -> NAK 、discover -> DISCOVER 、offer -> OFFER 、request -> REQUEST 、decline -> DECLINE 、release -> RELEASE 、info -> INFORM 。如果该值不是上述值之一,则会映射到 UNKNOWN_MESSAGE_TYPE 。 |
dhcpv4.option.server_identifier | network.dhcp.sname | 直接从原始日志字段 dhcpv4.option.server_identifier 映射。 |
dns.answers.data | network.dns.answers.data | 直接从原始日志字段 dns.answers.data 映射。 |
dns.answers.class | network.dns.answers.class | 根据 dns.answers.class 的值映射到 network.dns.answers.class 。映射关系如下:IN -> 1,NONE -> 254,ANY -> 255。 |
dns.answers.name | network.dns.answers.name | 直接从原始日志字段 dns.answers.name 映射。 |
dns.answers.ttl | network.dns.answers.ttl | 直接从原始日志字段 dns.answers.ttl 映射,并将其转换为无符号整数。 |
dns.answers.type | network.dns.answers.type | 根据 dns.answers.type 的值映射到 network.dns.answers.type 。映射关系如下:A -> 1、NS -> 2、CNAME -> 5、SOA -> 6、PTR -> 12、MX -> 15、TXT -> 16、AAAA -> 28、SRV -> 33、NAPTR -> 35、DS -> 43、DNSKEY -> 48、IXFR -> 251、AXFR -> 252、TYPE99 -> 99、TKEY -> 249、ANY -> 255、ALL -> 255、URI -> 256、NULL -> 0。 |
dns.flags.authoritative | network.dns.authoritative | 如果原始日志字段 dns.flags.authoritative 为 true,则直接映射。 |
dns.flags.recursion_available | network.dns.recursion_available | 如果原始日志字段 dns.flags.recursion_available 为 true,则直接映射。 |
dns.flags.recursion_desired | network.dns.recursion_desired | 如果原始日志字段 dns.flags.recursion_desired 为 true,则直接映射。 |
dns.flags.truncated_response | network.dns.truncated | 如果原始日志字段 dns.flags.truncated_response 为 true,则直接映射。 |
dns.id | network.dns.id | 直接从原始日志字段 dns.id 映射,并将其转换为无符号整数。 |
dns.question.class | network.dns.questions.class | 根据 dns.question.class 的值映射到 network.dns.questions.class 。映射关系如下:IN -> 1,NONE -> 254,ANY -> 255。 |
dns.question.name | network.dns.questions.name | 直接从原始日志字段 dns.question.name 映射。 |
dns.question.type | network.dns.questions.type | 根据 dns.question.type 的值映射到 network.dns.questions.type 。映射关系如下:A -> 1、NS -> 2、CNAME -> 5、SOA -> 6、PTR -> 12、MX -> 15、TXT -> 16、AAAA -> 28、SRV -> 33、NAPTR -> 35、DS -> 43、DNSKEY -> 48、IXFR -> 251、AXFR -> 252、TYPE99 -> 99、TKEY -> 249、ANY -> 255、ALL -> 255、URI -> 256、NULL -> 0。 |
dns.resolved_ip | network.dns.additional.data | 系统会处理 dns.resolved_ip 数组中的每个元素,并将其映射到 network.dns.additional.data 字段。 |
dns.response_code | network.dns.response_code | 根据 dns.response_code 的值映射到 network.dns.response_code 。映射关系如下:NOERROR -> 0、FORMERR -> 1、SERVFAIL -> 2、NXDOMAIN -> 3、NOTIMP -> 4、REFUSED -> 5、YXDOMAIN -> 6、YXRRSET -> 7、NXRRSET -> 8、NOTAUTH -> 9、NOTZONE -> 10。 |
error.message | security_result.summary | 与 status 连接以形成 HTTP 事件的 security_result.summary 字段。 |
event.dataset | metadata.product_event_type | 直接从原始日志字段 event.dataset 映射。 |
flow.final | 用于确定流程是否已完成。如果不是,则丢弃该事件。 | |
flow.id | network.session_id | 直接从原始日志字段 flow.id 映射(针对 FLOW 事件)。 |
headers.accept_encoding | security_result.about.labels.Accept-Encoding | 直接从原始日志字段 headers.accept_encoding 映射。 |
headers.content_length | additional.fields.content_length.value.string_value | 直接从原始日志字段 headers.content_length 映射。 |
headers.content_type | additional.fields.content_type.value.string_value | 直接从原始日志字段 headers.content_type 映射。 |
headers.http_accept | additional.fields.http_accept.value.string_value | 直接从原始日志字段 headers.http_accept 映射。 |
headers.http_host | principal.hostname、principal.asset.hostname | 直接从原始日志字段 headers.http_host 映射。 |
headers.http_user_agent | network.http.user_agent | 直接从原始日志字段 headers.http_user_agent 映射。 |
headers.request_method | network.http.method | 直接从原始日志字段 headers.request_method 映射。 |
headers.x_b3_parentspanid | additional.fields.x_b3_parentspanid.value.string_value | 直接从原始日志字段 headers.x_b3_parentspanid 映射。 |
headers.x_b3_sampled | additional.fields.x_b3_sampled.value.string_value | 直接从原始日志字段 headers.x_b3_sampled 映射。 |
headers.x_envoy_attempt_count | security_result.about.labels.x_envoy_attempt_count | 直接从原始日志字段 headers.x_envoy_attempt_count 映射。 |
headers.x_envoy_original_path | additional.fields.x_envoy_original_path.value.string_value | 直接从原始日志字段 headers.x_envoy_original_path 映射。 |
headers.x_forwarded_client_cert | additional.fields.client_cert.value.string_value | 直接从原始日志字段 headers.x_forwarded_client_cert 映射。 |
headers.x_forwarded_for | principal.ip、principal.asset.ip | 使用 grok 提取 IP 地址后,直接从原始日志字段 headers.x_forwarded_for 映射。 |
headers.x_forwarded_proto | additional.fields.x_forwarded_proto.value.string_value | 直接从原始日志字段 headers.x_forwarded_proto 映射。 |
headers.x_request_id | additional.fields.x_request_id.value.string_value | 直接从原始日志字段 headers.x_request_id 映射。 |
主机 | principal.ip、principal.asset.ip | 使用 grok 提取 IP 地址后,直接从原始日志字段 host 映射。 |
http.request.method | network.http.method | 直接从原始日志字段 http.request.method 映射。 |
level | security_result.severity | 根据 level 的值映射到 security_result.severity 。映射关系如下:INFO -> INFORMATIONAL 、ERROR -> ERROR 、WARNING -> LOW 。 |
logger | additional.fields.logger.value.string_value | 直接从原始日志字段 logger 映射。 |
方法 | 用于确定事件是否为 DNS 事件。 | |
msg | security_result.description | 直接从原始日志字段 msg 映射,并移除双引号。 |
network.community_id | network.community_id | 直接从原始日志字段 network.community_id 映射。 |
network.direction | network.direction | 直接从原始日志字段 network.direction 映射,并将其转换为大写。如果值为 INGRESS 或 INBOUND ,则映射到 INBOUND 。如果值为 EGRESS 或 OUTBOUND ,则映射到 OUTBOUND 。 |
network.protocol | network.application_protocol | 直接从原始日志字段 network.protocol 映射。 |
network.transport | network.ip_protocol | 直接从原始日志字段 network.transport 映射(针对 TLS 事件)。 |
server.bytes | network.sent_bytes(入站)/ network.received_bytes(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 network.sent_bytes 。如果是 OUTBOUND,则映射到 network.received_bytes 。 |
server.domain | principal.hostname、principal.asset.hostname(入站)/ target.hostname、target.asset.hostname(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.hostname 。如果是 OUTBOUND,则映射到 target.hostname 。 |
server.ip | principal.ip、principal.asset.ip(入站)/ target.ip、target.asset.ip(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.ip 。如果是 OUTBOUND,则映射到 target.ip 。 |
server.port | principal.port(入站)/ target.port(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.port 。如果是 OUTBOUND,则映射到 target.port 。 |
source.bytes | network.received_bytes | 直接从原始日志字段 source.bytes 映射(针对 FLOW 事件)。 |
source.ip | principal.ip、principal.asset.ip | 直接从原始日志字段 source.ip 映射(针对 FLOW 事件)。 |
source.mac | principal.mac | 直接从原始日志字段 source.mac 映射(针对 FLOW 事件)。 |
source.port | principal.port | 直接从原始日志字段 source.port 映射(针对 FLOW 事件)。 |
状态 | metadata.description、security_result.summary | 如果 level 为空,则映射到 metadata.description 。与 error.message 连接以形成 HTTP 和 TLS 事件的 security_result.summary 字段。 |
tls.client.ja3 | network.tls.client.ja3 | 直接从原始日志字段 tls.client.ja3 映射。 |
tls.client.server_name | network.tls.client.server_name | 直接从原始日志字段 tls.client.server_name 映射。 |
tls.client.supported_ciphers | network.tls.client.supported_ciphers | 系统会处理 tls.client.supported_ciphers 数组中的每个元素,并将其映射到 network.tls.client.supported_ciphers 数组。 |
tls.cipher | network.tls.cipher | 直接从原始日志字段 tls.cipher 映射。 |
tls.detailed.server_certificate.not_after | network.tls.server.certificate.not_after | 直接从原始日志字段 tls.detailed.server_certificate.not_after 映射,并将其转换为时间戳。 |
tls.detailed.server_certificate.not_before | network.tls.server.certificate.not_before | 直接从原始日志字段 tls.detailed.server_certificate.not_before 映射,并将其转换为时间戳。 |
tls.detailed.server_certificate.serial_number | network.tls.server.certificate.serial | 直接从原始日志字段 tls.detailed.server_certificate.serial_number 映射。 |
tls.detailed.server_certificate.version | network.tls.server.certificate.version | 直接从原始日志字段 tls.detailed.server_certificate.version 映射,并将其转换为字符串。 |
tls.established | network.tls.established | 直接从原始日志字段 tls.established 映射。 |
tls.next_protocol | network.tls.next_protocol | 直接从原始日志字段 tls.next_protocol 映射。 |
tls.resumed | network.tls.resumed | 直接从原始日志字段 tls.resumed 映射。 |
tls.server.hash.sha1 | network.tls.server.certificate.sha1 | 直接从原始日志字段 tls.server.hash.sha1 映射,并转换为小写。 |
tls.server.issuer | network.tls.server.certificate.issuer | 直接从原始日志字段 tls.server.issuer 映射。 |
tls.server.subject | network.tls.server.certificate.subject | 直接从原始日志字段 tls.server.subject 映射。 |
tls.version | network.tls.version | 直接从原始日志字段 tls.version 映射。 |
tls.version_protocol | network.tls.version_protocol | 直接从原始日志字段 tls.version_protocol 映射。 |
类型 | 用于确定事件是否为 DNS 事件。 | |
url.full | principal.url(入站)/ target.url(出站) | 根据 network.direction 字段进行映射。如果是 INBOUND,则映射到 principal.url 。如果是 OUTBOUND,则映射到 target.url 。 |
user_id | target.user.userid | 直接从原始日志字段 user_id 映射。 |
user_name | target.user.user_display_name | 直接从原始日志字段 user_name 映射。 |
metadata.event_type | 默认设置为 GENERIC_EVENT 。根据日志来源和事件数据更改为特定事件类型。 |
|
metadata.vendor_name | 默认设置为 Elastic 。 |
|
metadata.product_name | 默认设置为 PacketBeat 。 |
|
security_result.action | 默认设置为 ALLOW 。 |
|
metadata.log_type | 默认设置为 ELASTIC_PACKETBEATS 。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。