Apache Hadoop-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Apache Hadoop-Logs mit Bindplane in Google Security Operations aufnehmen. Der Parser extrahiert zuerst Felder aus Roh-Hadoop-Logs mithilfe von Grok-Mustern, die auf gängigen Hadoop-Logformaten basieren. Anschließend werden die extrahierten Felder den entsprechenden Feldern im Schema für einheitliche Datenmodelle (Unified Data Model, UDM) zugeordnet, Datentypkonvertierungen durchgeführt und die Daten mit zusätzlichem Kontext angereichert.
Hinweise
Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:
- Eine Google SecOps-Instanz
- Ein Windows 2016- oder höher- oder Linux-Host mit
systemd - Wenn Sie den Agent hinter einem Proxy ausführen, müssen die Firewallports gemäß den Anforderungen des Bindplane-Agents geöffnet sein.
- Privilegierter Zugriff auf die Konfigurationsdateien des Apache Hadoop-Clusters
Authentifizierungsdatei für die Aufnahme in Google SecOps abrufen
- Melden Sie sich in der Google SecOps-Konsole an.
- Rufen Sie die SIEM-Einstellungen > Collection Agents auf.
- Laden Sie die Authentifizierungsdatei für die Aufnahme herunter. Speichern Sie die Datei sicher auf dem System, auf dem BindPlane installiert wird.
Google SecOps-Kundennummer abrufen
- Melden Sie sich in der Google SecOps-Konsole an.
- Rufen Sie die SIEM-Einstellungen > Profile auf.
- Kopieren und speichern Sie die Kunden-ID aus dem Bereich Organisationsdetails.
BindPlane-Agent installieren
Installieren Sie den Bindplane-Agent auf Ihrem Windows- oder Linux-Betriebssystem gemäß der folgenden Anleitung.
Fenstereinbau
- Öffnen Sie die Eingabeaufforderung oder PowerShell als Administrator.
Führen Sie dazu diesen Befehl aus:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux-Installation
- Öffnen Sie ein Terminal mit Root- oder Sudo-Berechtigungen.
Führen Sie dazu diesen Befehl aus:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Zusätzliche Installationsressourcen
- Weitere Installationsoptionen finden Sie in diesem Installationsleitfaden.
BindPlane-Agent zum Erfassen von Syslog-Daten und Senden an Google SecOps konfigurieren
Konfigurationsdatei aufrufen:
- Suchen Sie die Datei
config.yaml. Normalerweise befindet sie sich unter Linux im Verzeichnis/etc/bindplane-agent/oder unter Windows im Installationsverzeichnis. - Öffnen Sie die Datei mit einem Texteditor (z. B.
nano,vioder Notepad).
- Suchen Sie die Datei
Bearbeiten Sie die Datei
config.yamlso:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Ersetzen Sie den Port und die IP-Adresse nach Bedarf in Ihrer Infrastruktur.
- Ersetzen Sie
<CUSTOMER_ID>durch die tatsächliche Kunden-ID. - Aktualisieren Sie
/path/to/ingestion-authentication-file.jsonauf den Pfad, in dem die Authentifizierungsdatei im Abschnitt Get Google SecOps ingestion authentication file (Authentifizierungsdatei für die Google SecOps-Aufnahme abrufen) gespeichert wurde.
Bindplane-Agent neu starten, um die Änderungen zu übernehmen
Führen Sie den folgenden Befehl aus, um den Bindplane-Agent unter Linux neu zu starten:
sudo systemctl restart bindplane-agentUm den Bindplane-Agent unter Windows neu zu starten, können Sie entweder die Konsole Dienste verwenden oder den folgenden Befehl eingeben:
net stop BindPlaneAgent && net start BindPlaneAgent
Syslog-Weiterleitung in Apache Hadoop konfigurieren
Apache Hadoop verwendet Log4j für das Logging. Konfigurieren Sie den entsprechenden Syslog-Appender basierend auf Ihrer Log4j-Version, damit Hadoop-Daemons (NameNode, DataNode, ResourceManager, NodeManager usw.) Logs direkt an Ihren Syslog-Empfänger (Bindplane-Host) weiterleiten. Log4j wird über Dateien konfiguriert (keine Weboberfläche).
Option 1: Log4j 1.x-Konfiguration
- Suchen Sie die Datei log4j.properties (normalerweise in
$HADOOP_CONF_DIR/log4j.properties). Fügen Sie der Datei die folgende SyslogAppender-Konfiguration hinzu:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGErsetzen Sie
<BINDPLANE_HOST_IP>durch die IP-Adresse Ihres Bindplane-Hosts.Speichern Sie die Datei.
Starten Sie die Hadoop-Daemons neu, um die Konfigurationsänderungen zu übernehmen.
Option 2: Log4j 2.x-Konfiguration
- Suchen Sie die Datei log4j2.xml (normalerweise in
$HADOOP_CONF_DIR/log4j2.xml). Fügen Sie der Datei die folgende Syslog-Appender-Konfiguration hinzu:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Ersetzen Sie
<BINDPLANE_HOST_IP>durch die IP-Adresse Ihres Bindplane-Hosts.
- Ersetzen Sie
Speichern Sie die Datei.
Starten Sie die Hadoop-Daemons neu, um die Konfigurationsänderungen zu übernehmen.
UDM-Zuordnungstabelle
| Logfeld | UDM-Zuordnung | Logik |
|---|---|---|
| zulässig | security_result.action | Wenn „false“, ist die Aktion „BLOCK“. Wenn „true“, ist die Aktion „ALLOW“. |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Aus dem Feld „ugi“ extrahiert mit dem Grok-Muster „%{DATA:suser}@.*auth:%{WORD:auth_type}“. Klammern und „auth:“ werden entfernt. |
| Anruf | additional.fields.key = "Call#", additional.fields.value.string_value | Direkt zugeordnet. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Direkt zugeordnet. |
| cliIP | principal.ip | Wird nur zugeordnet, wenn das Feld „json_data“ vorhanden ist und erfolgreich als JSON geparst wurde. |
| CMD | principal.process.command_line | Direkt zugeordnet. |
| cluster_name | target.hostname | Wird als Zielhostname verwendet, sofern vorhanden. |
| Tag | metadata.event_timestamp.seconds | Wird zusammen mit Monat, Jahr, Stunden, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen. |
| Beschreibung | metadata.description | Direkt zugeordnet. |
| Fahrer | additional.fields.key = "driver", additional.fields.value.string_value | Direkt zugeordnet. |
| dst | target.ip ODER target.hostname ODER target.file.full_path | Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der Ziel-IP-Adresse zugeordnet. Wenn der Wert mit „/user“ beginnt, wird er dem Zielpfad der Datei zugeordnet. Andernfalls wird der Zielhostname zugeordnet. |
| dstport | target.port | Direkt zugeordnet und in eine Ganzzahl konvertiert. |
| Vollstrecker | security_result.rule_name | Direkt zugeordnet. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Direkt zugeordnet und in einen String konvertiert. |
| fname | src.file.full_path | Direkt zugeordnet. |
| Stunden | metadata.event_timestamp.seconds | Wird zusammen mit Monat, Tag, Jahr, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Direkt zugeordnet. |
| ip | principal.ip | Wird der Haupt-IP zugeordnet, nachdem alle führenden „/“-Zeichen entfernt wurden. |
| json_data | Als JSON geparst. Extrahierte Felder werden den entsprechenden UDM-Feldern zugeordnet. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Direkt zugeordnet. |
| Nachricht | Wird zum Extrahieren verschiedener Felder mithilfe von Grok-Mustern verwendet. | |
| Methode | network.http.method | Direkt zugeordnet. |
| Minuten | metadata.event_timestamp.seconds | Wird zusammen mit Monat, Tag, Jahr, Stunden und Sekunden verwendet, um „event_timestamp“ zu erstellen. |
| Monat | metadata.event_timestamp.seconds | Wird zusammen mit „day“, „year“, „hours“, „minutes“ und „seconds“ verwendet, um „event_timestamp“ zu erstellen. |
| Beobachter | observer.hostname ODER observer.ip | Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der IP-Adresse des Beobachters zugeordnet. Andernfalls wird der Hostname des Beobachters zugeordnet. |
| Dauerwelle | additional.fields.key = "perm", additional.fields.value.string_value | Direkt zugeordnet. |
| Richtlinie | security_result.rule_id | Direkt zugeordnet und in einen String konvertiert. |
| Produkt | metadata.product_name | Direkt zugeordnet. |
| product_event | metadata.product_event_type | Direkt zugeordnet. Bei „rename“ wird das Feld „dst“ dem Feld „target.file.full_path“ zugeordnet. |
| Proto | network.application_protocol | Direkt zugeordnet und in Großbuchstaben konvertiert, wenn es sich nicht um „webhdfs“ handelt. |
| reason | security_result.summary | Direkt zugeordnet. |
| Repository | additional.fields.key = "repo", additional.fields.value.string_value | Direkt zugeordnet. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Direkt zugeordnet. |
| Ergebnis | additional.fields.key = "result", additional.fields.value.string_value | Direkt zugeordnet und in einen String konvertiert. |
| Wiederholen | additional.fields.key = "Retry#", additional.fields.value.string_value | Direkt zugeordnet. |
| Sekunden | metadata.event_timestamp.seconds | Wird zusammen mit Monat, Tag, Jahr, Stunden und Minuten verwendet, um „event_timestamp“ zu erstellen. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Direkt zugeordnet und in einen String konvertiert. |
| die Ausprägung | security_result.severity | Wird je nach Wert unterschiedlichen Schweregraden zugeordnet: „INFO“, „Info“, „info“ –> „INFORMATIONAL“; „Low“, „low“, „LOW“ –> „LOW“; „error“, „Error“, „WARN“, „Warn“ –> „MEDIUM“; „High“, „high“, „HIGH“ –> „HIGH“; „Critical“, „critical“, „CRITICAL“ –> „CRITICAL“. |
| shost | principal.hostname | Wird als primärer Hostname verwendet, wenn er sich von „src“ unterscheidet. |
| src | principal.ip ODER principal.hostname ODER observer.ip | Wenn die IP-Adresse erfolgreich geparst wurde, wird sie der IP-Adresse des Hauptkontos und des Beobachters zugeordnet. Andernfalls wird der Hostname des Prinzipal verwendet. |
| srcport | principal.port | Direkt zugeordnet und in eine Ganzzahl konvertiert. |
| Zusammenfassung | security_result.summary | Direkt zugeordnet. |
| suser | principal.user.userid | Direkt zugeordnet. |
| Tags | additional.fields.key = "tags", additional.fields.value.string_value | Direkt zugeordnet. |
| Thread | additional.fields.key = "thread", additional.fields.value.string_value | Direkt zugeordnet. |
| Tipp | target.ip | Direkt zugeordnet. |
| ugi | target.hostname | Wird als Zielhostname verwendet, wenn das Feld „log_data“ kein „·“ enthält. |
| URL | target.url | Direkt zugeordnet. |
| vendor | metadata.vendor_name | Direkt zugeordnet. |
| Version | metadata.product_version | Direkt zugeordnet. |
| Jahr | metadata.event_timestamp.seconds | Wird zusammen mit Monat, Tag, Stunden, Minuten und Sekunden verwendet, um „event_timestamp“ zu erstellen. |
| – | metadata.event_type | Standardmäßig auf „NETWORK_CONNECTION“ festgelegt. Wird in „STATUS_UPDATE“ geändert, wenn kein Ziel angegeben ist. |
| – | metadata.log_type | Legen Sie diesen Wert auf „HADOOP“ fest. |
| – | security_result.alert_state | Wird auf „ALERTING“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist. |
| – | is_alert | Auf „true“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist. |
| – | is_significant | Auf „true“ gesetzt, wenn der Schweregrad „HIGH“ oder „CRITICAL“ ist. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten