收集 Apache Hadoop 日志

支持的平台:

本文档介绍了如何使用 Bindplane 将 Apache Hadoop 日志注入到 Google Security Operations。解析器首先使用基于常见 Hadoop 日志格式的 Grok 模式从原始 Hadoop 日志中提取字段。然后,它会将提取的字段映射到统一数据模型 (UDM) 架构中的相应字段,执行数据类型转换,并使用其他上下文信息丰富数据。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 搭载 systemd 的 Windows 2016 或更高版本或 Linux 主机
  • 如果通过代理运行,请确保防火墙端口已根据 Bindplane 代理要求打开
  • 对 Apache Hadoop 集群配置文件的特权访问权限

获取 Google SecOps 注入身份验证文件

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 收集代理
  3. 下载注入身份验证文件。将文件安全地保存在将要安装 Bindplane 的系统上。

获取 Google SecOps 客户 ID

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 个人资料
  3. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Windows 安装

  1. 以管理员身份打开命令提示符PowerShell
  2. 运行以下命令:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

其他安装资源

  • 如需了解其他安装选项,请参阅此安装指南

配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps

  1. 访问配置文件:

    1. 找到 config.yaml 文件。通常,它位于 Linux 上的 /etc/bindplane-agent/ 目录中或 Windows 上的安装目录中。
    2. 使用文本编辑器(例如 nanovi 或记事本)打开该文件。
  2. 按如下方式修改 config.yaml 文件:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • 根据基础架构的需要替换端口和 IP 地址。
    • <CUSTOMER_ID> 替换为实际的客户 ID。
    • /path/to/ingestion-authentication-file.json 更新为获取 Google SecOps 注入身份验证文件部分中保存身份验证文件的路径。

重启 Bindplane 代理以应用更改

  • 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart bindplane-agent
    
  • 如需在 Windows 中重启 Bindplane 代理,您可以使用服务控制台,也可以输入以下命令:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

在 Apache Hadoop 上配置 Syslog 转发

Apache Hadoop 使用 Log4j 进行日志记录。根据您的 Log4j 版本配置相应的 Syslog appender,以便 Hadoop 精灵程序(NameNode、DataNode、ResourceManager、NodeManager 等)直接将日志转发到您的 syslog 接收器(Bindplane 主机)。Log4j 通过文件(而非 Web 界面)进行配置。

方法 1:Log4j 1.x 配置

  1. 找到 log4j.properties 文件(通常位于 $HADOOP_CONF_DIR/log4j.properties 中)。
  2. 将以下 SyslogAppender 配置添加到该文件中:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. <BINDPLANE_HOST_IP> 替换为 Bindplane 主机的 IP 地址。

  4. 保存文件。

  5. 重启 Hadoop 守护进程以应用配置更改。

选项 2:Log4j 2.x 配置

  1. 找到 log4j2.xml 文件(通常位于 $HADOOP_CONF_DIR/log4j2.xml 中)。
  2. 将以下 Syslog appender 配置添加到该文件中:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • <BINDPLANE_HOST_IP> 替换为 Bindplane 主机的 IP 地址。
  3. 保存文件。

  4. 重启 Hadoop 守护进程以应用配置更改。

UDM 映射表

