Raccogliere i log di Apache Hadoop

Supportato in:

Questo documento spiega come importare i log di Apache Hadoop in Google Security Operations utilizzando Bindplane. Il parser estrae innanzitutto i campi dai log Hadoop non elaborati utilizzando i pattern Grok in base ai formati comuni dei log Hadoop. Dopodiché, mappa i campi estratti con i campi corrispondenti nello schema Unified Data Model (UDM), esegue le conversioni dei tipi di dati e arricchisce i dati con un contesto aggiuntivo.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un host Windows 2016 o versioni successive o Linux con systemd
  • Se l'agente viene eseguito dietro un proxy, assicurati che le porte del firewall siano aperte in base ai requisiti dell'agente Bindplane.
  • Accesso con privilegi ai file di configurazione del cluster Apache Hadoop

Recuperare il file di autenticazione importazione di Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Agenti di raccolta.
  3. Scarica il file di autenticazione importazione. Salva il file in modo sicuro sul sistema in cui verrà installato Bindplane.

Recuperare l'ID cliente Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Profilo.
  3. Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.

Installa l'agente Bindplane

Installa l'agente Bindplane sul sistema operativo Windows o Linux seguendo le istruzioni riportate di seguito.

Installazione di Windows

  1. Apri il prompt dei comandi o PowerShell come amministratore.
  2. Esegui questo comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Installazione di Linux

  1. Apri un terminale con privilegi root o sudo.
  2. Esegui questo comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Risorse aggiuntive per l'installazione

Configura l'agente Bindplane per importare Syslog e inviarlo a Google SecOps

  1. Accedi al file di configurazione:

    1. Individua il file config.yaml. In genere si trova nella directory /etc/bindplane-agent/ su Linux o nella directory di installazione su Windows.
    2. Apri il file utilizzando un editor di testo (ad esempio nano, vi o Blocco note).
  2. Modifica il file config.yaml come segue:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    

Riavvia l'agente Bindplane per applicare le modifiche

  • Per riavviare l'agente Bindplane in Linux, esegui questo comando:

    sudo systemctl restart bindplane-agent
    
  • Per riavviare l'agente Bindplane in Windows, puoi utilizzare la console Servizi o inserire il seguente comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configura l'inoltro di Syslog su Apache Hadoop

Apache Hadoop utilizza Log4j per la registrazione. Configura l'appender Syslog appropriato in base alla tua versione di Log4j in modo che i daemon Hadoop (NameNode, DataNode, ResourceManager, NodeManager e così via) inoltrino i log direttamente al ricevitore syslog (host Bindplane). Log4j è configurato tramite file (nessuna UI web).

Opzione 1: configurazione di Log4j 1.x

  1. Individua il file log4j.properties (in genere in $HADOOP_CONF_DIR/log4j.properties).
  2. Aggiungi la seguente configurazione SyslogAppender al file:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Sostituisci <BINDPLANE_HOST_IP> con l'indirizzo IP dell'host Bindplane.

  4. Salva il file.

  5. Riavvia i daemon Hadoop per applicare le modifiche alla configurazione.

Opzione 2: configurazione di Log4j 2.x

  1. Individua il file log4j2.xml (in genere in $HADOOP_CONF_DIR/log4j2.xml).
  2. Aggiungi la seguente configurazione di Syslog appender al file:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Sostituisci <BINDPLANE_HOST_IP> con l'indirizzo IP dell'host Bindplane.
  3. Salva il file.

  4. Riavvia i daemon Hadoop per applicare le modifiche alla configurazione.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
