Apache Hadoop-Logs erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie Apache Hadoop-Logs mit Bindplane in Google Security Operations aufnehmen. Der Parser extrahiert zuerst Felder aus Roh-Hadoop-Logs mithilfe von Grok-Mustern, die auf gängigen Hadoop-Logformaten basieren. Anschließend werden die extrahierten Felder den entsprechenden Feldern im Schema für einheitliche Datenmodelle (Unified Data Model, UDM) zugeordnet, Datentypkonvertierungen durchgeführt und die Daten mit zusätzlichem Kontext angereichert.

Hinweise

Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:

  • Eine Google SecOps-Instanz
  • Ein Windows 2016- oder höher- oder Linux-Host mit systemd
  • Wenn Sie den Agent hinter einem Proxy ausführen, müssen die Firewallports gemäß den Anforderungen des Bindplane-Agents geöffnet sein.
  • Privilegierter Zugriff auf die Konfigurationsdateien des Apache Hadoop-Clusters

Authentifizierungsdatei für die Aufnahme in Google SecOps abrufen

  1. Melden Sie sich in der Google SecOps-Konsole an.
  2. Rufen Sie die SIEM-Einstellungen > Collection Agents auf.
  3. Laden Sie die Authentifizierungsdatei für die Aufnahme herunter. Speichern Sie die Datei sicher auf dem System, auf dem BindPlane installiert wird.

Google SecOps-Kundennummer abrufen

  1. Melden Sie sich in der Google SecOps-Konsole an.
  2. Rufen Sie die SIEM-Einstellungen > Profile auf.
  3. Kopieren und speichern Sie die Kunden-ID aus dem Bereich Organisationsdetails.

BindPlane-Agent installieren

Installieren Sie den Bindplane-Agent auf Ihrem Windows- oder Linux-Betriebssystem gemäß der folgenden Anleitung.

Fenstereinbau

  1. Öffnen Sie die Eingabeaufforderung oder PowerShell als Administrator.
  2. Führen Sie dazu diesen Befehl aus:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux-Installation

  1. Öffnen Sie ein Terminal mit Root- oder Sudo-Berechtigungen.
  2. Führen Sie dazu diesen Befehl aus:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Zusätzliche Installationsressourcen

BindPlane-Agent zum Erfassen von Syslog-Daten und Senden an Google SecOps konfigurieren

  1. Konfigurationsdatei aufrufen:

    1. Suchen Sie die Datei config.yaml. Normalerweise befindet sie sich unter Linux im Verzeichnis /etc/bindplane-agent/ oder unter Windows im Installationsverzeichnis.
    2. Öffnen Sie die Datei mit einem Texteditor (z. B. nano, vi oder Notepad).
  2. Bearbeiten Sie die Datei config.yamlso:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • Ersetzen Sie den Port und die IP-Adresse nach Bedarf in Ihrer Infrastruktur.
    • Ersetzen Sie <CUSTOMER_ID> durch die tatsächliche Kunden-ID.
    • Aktualisieren Sie /path/to/ingestion-authentication-file.json auf den Pfad, in dem die Authentifizierungsdatei im Abschnitt Get Google SecOps ingestion authentication file (Authentifizierungsdatei für die Google SecOps-Aufnahme abrufen) gespeichert wurde.

Bindplane-Agent neu starten, um die Änderungen zu übernehmen

  • Führen Sie den folgenden Befehl aus, um den Bindplane-Agent unter Linux neu zu starten:

    sudo systemctl restart bindplane-agent
    
  • Um den Bindplane-Agent unter Windows neu zu starten, können Sie entweder die Konsole Dienste verwenden oder den folgenden Befehl eingeben:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Syslog-Weiterleitung in Apache Hadoop konfigurieren

Apache Hadoop verwendet Log4j für das Logging. Konfigurieren Sie den entsprechenden Syslog-Appender basierend auf Ihrer Log4j-Version, damit Hadoop-Daemons (NameNode, DataNode, ResourceManager, NodeManager usw.) Logs direkt an Ihren Syslog-Empfänger (Bindplane-Host) weiterleiten. Log4j wird über Dateien konfiguriert (keine Weboberfläche).

Option 1: Log4j 1.x-Konfiguration

  1. Suchen Sie die Datei log4j.properties (normalerweise in $HADOOP_CONF_DIR/log4j.properties).
  2. Fügen Sie der Datei die folgende SyslogAppender-Konfiguration hinzu:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Ersetzen Sie <BINDPLANE_HOST_IP> durch die IP-Adresse Ihres Bindplane-Hosts.

  4. Speichern Sie die Datei.

  5. Starten Sie die Hadoop-Daemons neu, um die Konfigurationsänderungen zu übernehmen.

Option 2: Log4j 2.x-Konfiguration

  1. Suchen Sie die Datei log4j2.xml (normalerweise in $HADOOP_CONF_DIR/log4j2.xml).
  2. Fügen Sie der Datei die folgende Syslog-Appender-Konfiguration hinzu:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Ersetzen Sie <BINDPLANE_HOST_IP> durch die IP-Adresse Ihres Bindplane-Hosts.
  3. Speichern Sie die Datei.

  4. Starten Sie die Hadoop-Daemons neu, um die Konfigurationsänderungen zu übernehmen.

UDM-Zuordnungstabelle