日志字段 UDM 映射 逻辑
允许 security_result.action 如果为“false”,则操作为“BLOCK”。如果为“true”,则操作为“ALLOW”。
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value 使用 grok 模式“%{DATA:suser}@.*auth:%{WORD:auth_type}”从“ugi”字段中提取。移除了圆括号和“auth:”。
call additional.fields.key = "Call#", additional.fields.value.string_value 直接映射。
call_context additional.fields.key = "callerContext", additional.fields.value.string_value 直接映射。
cliIP principal.ip 仅当“json_data”字段存在且成功解析为 JSON 时才进行映射。
cmd principal.process.command_line 直接映射。
cluster_name target.hostname 如果存在,则用作目标主机名。
metadata.event_timestamp.seconds 与月份、年份、小时、分钟和秒一起使用,以构造 event_timestamp。
说明 metadata.description 直接映射。
driver additional.fields.key = "driver", additional.fields.value.string_value 直接映射。
dst target.ip OR target.hostname OR target.file.full_path 如果成功解析为 IP,则映射到目标 IP。如果值以“/user”开头,则映射到目标文件路径。否则,映射到目标主机名。
dstport target.port 直接映射并转换为整数。
执行者 security_result.rule_name 直接映射。
event_count additional.fields.key = "event_count", additional.fields.value.string_value 直接映射并转换为字符串。
fname src.file.full_path 直接映射。
小时 metadata.event_timestamp.seconds 与月份、日期、年份、分钟和秒一起使用,以构造 event_timestamp。
id additional.fields.key = "id", additional.fields.value.string_value 直接映射。
ip principal.ip 在移除任何前导“/”字符后,映射到主账号 IP。
json_data 解析为 JSON。提取的字段会映射到相应的 UDM 字段。
logType additional.fields.key = "logType", additional.fields.value.string_value 直接映射。
消息 用于使用 Grok 模式提取各种字段。
方法 network.http.method 直接映射。
分钟 metadata.event_timestamp.seconds 与月份、日期、年份、小时和秒数一起使用,以构造 event_timestamp。
metadata.event_timestamp.seconds 与天、年、小时、分钟和秒一起使用,以构造 event_timestamp。
观察者 observer.hostname 或 observer.ip 如果成功解析为 IP,则映射到观测者 IP。否则,映射到观测者主机名。
perm additional.fields.key = "perm", additional.fields.value.string_value 直接映射。
政策 security_result.rule_id 直接映射并转换为字符串。
产品 metadata.product_name 直接映射。
product_event metadata.product_event_type 直接映射。如果为“rename”,则“dst”字段会映射到“target.file.full_path”。
proto network.application_protocol 如果不是“webhdfs”,则直接映射并转换为大写。
reason security_result.summary 直接映射。
代码库 additional.fields.key = "repo", additional.fields.value.string_value 直接映射。
resType additional.fields.key = "resType", additional.fields.value.string_value 直接映射。
结果 additional.fields.key = "result", additional.fields.value.string_value 直接映射并转换为字符串。
重试 additional.fields.key = "Retry#", additional.fields.value.string_value 直接映射。
metadata.event_timestamp.seconds 与月份、日期、年份、小时和分钟一起使用,以构造 event_timestamp。
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value 直接映射并转换为字符串。
和程度上减少 security_result.severity 根据值映射到不同的严重程度级别:“INFO”“Info”“info”->“INFORMATIONAL”;“Low”“low”“LOW”->“LOW”;“error”“Error”“WARN”“Warn”->“MEDIUM”;“High”“high”“HIGH”->“HIGH”;“Critical”“critical”“CRITICAL”->“CRITICAL”。
shost principal.hostname 如果与“src”不同,则用作主要主机名。
src principal.ip 或 principal.hostname 或 observer.ip 如果成功解析为 IP,则映射到主 IP 和观测者 IP。否则,映射到主主机名。
srcport principal.port 直接映射并转换为整数。
摘要 security_result.summary 直接映射。
suser principal.user.userid 直接映射。
标签 additional.fields.key = "tags", additional.fields.value.string_value 直接映射。
线程 additional.fields.key = "thread", additional.fields.value.string_value 直接映射。
提示 target.ip 直接映射。
ugi target.hostname 如果“log_data”字段不包含“·”,则用作目标主机名。
网址 target.url 直接映射。
vendor metadata.vendor_name 直接映射。
版本 metadata.product_version 直接映射。
metadata.event_timestamp.seconds 与月、日、小时、分钟和秒一起使用,以构造 event_timestamp。
不适用 metadata.event_type 默认设置为“NETWORK_CONNECTION”。如果未识别到目标,则更改为“STATUS_UPDATE”。
不适用 metadata.log_type 设置为“HADOOP”。
不适用 security_result.alert_state 如果严重程度为“HIGH”或“CRITICAL”,则设置为“ALERTING”。
不适用 is_alert 如果严重程度为“高”或“严重”,则设置为“true”。
不适用 is_significant 如果严重程度为“高”或“严重”,则设置为“true”。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。