收集 Apache Hadoop 日志
本文档介绍了如何使用 Bindplane 将 Apache Hadoop 日志注入到 Google Security Operations。解析器首先使用基于常见 Hadoop 日志格式的 Grok 模式从原始 Hadoop 日志中提取字段。然后,它会将提取的字段映射到统一数据模型 (UDM) 架构中的相应字段,执行数据类型转换,并使用其他上下文信息丰富数据。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 搭载
systemd的 Windows 2016 或更高版本或 Linux 主机 - 如果通过代理运行,请确保防火墙端口已根据 Bindplane 代理要求打开
- 对 Apache Hadoop 集群配置文件的特权访问权限
获取 Google SecOps 注入身份验证文件
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 收集代理。
- 下载注入身份验证文件。将文件安全地保存在将要安装 Bindplane 的系统上。
获取 Google SecOps 客户 ID
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 个人资料。
- 复制并保存组织详细信息部分中的客户 ID。
安装 Bindplane 代理
按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。
Windows 安装
- 以管理员身份打开命令提示符或 PowerShell。
运行以下命令:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux 安装
- 打开具有 root 或 sudo 权限的终端。
运行以下命令:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
其他安装资源
- 如需了解其他安装选项,请参阅此安装指南。
配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps
访问配置文件:
- 找到
config.yaml文件。通常,它位于 Linux 上的/etc/bindplane-agent/目录中或 Windows 上的安装目录中。 - 使用文本编辑器(例如
nano、vi或记事本)打开该文件。
- 找到
按如下方式修改
config.yaml文件:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- 根据基础架构的需要替换端口和 IP 地址。
- 将
<CUSTOMER_ID>替换为实际的客户 ID。 - 将
/path/to/ingestion-authentication-file.json更新为获取 Google SecOps 注入身份验证文件部分中保存身份验证文件的路径。
重启 Bindplane 代理以应用更改
如需在 Linux 中重启 Bindplane 代理,请运行以下命令:
sudo systemctl restart bindplane-agent如需在 Windows 中重启 Bindplane 代理,您可以使用服务控制台,也可以输入以下命令:
net stop BindPlaneAgent && net start BindPlaneAgent
在 Apache Hadoop 上配置 Syslog 转发
Apache Hadoop 使用 Log4j 进行日志记录。根据您的 Log4j 版本配置相应的 Syslog appender,以便 Hadoop 精灵程序(NameNode、DataNode、ResourceManager、NodeManager 等)直接将日志转发到您的 syslog 接收器(Bindplane 主机)。Log4j 通过文件(而非 Web 界面)进行配置。
方法 1:Log4j 1.x 配置
- 找到 log4j.properties 文件(通常位于
$HADOOP_CONF_DIR/log4j.properties中)。 将以下 SyslogAppender 配置添加到该文件中:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOG将
<BINDPLANE_HOST_IP>替换为 Bindplane 主机的 IP 地址。保存文件。
重启 Hadoop 守护进程以应用配置更改。
选项 2:Log4j 2.x 配置
- 找到 log4j2.xml 文件(通常位于
$HADOOP_CONF_DIR/log4j2.xml中)。 将以下 Syslog appender 配置添加到该文件中:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- 将
<BINDPLANE_HOST_IP>替换为 Bindplane 主机的 IP 地址。
- 将
保存文件。
重启 Hadoop 守护进程以应用配置更改。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| 允许 | security_result.action | 如果为“false”,则操作为“BLOCK”。如果为“true”,则操作为“ALLOW”。 |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | 使用 grok 模式“%{DATA:suser}@.*auth:%{WORD:auth_type}”从“ugi”字段中提取。移除了圆括号和“auth:”。 |
| call | additional.fields.key = "Call#", additional.fields.value.string_value | 直接映射。 |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | 直接映射。 |
| cliIP | principal.ip | 仅当“json_data”字段存在且成功解析为 JSON 时才进行映射。 |
| cmd | principal.process.command_line | 直接映射。 |
| cluster_name | target.hostname | 如果存在,则用作目标主机名。 |
| 天 | metadata.event_timestamp.seconds | 与月份、年份、小时、分钟和秒一起使用,以构造 event_timestamp。 |
| 说明 | metadata.description | 直接映射。 |
| driver | additional.fields.key = "driver", additional.fields.value.string_value | 直接映射。 |
| dst | target.ip OR target.hostname OR target.file.full_path | 如果成功解析为 IP,则映射到目标 IP。如果值以“/user”开头,则映射到目标文件路径。否则,映射到目标主机名。 |
| dstport | target.port | 直接映射并转换为整数。 |
| 执行者 | security_result.rule_name | 直接映射。 |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | 直接映射并转换为字符串。 |
| fname | src.file.full_path | 直接映射。 |
| 小时 | metadata.event_timestamp.seconds | 与月份、日期、年份、分钟和秒一起使用,以构造 event_timestamp。 |
| id | additional.fields.key = "id", additional.fields.value.string_value | 直接映射。 |
| ip | principal.ip | 在移除任何前导“/”字符后,映射到主账号 IP。 |
| json_data | 解析为 JSON。提取的字段会映射到相应的 UDM 字段。 | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | 直接映射。 |
| 消息 | 用于使用 Grok 模式提取各种字段。 | |
| 方法 | network.http.method | 直接映射。 |
| 分钟 | metadata.event_timestamp.seconds | 与月份、日期、年份、小时和秒数一起使用,以构造 event_timestamp。 |
| 月 | metadata.event_timestamp.seconds | 与天、年、小时、分钟和秒一起使用,以构造 event_timestamp。 |
| 观察者 | observer.hostname 或 observer.ip | 如果成功解析为 IP,则映射到观测者 IP。否则,映射到观测者主机名。 |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | 直接映射。 |
| 政策 | security_result.rule_id | 直接映射并转换为字符串。 |
| 产品 | metadata.product_name | 直接映射。 |
| product_event | metadata.product_event_type | 直接映射。如果为“rename”,则“dst”字段会映射到“target.file.full_path”。 |
| proto | network.application_protocol | 如果不是“webhdfs”,则直接映射并转换为大写。 |
| reason | security_result.summary | 直接映射。 |
| 代码库 | additional.fields.key = "repo", additional.fields.value.string_value | 直接映射。 |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | 直接映射。 |
| 结果 | additional.fields.key = "result", additional.fields.value.string_value | 直接映射并转换为字符串。 |
| 重试 | additional.fields.key = "Retry#", additional.fields.value.string_value | 直接映射。 |
| 秒 | metadata.event_timestamp.seconds | 与月份、日期、年份、小时和分钟一起使用,以构造 event_timestamp。 |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | 直接映射并转换为字符串。 |
| 和程度上减少 | security_result.severity | 根据值映射到不同的严重程度级别:“INFO”“Info”“info”->“INFORMATIONAL”;“Low”“low”“LOW”->“LOW”;“error”“Error”“WARN”“Warn”->“MEDIUM”;“High”“high”“HIGH”->“HIGH”;“Critical”“critical”“CRITICAL”->“CRITICAL”。 |
| shost | principal.hostname | 如果与“src”不同,则用作主要主机名。 |
| src | principal.ip 或 principal.hostname 或 observer.ip | 如果成功解析为 IP,则映射到主 IP 和观测者 IP。否则,映射到主主机名。 |
| srcport | principal.port | 直接映射并转换为整数。 |
| 摘要 | security_result.summary | 直接映射。 |
| suser | principal.user.userid | 直接映射。 |
| 标签 | additional.fields.key = "tags", additional.fields.value.string_value | 直接映射。 |
| 线程 | additional.fields.key = "thread", additional.fields.value.string_value | 直接映射。 |
| 提示 | target.ip | 直接映射。 |
| ugi | target.hostname | 如果“log_data”字段不包含“·”,则用作目标主机名。 |
| 网址 | target.url | 直接映射。 |
| vendor | metadata.vendor_name | 直接映射。 |
| 版本 | metadata.product_version | 直接映射。 |
| 年 | metadata.event_timestamp.seconds | 与月、日、小时、分钟和秒一起使用,以构造 event_timestamp。 |
| 不适用 | metadata.event_type | 默认设置为“NETWORK_CONNECTION”。如果未识别到目标,则更改为“STATUS_UPDATE”。 |
| 不适用 | metadata.log_type | 设置为“HADOOP”。 |
| 不适用 | security_result.alert_state | 如果严重程度为“HIGH”或“CRITICAL”,则设置为“ALERTING”。 |
| 不适用 | is_alert | 如果严重程度为“高”或“严重”,则设置为“true”。 |
| 不适用 | is_significant | 如果严重程度为“高”或“严重”,则设置为“true”。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。