Raccogliere i log di Apache Hadoop
Questo documento spiega come importare i log di Apache Hadoop in Google Security Operations utilizzando Bindplane. Il parser estrae innanzitutto i campi dai log Hadoop non elaborati utilizzando i pattern Grok in base ai formati comuni dei log Hadoop. Dopodiché, mappa i campi estratti con i campi corrispondenti nello schema Unified Data Model (UDM), esegue le conversioni dei tipi di dati e arricchisce i dati con un contesto aggiuntivo.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un host Windows 2016 o versioni successive o Linux con
systemd - Se l'agente viene eseguito dietro un proxy, assicurati che le porte del firewall siano aperte in base ai requisiti dell'agente Bindplane.
- Accesso con privilegi ai file di configurazione del cluster Apache Hadoop
Recuperare il file di autenticazione importazione di Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Agenti di raccolta.
- Scarica il file di autenticazione importazione. Salva il file in modo sicuro sul sistema in cui verrà installato Bindplane.
Recuperare l'ID cliente Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Profilo.
- Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.
Installa l'agente Bindplane
Installa l'agente Bindplane sul sistema operativo Windows o Linux seguendo le istruzioni riportate di seguito.
Installazione di Windows
- Apri il prompt dei comandi o PowerShell come amministratore.
Esegui questo comando:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Installazione di Linux
- Apri un terminale con privilegi root o sudo.
Esegui questo comando:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Risorse aggiuntive per l'installazione
- Per ulteriori opzioni di installazione, consulta questa guida all'installazione.
Configura l'agente Bindplane per importare Syslog e inviarlo a Google SecOps
Accedi al file di configurazione:
- Individua il file
config.yaml. In genere si trova nella directory/etc/bindplane-agent/su Linux o nella directory di installazione su Windows. - Apri il file utilizzando un editor di testo (ad esempio
nano,vio Blocco note).
- Individua il file
Modifica il file
config.yamlcome segue:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura.
- Sostituisci
<CUSTOMER_ID>con l'ID cliente effettivo. - Aggiorna
/path/to/ingestion-authentication-file.jsonal percorso in cui è stato salvato il file di autenticazione nella sezione Recupera il file di autenticazione per l'importazione di Google SecOps.
Riavvia l'agente Bindplane per applicare le modifiche
Per riavviare l'agente Bindplane in Linux, esegui questo comando:
sudo systemctl restart bindplane-agentPer riavviare l'agente Bindplane in Windows, puoi utilizzare la console Servizi o inserire il seguente comando:
net stop BindPlaneAgent && net start BindPlaneAgent
Configura l'inoltro di Syslog su Apache Hadoop
Apache Hadoop utilizza Log4j per la registrazione. Configura l'appender Syslog appropriato in base alla tua versione di Log4j in modo che i daemon Hadoop (NameNode, DataNode, ResourceManager, NodeManager e così via) inoltrino i log direttamente al ricevitore syslog (host Bindplane). Log4j è configurato tramite file (nessuna UI web).
Opzione 1: configurazione di Log4j 1.x
- Individua il file log4j.properties (in genere in
$HADOOP_CONF_DIR/log4j.properties). Aggiungi la seguente configurazione SyslogAppender al file:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGSostituisci
<BINDPLANE_HOST_IP>con l'indirizzo IP dell'host Bindplane.Salva il file.
Riavvia i daemon Hadoop per applicare le modifiche alla configurazione.
Opzione 2: configurazione di Log4j 2.x
- Individua il file log4j2.xml (in genere in
$HADOOP_CONF_DIR/log4j2.xml). Aggiungi la seguente configurazione di Syslog appender al file:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Sostituisci
<BINDPLANE_HOST_IP>con l'indirizzo IP dell'host Bindplane.
- Sostituisci
Salva il file.
Riavvia i daemon Hadoop per applicare le modifiche alla configurazione.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| consentito | security_result.action | Se "false", l'azione è "BLOCK". Se "true", l'azione è "ALLOW". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Estratto dal campo "ugi" utilizzando il pattern grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Le parentesi e "auth:" vengono rimossi. |
| chiamare | additional.fields.key = "Call#", additional.fields.value.string_value | Mappato direttamente. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Mappato direttamente. |
| cliIP | principal.ip | Mappato solo quando esiste il campo "json_data" e viene analizzato correttamente come JSON. |
| cmd | principal.process.command_line | Mappato direttamente. |
| cluster_name | target.hostname | Utilizzato come nome host di destinazione, se presente. |
| giorno | metadata.event_timestamp.seconds | Utilizzato con mese, anno, ore, minuti e secondi per creare event_timestamp. |
| descrizione | metadata.description | Mappato direttamente. |
| autista | additional.fields.key = "driver", additional.fields.value.string_value | Mappato direttamente. |
| dst | target.ip OR target.hostname OR target.file.full_path | Se l'analisi è stata eseguita correttamente come IP, viene mappato all'IP di destinazione. Se il valore inizia con "/user", mappato al percorso del file di destinazione. In caso contrario, viene mappato al nome host di destinazione. |
| dstport | target.port | Mappato e convertito direttamente in un numero intero. |
| enforcer | security_result.rule_name | Mappato direttamente. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Mappato e convertito direttamente in stringa. |
| fname | src.file.full_path | Mappato direttamente. |
| ore | metadata.event_timestamp.seconds | Utilizzato con mese, giorno, anno, minuti e secondi per creare event_timestamp. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Mappato direttamente. |
| ip | principal.ip | Mappato all'IP dell'entità dopo aver rimosso eventuali caratteri "/" iniziali. |
| json_data | Analizzato come JSON. I campi estratti vengono mappati ai campi UDM corrispondenti. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Mappato direttamente. |
| messaggio | Utilizzato per estrarre vari campi utilizzando i pattern grok. | |
| metodo | network.http.method | Mappato direttamente. |
| minuti | metadata.event_timestamp.seconds | Utilizzato con mese, giorno, anno, ore e secondi per creare event_timestamp. |
| mese | metadata.event_timestamp.seconds | Utilizzato con giorno, anno, ore, minuti e secondi per creare event_timestamp. |
| osservatore | observer.hostname OR observer.ip | Se l'analisi è stata eseguita correttamente come IP, viene mappato all'IP dell'osservatore. In caso contrario, viene mappato al nome host dell'osservatore. |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | Mappato direttamente. |
| policy | security_result.rule_id | Mappato e convertito direttamente in stringa. |
| prodotto | metadata.product_name | Mappato direttamente. |
| product_event | metadata.product_event_type | Mappato direttamente. Se "rename", il campo "dst" viene mappato su "target.file.full_path". |
| proto | network.application_protocol | Mappato e convertito direttamente in maiuscolo se non è "webhdfs". |
| motivo | security_result.summary | Mappato direttamente. |
| repo | additional.fields.key = "repo", additional.fields.value.string_value | Mappato direttamente. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Mappato direttamente. |
| result | additional.fields.key = "result", additional.fields.value.string_value | Mappato e convertito direttamente in stringa. |
| Riprova | additional.fields.key = "Retry#", additional.fields.value.string_value | Mappato direttamente. |
| secondi | metadata.event_timestamp.seconds | Utilizzato con mese, giorno, anno, ore e minuti per creare event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Mappato e convertito direttamente in stringa. |
| gravità | security_result.severity | Mappato a diversi livelli di gravità in base al valore: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL". |
| spettro | principal.hostname | Utilizzato come nome host principale se diverso da "src". |
| src | principal.ip OR principal.hostname OR observer.ip | Se l'analisi è stata eseguita correttamente come IP, mappato all'IP principale e all'IP osservatore. In caso contrario, viene mappato al nome host principale. |
| srcport | principal.port | Mappato e convertito direttamente in un numero intero. |
| riepilogo | security_result.summary | Mappato direttamente. |
| suser | principal.user.userid | Mappato direttamente. |
| Tag | additional.fields.key = "tags", additional.fields.value.string_value | Mappato direttamente. |
| thread | additional.fields.key = "thread", additional.fields.value.string_value | Mappato direttamente. |
| mancia | target.ip | Mappato direttamente. |
| ugi | target.hostname | Utilizzato come nome host di destinazione se il campo "log_data" non contiene "·". |
| url | target.url | Mappato direttamente. |
| vendor | metadata.vendor_name | Mappato direttamente. |
| versione | metadata.product_version | Mappato direttamente. |
| anno | metadata.event_timestamp.seconds | Utilizzato con mese, giorno, ore, minuti e secondi per creare event_timestamp. |
| N/D | metadata.event_type | Impostato su "NETWORK_CONNECTION" per impostazione predefinita. Modificato in "STATUS_UPDATE" se non viene identificata alcuna destinazione. |
| N/D | metadata.log_type | Imposta "HADOOP". |
| N/D | security_result.alert_state | Impostato su "ALERTING" se la gravità è "HIGH" o "CRITICAL". |
| N/D | is_alert | Impostato su "true" se la gravità è "HIGH" o "CRITICAL". |
| N/D | is_significant | Impostato su "true" se la gravità è "HIGH" o "CRITICAL". |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.