Logfeld UDM-Zuordnung Logik
zulässig security_result.action Wenn „false“, ist die Aktion „BLOCK“. Wenn „true“, ist die Aktion „ALLOW“.
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Aus dem Feld „ugi“ extrahiert mit dem Grok-Muster „%{DATA:suser}@.*auth:%{WORD:auth_type}“. Klammern und „auth:“ werden entfernt.
Anruf additional.fields.key = "Call#", additional.fields.value.string_value Direkt zugeordnet.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Direkt zugeordnet.
cliIP principal.ip Wird nur zugeordnet, wenn das Feld „json_data“ vorhanden ist und erfolgreich als JSON geparst wurde.
CMD principal.process.command_line Direkt zugeordnet.
cluster_name target.hostname Wird als Zielhostname verwendet, sofern vorhanden.
Tag metadata.event_timestamp.seconds Wird zusammen mit Monat, Jahr, Stunden, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen.
Beschreibung metadata.description Direkt zugeordnet.
Fahrer additional.fields.key = "driver", additional.fields.value.string_value Direkt zugeordnet.
dst target.ip ODER target.hostname ODER target.file.full_path Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der Ziel-IP-Adresse zugeordnet. Wenn der Wert mit „/user“ beginnt, wird er dem Zielpfad der Datei zugeordnet. Andernfalls wird der Zielhostname zugeordnet.
dstport target.port Direkt zugeordnet und in eine Ganzzahl konvertiert.
Vollstrecker security_result.rule_name Direkt zugeordnet.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Direkt zugeordnet und in einen String konvertiert.
fname src.file.full_path Direkt zugeordnet.
Stunden metadata.event_timestamp.seconds Wird zusammen mit Monat, Tag, Jahr, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen.
id additional.fields.key = "id", additional.fields.value.string_value Direkt zugeordnet.
ip principal.ip Wird der Haupt-IP zugeordnet, nachdem alle führenden „/“-Zeichen entfernt wurden.
json_data Als JSON geparst. Extrahierte Felder werden den entsprechenden UDM-Feldern zugeordnet.
logType additional.fields.key = "logType", additional.fields.value.string_value Direkt zugeordnet.
Nachricht Wird zum Extrahieren verschiedener Felder mithilfe von Grok-Mustern verwendet.
Methode network.http.method Direkt zugeordnet.
Minuten metadata.event_timestamp.seconds Wird zusammen mit Monat, Tag, Jahr, Stunden und Sekunden verwendet, um „event_timestamp“ zu erstellen.
Monat metadata.event_timestamp.seconds Wird zusammen mit „day“, „year“, „hours“, „minutes“ und „seconds“ verwendet, um „event_timestamp“ zu erstellen.
Beobachter observer.hostname ODER observer.ip Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der IP-Adresse des Beobachters zugeordnet. Andernfalls wird der Hostname des Beobachters zugeordnet.
Dauerwelle additional.fields.key = "perm", additional.fields.value.string_value Direkt zugeordnet.
Richtlinie security_result.rule_id Direkt zugeordnet und in einen String konvertiert.
Produkt metadata.product_name Direkt zugeordnet.
product_event metadata.product_event_type Direkt zugeordnet. Bei „rename“ wird das Feld „dst“ dem Feld „target.file.full_path“ zugeordnet.
Proto network.application_protocol Direkt zugeordnet und in Großbuchstaben konvertiert, wenn es sich nicht um „webhdfs“ handelt.
reason security_result.summary Direkt zugeordnet.
Repository additional.fields.key = "repo", additional.fields.value.string_value Direkt zugeordnet.
resType additional.fields.key = "resType", additional.fields.value.string_value Direkt zugeordnet.
Ergebnis additional.fields.key = "result", additional.fields.value.string_value Direkt zugeordnet und in einen String konvertiert.
Wiederholen additional.fields.key = "Retry#", additional.fields.value.string_value Direkt zugeordnet.
Sekunden metadata.event_timestamp.seconds Wird zusammen mit Monat, Tag, Jahr, Stunden und Minuten verwendet, um „event_timestamp“ zu erstellen.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Direkt zugeordnet und in einen String konvertiert.
die Ausprägung security_result.severity Wird je nach Wert unterschiedlichen Schweregraden zugeordnet: „INFO“, „Info“, „info“ –> „INFORMATIONAL“; „Low“, „low“, „LOW“ –> „LOW“; „error“, „Error“, „WARN“, „Warn“ –> „MEDIUM“; „High“, „high“, „HIGH“ –> „HIGH“; „Critical“, „critical“, „CRITICAL“ –> „CRITICAL“.
shost principal.hostname Wird als primärer Hostname verwendet, wenn er sich von „src“ unterscheidet.
src principal.ip ODER principal.hostname ODER observer.ip Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der IP-Adresse des Hauptkontos und des Beobachters zugeordnet. Andernfalls wird der Hostname des Prinzipal verwendet.
srcport principal.port Direkt zugeordnet und in eine Ganzzahl konvertiert.
Zusammenfassung security_result.summary Direkt zugeordnet.
suser principal.user.userid Direkt zugeordnet.
Tags additional.fields.key = "tags", additional.fields.value.string_value Direkt zugeordnet.
Thread additional.fields.key = "thread", additional.fields.value.string_value Direkt zugeordnet.
Tipp target.ip Direkt zugeordnet.
ugi target.hostname Wird als Zielhostname verwendet, wenn das Feld „log_data“ kein „·“ enthält.
URL target.url Direkt zugeordnet.
vendor metadata.vendor_name Direkt zugeordnet.
Version metadata.product_version Direkt zugeordnet.
Jahr metadata.event_timestamp.seconds Wird zusammen mit Monat, Tag, Stunden, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen.
metadata.event_type Standardmäßig auf „NETWORK_CONNECTION“ festgelegt. Wird in „STATUS_UPDATE“ geändert, wenn kein Ziel angegeben ist.
metadata.log_type Legen Sie diesen Wert auf „HADOOP“ fest.
security_result.alert_state Wird auf „ALERTING“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist.
is_alert Auf „true“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist.
is_significant Auf „true“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist.

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten