收集 Apache Hadoop 記錄

支援的國家/地區:

本文說明如何使用 Bindplane 將 Apache Hadoop 記錄擷取至 Google Security Operations。剖析器會先根據常見的 Hadoop 記錄格式,使用 Grok 模式從原始 Hadoop 記錄中擷取欄位。接著,系統會將擷取的欄位對應至 Unified Data Model (UDM) 結構定義中的對應欄位、執行資料型別轉換,並以其他內容擴充資料。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 搭載 systemd 的 Windows 2016 以上版本或 Linux 主機
  • 如果透過 Proxy 執行,請確保防火牆通訊埠已根據 Bindplane 代理程式需求開啟
  • Apache Hadoop 叢集設定檔的特殊存取權

取得 Google SecOps 擷取驗證檔案

  1. 登入 Google SecOps 控制台。
  2. 依序前往「SIEM 設定」>「收集代理程式」
  3. 下載擷取驗證檔案。將檔案安全地儲存在要安裝 Bindplane 的系統上。

取得 Google SecOps 客戶 ID

  1. 登入 Google SecOps 控制台。
  2. 依序前往「SIEM 設定」>「設定檔」
  3. 複製並儲存「機構詳細資料」專區中的客戶 ID

安裝 Bindplane 代理程式

請按照下列操作說明,在 Windows 或 Linux 作業系統上安裝 Bindplane 代理程式。

Windows 安裝

  1. 以管理員身分開啟「命令提示字元」或「PowerShell」
  2. 執行下列指令:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 安裝

  1. 開啟具備根層級或 sudo 權限的終端機。
  2. 執行下列指令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

其他安裝資源

設定 Bindplane 代理程式,擷取系統記錄檔並傳送至 Google SecOps

  1. 存取設定檔:

    1. 找出 config.yaml 檔案。通常位於 Linux 的 /etc/bindplane-agent/ 目錄,或 Windows 的安裝目錄。
    2. 使用文字編輯器 (例如 nanovi 或記事本) 開啟檔案。
  2. 按照下列方式編輯 config.yaml 檔案:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • 視基礎架構需求,替換通訊埠和 IP 位址。
    • <CUSTOMER_ID> 替換為實際的客戶 ID。
    • /path/to/ingestion-authentication-file.json 更新為「取得 Google SecOps 擷取驗證檔案」部分中驗證檔案的儲存路徑。

重新啟動 Bindplane 代理程式,以套用變更

  • 如要在 Linux 中重新啟動 Bindplane 代理程式,請執行下列指令:

    sudo systemctl restart bindplane-agent
    
  • 如要在 Windows 中重新啟動 Bindplane 代理程式,可以使用「服務」控制台,或輸入下列指令:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

在 Apache Hadoop 上設定 Syslog 轉送

Apache Hadoop 使用 Log4j 記錄。根據 Log4j 版本設定適當的 Syslog appender,讓 Hadoop 常駐程式 (NameNode、DataNode、ResourceManager、NodeManager 等) 將記錄直接轉送至 Syslog 接收器 (Bindplane 主機)。Log4j 是透過檔案設定 (沒有網頁 UI)。

方法 1:Log4j 1.x 設定

  1. 找出 log4j.properties 檔案 (通常位於 $HADOOP_CONF_DIR/log4j.properties 中)。
  2. 在檔案中新增以下 SyslogAppender 設定:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. <BINDPLANE_HOST_IP> 替換為 Bindplane 主機的 IP 位址。

  4. 儲存檔案。

  5. 重新啟動 Hadoop Daemon,套用設定變更。

選項 2:Log4j 2.x 設定

  1. 找出 log4j2.xml 檔案 (通常位於 $HADOOP_CONF_DIR/log4j2.xml 中)。
  2. 在檔案中新增以下 Syslog appender 設定:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • <BINDPLANE_HOST_IP> 替換為 Bindplane 主機的 IP 位址。
  3. 儲存檔案。

  4. 重新啟動 Hadoop Daemon,套用設定變更。

UDM 對應表