consentito security_result.action Se "false", l'azione è "BLOCK". Se "true", l'azione è "ALLOW".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Estratto dal campo "ugi" utilizzando il pattern grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Le parentesi e "auth:" vengono rimossi.
chiamare additional.fields.key = "Call#", additional.fields.value.string_value Mappato direttamente.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Mappato direttamente.
cliIP principal.ip Mappato solo quando esiste il campo "json_data" e viene analizzato correttamente come JSON.
cmd principal.process.command_line Mappato direttamente.
cluster_name target.hostname Utilizzato come nome host di destinazione, se presente.
giorno metadata.event_timestamp.seconds Utilizzato con mese, anno, ore, minuti e secondi per creare event_timestamp.
descrizione metadata.description Mappato direttamente.
autista additional.fields.key = "driver", additional.fields.value.string_value Mappato direttamente.
dst target.ip OR target.hostname OR target.file.full_path Se l'analisi è stata eseguita correttamente come IP, viene mappato all'IP di destinazione. Se il valore inizia con "/user", mappato al percorso del file di destinazione. In caso contrario, viene mappato al nome host di destinazione.
dstport target.port Mappato e convertito direttamente in un numero intero.
enforcer security_result.rule_name Mappato direttamente.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Mappato e convertito direttamente in stringa.
fname src.file.full_path Mappato direttamente.
ore metadata.event_timestamp.seconds Utilizzato con mese, giorno, anno, minuti e secondi per creare event_timestamp.
id additional.fields.key = "id", additional.fields.value.string_value Mappato direttamente.
ip principal.ip Mappato all'IP dell'entità dopo aver rimosso eventuali caratteri "/" iniziali.
json_data Analizzato come JSON. I campi estratti vengono mappati ai campi UDM corrispondenti.
logType additional.fields.key = "logType", additional.fields.value.string_value Mappato direttamente.
messaggio Utilizzato per estrarre vari campi utilizzando i pattern grok.
metodo network.http.method Mappato direttamente.
minuti metadata.event_timestamp.seconds Utilizzato con mese, giorno, anno, ore e secondi per creare event_timestamp.
mese metadata.event_timestamp.seconds Utilizzato con giorno, anno, ore, minuti e secondi per creare event_timestamp.
osservatore observer.hostname OR observer.ip Se l'analisi è stata eseguita correttamente come IP, viene mappato all'IP dell'osservatore. In caso contrario, viene mappato al nome host dell'osservatore.
perm additional.fields.key = "perm", additional.fields.value.string_value Mappato direttamente.
policy security_result.rule_id Mappato e convertito direttamente in stringa.
prodotto metadata.product_name Mappato direttamente.
product_event metadata.product_event_type Mappato direttamente. Se "rename", il campo "dst" viene mappato su "target.file.full_path".
proto network.application_protocol Mappato e convertito direttamente in maiuscolo se non è "webhdfs".
motivo security_result.summary Mappato direttamente.
repo additional.fields.key = "repo", additional.fields.value.string_value Mappato direttamente.
resType additional.fields.key = "resType", additional.fields.value.string_value Mappato direttamente.
result additional.fields.key = "result", additional.fields.value.string_value Mappato e convertito direttamente in stringa.
Riprova additional.fields.key = "Retry#", additional.fields.value.string_value Mappato direttamente.
secondi metadata.event_timestamp.seconds Utilizzato con mese, giorno, anno, ore e minuti per creare event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Mappato e convertito direttamente in stringa.
gravità security_result.severity Mappato a diversi livelli di gravità in base al valore: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL".
spettro principal.hostname Utilizzato come nome host principale se diverso da "src".
src principal.ip OR principal.hostname OR observer.ip Se l'analisi è stata eseguita correttamente come IP, mappato all'IP principale e all'IP osservatore. In caso contrario, viene mappato al nome host principale.
srcport principal.port Mappato e convertito direttamente in un numero intero.
riepilogo security_result.summary Mappato direttamente.
suser principal.user.userid Mappato direttamente.
Tag additional.fields.key = "tags", additional.fields.value.string_value Mappato direttamente.
thread additional.fields.key = "thread", additional.fields.value.string_value Mappato direttamente.
mancia target.ip Mappato direttamente.
ugi target.hostname Utilizzato come nome host di destinazione se il campo "log_data" non contiene "·".
url target.url Mappato direttamente.
vendor metadata.vendor_name Mappato direttamente.
versione metadata.product_version Mappato direttamente.
anno metadata.event_timestamp.seconds Utilizzato con mese, giorno, ore, minuti e secondi per creare event_timestamp.
N/D metadata.event_type Impostato su "NETWORK_CONNECTION" per impostazione predefinita. Modificato in "STATUS_UPDATE" se non viene identificata alcuna destinazione.
N/D metadata.log_type Imposta "HADOOP".
N/D security_result.alert_state Impostato su "ALERTING" se la gravità è "HIGH" o "CRITICAL".
N/D is_alert Impostato su "true" se la gravità è "HIGH" o "CRITICAL".
N/D is_significant Impostato su "true" se la gravità è "HIGH" o "CRITICAL".

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.