收集 Apache Hadoop 記錄
本文說明如何使用 Bindplane 將 Apache Hadoop 記錄擷取至 Google Security Operations。剖析器會先根據常見的 Hadoop 記錄格式,使用 Grok 模式從原始 Hadoop 記錄中擷取欄位。接著,系統會將擷取的欄位對應至 Unified Data Model (UDM) 結構定義中的對應欄位、執行資料型別轉換,並以其他內容擴充資料。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 搭載
systemd的 Windows 2016 以上版本或 Linux 主機 - 如果透過 Proxy 執行,請確保防火牆通訊埠已根據 Bindplane 代理程式需求開啟
- Apache Hadoop 叢集設定檔的特殊存取權
取得 Google SecOps 擷取驗證檔案
- 登入 Google SecOps 控制台。
- 依序前往「SIEM 設定」>「收集代理程式」。
- 下載擷取驗證檔案。將檔案安全地儲存在要安裝 Bindplane 的系統上。
取得 Google SecOps 客戶 ID
- 登入 Google SecOps 控制台。
- 依序前往「SIEM 設定」>「設定檔」。
- 複製並儲存「機構詳細資料」專區中的客戶 ID。
安裝 Bindplane 代理程式
請按照下列操作說明,在 Windows 或 Linux 作業系統上安裝 Bindplane 代理程式。
Windows 安裝
- 以管理員身分開啟「命令提示字元」或「PowerShell」。
執行下列指令:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux 安裝
- 開啟具備根層級或 sudo 權限的終端機。
執行下列指令:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
其他安裝資源
- 如需其他安裝選項,請參閱這份安裝指南。
設定 Bindplane 代理程式,擷取系統記錄檔並傳送至 Google SecOps
存取設定檔:
- 找出
config.yaml檔案。通常位於 Linux 的/etc/bindplane-agent/目錄,或 Windows 的安裝目錄。 - 使用文字編輯器 (例如
nano、vi或記事本) 開啟檔案。
- 找出
按照下列方式編輯
config.yaml檔案:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
重新啟動 Bindplane 代理程式,以套用變更
如要在 Linux 中重新啟動 Bindplane 代理程式,請執行下列指令:
sudo systemctl restart bindplane-agent如要在 Windows 中重新啟動 Bindplane 代理程式,可以使用「服務」控制台,或輸入下列指令:
net stop BindPlaneAgent && net start BindPlaneAgent
在 Apache Hadoop 上設定 Syslog 轉送
Apache Hadoop 使用 Log4j 記錄。根據 Log4j 版本設定適當的 Syslog appender,讓 Hadoop 常駐程式 (NameNode、DataNode、ResourceManager、NodeManager 等) 將記錄直接轉送至 Syslog 接收器 (Bindplane 主機)。Log4j 是透過檔案設定 (沒有網頁 UI)。
方法 1:Log4j 1.x 設定
- 找出 log4j.properties 檔案 (通常位於
$HADOOP_CONF_DIR/log4j.properties中)。 在檔案中新增以下 SyslogAppender 設定:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOG將
<BINDPLANE_HOST_IP>替換為 Bindplane 主機的 IP 位址。儲存檔案。
重新啟動 Hadoop Daemon,套用設定變更。
選項 2:Log4j 2.x 設定
- 找出 log4j2.xml 檔案 (通常位於
$HADOOP_CONF_DIR/log4j2.xml中)。 在檔案中新增以下 Syslog appender 設定:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- 將
<BINDPLANE_HOST_IP>替換為 Bindplane 主機的 IP 位址。
- 將
儲存檔案。
重新啟動 Hadoop Daemon,套用設定變更。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| 允許 | security_result.action | 如果為「false」,動作為「BLOCK」。如果為「true」,則動作為「ALLOW」。 |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | 使用 grok 模式「%{DATA:suser}@.*auth:%{WORD:auth_type}」從「ugi」欄位擷取。系統會移除括號和「auth:」。 |
| 呼叫 | additional.fields.key = "Call#", additional.fields.value.string_value | 直接對應。 |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | 直接對應。 |
| cliIP | principal.ip | 只有在「json_data」欄位存在且成功剖析為 JSON 時,才會對應。 |
| cmd | principal.process.command_line | 直接對應。 |
| cluster_name | target.hostname | 如有,則做為目標主機名稱。 |
| 天 | metadata.event_timestamp.seconds | 與月、年、時、分和秒搭配使用,可建構 event_timestamp。 |
| 說明 | metadata.description | 直接對應。 |
| 駕駛人 | additional.fields.key = "driver", additional.fields.value.string_value | 直接對應。 |
| dst | target.ip 或 target.hostname 或 target.file.full_path | 如果成功剖析為 IP,則會對應至目標 IP。如果值開頭為「/user」,則會對應至目標檔案路徑。否則會對應至目標主機名稱。 |
| dstport | target.port | 直接對應並轉換為整數。 |
| 執行者 | security_result.rule_name | 直接對應。 |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | 直接對應並轉換為字串。 |
| fname | src.file.full_path | 直接對應。 |
| 小時 | metadata.event_timestamp.seconds | 與月、日、年、分和秒搭配使用,建構 event_timestamp。 |
| id | additional.fields.key = "id", additional.fields.value.string_value | 直接對應。 |
| ip | principal.ip | 移除任何開頭的「/」字元後,對應至主體 IP。 |
| json_data | 剖析為 JSON。擷取的欄位會對應至相應的 UDM 欄位。 | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | 直接對應。 |
| 訊息 | 用於使用 grok 模式擷取各種欄位。 | |
| 方法 | network.http.method | 直接對應。 |
| 分鐘 | metadata.event_timestamp.seconds | 與月、日、年、時和秒搭配使用,建構 event_timestamp。 |
| 個月 | metadata.event_timestamp.seconds | 與日、年、時、分和秒搭配使用,建構 event_timestamp。 |
| 觀察員 | observer.hostname 或 observer.ip | 如果成功剖析為 IP,則會對應至觀察者 IP。否則會對應至觀察器主機名稱。 |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | 直接對應。 |
| 政策 | security_result.rule_id | 直接對應並轉換為字串。 |
| 產品 | metadata.product_name | 直接對應。 |
| product_event | metadata.product_event_type | 直接對應。如果是「rename」,「dst」欄位會對應至「target.file.full_path」。 |
| proto | network.application_protocol | 如果不是「webhdfs」,則直接對應並轉換為大寫。 |
| 原因 | security_result.summary | 直接對應。 |
| 存放區 | additional.fields.key = "repo", additional.fields.value.string_value | 直接對應。 |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | 直接對應。 |
| result | additional.fields.key = "result", additional.fields.value.string_value | 直接對應並轉換為字串。 |
| 重試 | additional.fields.key = "Retry#", additional.fields.value.string_value | 直接對應。 |
| 秒 | metadata.event_timestamp.seconds | 與月、日、年、時和分搭配使用,建構 event_timestamp。 |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | 直接對應並轉換為字串。 |
| 嚴重性 | security_result.severity | 根據值對應至不同嚴重性等級:「INFO」、「Info」、「info」->「INFORMATIONAL」;「Low」、「low」、「LOW」->「LOW」;「error」、「Error」、「WARN」、「Warn」->「MEDIUM」;「High」、「high」、「HIGH」->「HIGH」;「Critical」、「critical」、「CRITICAL」->「CRITICAL」。 |
| shost | principal.hostname | 如果與「src」不同,則做為主要主機名稱。 |
| src | principal.ip 或 principal.hostname 或 observer.ip | 如果成功剖析為 IP,則會對應至主體和觀察者 IP。否則會對應至主機名稱。 |
| srcport | principal.port | 直接對應並轉換為整數。 |
| 摘要 | security_result.summary | 直接對應。 |
| suser | principal.user.userid | 直接對應。 |
| 標記 | additional.fields.key = "tags", additional.fields.value.string_value | 直接對應。 |
| 執行緒 | additional.fields.key = "thread", additional.fields.value.string_value | 直接對應。 |
| 提示 | target.ip | 直接對應。 |
| ugi | target.hostname | 如果「log_data」欄位不含「·」,則會做為目標主機名稱。 |
| 網址 | target.url | 直接對應。 |
| vendor | metadata.vendor_name | 直接對應。 |
| 版本 | metadata.product_version | 直接對應。 |
| 年 | metadata.event_timestamp.seconds | 與月、日、時、分和秒搭配使用,可建構 event_timestamp。 |
| 不適用 | metadata.event_type | 預設為「NETWORK_CONNECTION」。如果未識別任何目標,則變更為「STATUS_UPDATE」。 |
| 不適用 | metadata.log_type | 設為「HADOOP」。 |
| 不適用 | security_result.alert_state | 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「ALERTING」。 |
| 不適用 | is_alert | 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「true」。 |
| 不適用 | is_significant | 如果嚴重程度為「HIGH」或「CRITICAL」,則設為「true」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。