記錄欄位 UDM 對應 邏輯
允許 security_result.action 如果為「false」,動作為「BLOCK」。如果為「true」,則動作為「ALLOW」。
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value 使用 grok 模式「%{DATA:suser}@.*auth:%{WORD:auth_type}」從「ugi」欄位擷取。系統會移除括號和「auth:」。
呼叫 additional.fields.key = "Call#", additional.fields.value.string_value 直接對應。
call_context additional.fields.key = "callerContext", additional.fields.value.string_value 直接對應。
cliIP principal.ip 只有在「json_data」欄位存在且成功剖析為 JSON 時,才會對應。
cmd principal.process.command_line 直接對應。
cluster_name target.hostname 如有,則做為目標主機名稱。
metadata.event_timestamp.seconds 與月、年、時、分和秒搭配使用,可建構 event_timestamp。
說明 metadata.description 直接對應。
駕駛人 additional.fields.key = "driver", additional.fields.value.string_value 直接對應。
dst target.ip 或 target.hostname 或 target.file.full_path 如果成功剖析為 IP,則會對應至目標 IP。如果值開頭為「/user」,則會對應至目標檔案路徑。否則會對應至目標主機名稱。
dstport target.port 直接對應並轉換為整數。
執行者 security_result.rule_name 直接對應。
event_count additional.fields.key = "event_count", additional.fields.value.string_value 直接對應並轉換為字串。
fname src.file.full_path 直接對應。
小時 metadata.event_timestamp.seconds 與月、日、年、分和秒搭配使用,建構 event_timestamp。
id additional.fields.key = "id", additional.fields.value.string_value 直接對應。
ip principal.ip 移除任何開頭的「/」字元後,對應至主體 IP。
json_data 剖析為 JSON。擷取的欄位會對應至相應的 UDM 欄位。
logType additional.fields.key = "logType", additional.fields.value.string_value 直接對應。
訊息 用於使用 grok 模式擷取各種欄位。
方法 network.http.method 直接對應。
分鐘 metadata.event_timestamp.seconds 與月、日、年、時和秒搭配使用,建構 event_timestamp。
個月 metadata.event_timestamp.seconds 與日、年、時、分和秒搭配使用,建構 event_timestamp。
觀察員 observer.hostname 或 observer.ip 如果成功剖析為 IP,則會對應至觀察者 IP。否則會對應至觀察器主機名稱。
perm additional.fields.key = "perm", additional.fields.value.string_value 直接對應。
政策 security_result.rule_id 直接對應並轉換為字串。
產品 metadata.product_name 直接對應。
product_event metadata.product_event_type 直接對應。如果是「rename」,「dst」欄位會對應至「target.file.full_path」。
proto network.application_protocol 如果不是「webhdfs」,則直接對應並轉換為大寫。
原因 security_result.summary 直接對應。
存放區 additional.fields.key = "repo", additional.fields.value.string_value 直接對應。
resType additional.fields.key = "resType", additional.fields.value.string_value 直接對應。
result additional.fields.key = "result", additional.fields.value.string_value 直接對應並轉換為字串。
重試 additional.fields.key = "Retry#", additional.fields.value.string_value 直接對應。
metadata.event_timestamp.seconds 與月、日、年、時和分搭配使用,建構 event_timestamp。
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value 直接對應並轉換為字串。
嚴重性 security_result.severity 根據值對應至不同嚴重性等級:「INFO」、「Info」、「info」->「INFORMATIONAL」;「Low」、「low」、「LOW」->「LOW」;「error」、「Error」、「WARN」、「Warn」->「MEDIUM」;「High」、「high」、「HIGH」->「HIGH」;「Critical」、「critical」、「CRITICAL」->「CRITICAL」。
shost principal.hostname 如果與「src」不同,則做為主要主機名稱。
src principal.ip 或 principal.hostname 或 observer.ip 如果成功剖析為 IP,則會對應至主體和觀察者 IP。否則會對應至主機名稱。
srcport principal.port 直接對應並轉換為整數。
摘要 security_result.summary 直接對應。
suser principal.user.userid 直接對應。
標記 additional.fields.key = "tags", additional.fields.value.string_value 直接對應。
執行緒 additional.fields.key = "thread", additional.fields.value.string_value 直接對應。
提示 target.ip 直接對應。
ugi target.hostname 如果「log_data」欄位不含「·」,則會做為目標主機名稱。
網址 target.url 直接對應。
vendor metadata.vendor_name 直接對應。
版本 metadata.product_version 直接對應。
metadata.event_timestamp.seconds 與月、日、時、分和秒搭配使用,可建構 event_timestamp。
不適用 metadata.event_type 預設為「NETWORK_CONNECTION」。如果未識別任何目標,則變更為「STATUS_UPDATE」。
不適用 metadata.log_type 設為「HADOOP」。
不適用 security_result.alert_state 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「ALERTING」。
不適用 is_alert 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「true」。
不適用 is_significant 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「true」。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。