Collecter les journaux Apache Hadoop

Compatible avec :

Ce document explique comment ingérer des journaux Apache Hadoop dans Google Security Operations à l'aide de Bindplane. Le parseur extrait d'abord les champs des journaux Hadoop bruts à l'aide de modèles Grok basés sur les formats de journaux Hadoop courants. Il mappe ensuite les champs extraits aux champs correspondants du schéma UDM (Unified Data Model), effectue des conversions de type de données et enrichit les données avec un contexte supplémentaire.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Une instance Google SecOps
  • Un hôte Windows 2016 ou version ultérieure, ou Linux avec systemd
  • Si vous exécutez l'agent derrière un proxy, assurez-vous que les ports de pare-feu sont ouverts conformément aux exigences de l'agent Bindplane.
  • Accès privilégié aux fichiers de configuration du cluster Apache Hadoop

Obtenir le fichier d'authentification d'ingestion Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres du SIEM > Agents de collecte.
  3. Téléchargez le fichier d'authentification d'ingestion. Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.

Obtenir l'ID client Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres SIEM> Profil.
  3. Copiez et enregistrez le numéro client de la section Informations sur l'organisation.

Installer l'agent Bindplane

Installez l'agent Bindplane sur votre système d'exploitation Windows ou Linux en suivant les instructions ci-dessous.

Installation de fenêtres

  1. Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
  2. Exécutez la commande suivante :

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Installation de Linux

  1. Ouvrez un terminal avec les droits root ou sudo.
  2. Exécutez la commande suivante :

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Autres ressources d'installation

Configurer l'agent Bindplane pour ingérer Syslog et l'envoyer à Google SecOps

  1. Accédez au fichier de configuration :

    1. Recherchez le fichier config.yaml. Il se trouve généralement dans le répertoire /etc/bindplane-agent/ sous Linux ou dans le répertoire d'installation sous Windows.
    2. Ouvrez le fichier à l'aide d'un éditeur de texte (par exemple, nano, vi ou le Bloc-notes).
  2. Modifiez le fichier config.yaml comme suit :

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • Remplacez le port et l'adresse IP selon les besoins de votre infrastructure.
    • Remplacez <CUSTOMER_ID> par le numéro client réel.
    • Mettez à jour /path/to/ingestion-authentication-file.json en indiquant le chemin d'accès où le fichier d'authentification a été enregistré dans la section Obtenir le fichier d'authentification pour l'ingestion Google SecOps.

Redémarrez l'agent Bindplane pour appliquer les modifications.

  • Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :

    sudo systemctl restart bindplane-agent
    
  • Pour redémarrer l'agent Bindplane sous Windows, vous pouvez utiliser la console Services ou saisir la commande suivante :

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurer le transfert Syslog sur Apache Hadoop

Apache Hadoop utilise Log4j pour la journalisation. Configurez l'appender Syslog approprié en fonction de votre version de Log4j afin que les daemons Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) transfèrent les journaux directement vers votre récepteur syslog (hôte Bindplane). Log4j est configuré via des fichiers (pas d'interface utilisateur Web).

Option 1 : Configuration de Log4j 1.x

  1. Recherchez le fichier log4j.properties (généralement dans $HADOOP_CONF_DIR/log4j.properties).
  2. Ajoutez la configuration SyslogAppender suivante au fichier :

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Remplacez <BINDPLANE_HOST_IP> par l'adresse IP de votre hôte Bindplane.

  4. Enregistrez le fichier.

  5. Redémarrez les daemons Hadoop pour appliquer les modifications de configuration.

Option 2 : Configuration Log4j 2.x

  1. Recherchez le fichier log4j2.xml (généralement dans $HADOOP_CONF_DIR/log4j2.xml).
  2. Ajoutez la configuration Syslog appender suivante au fichier :

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Remplacez <BINDPLANE_HOST_IP> par l'adresse IP de votre hôte Bindplane.
  3. Enregistrez le fichier.

  4. Redémarrez les daemons Hadoop pour appliquer les modifications de configuration.

Table de mappage UDM

Champ de journal Mappage UDM Logique
Autorisés security_result.action Si la valeur est "false", l'action est "BLOCK". Si la valeur est "true", l'action est "ALLOW".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Extrait du champ "ugi" à l'aide du modèle Grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Les parenthèses et "auth:" sont supprimés.
call additional.fields.key = "Call#", additional.fields.value.string_value Mappé directement.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Mappé directement.
cliIP principal.ip Mappé uniquement lorsque le champ "json_data" existe et est correctement analysé au format JSON.
cmd principal.process.command_line Mappé directement.
cluster_name target.hostname Utilisé comme nom d'hôte cible, le cas échéant.
jour metadata.event_timestamp.seconds Utilisé avec le mois, l'année, les heures, les minutes et les secondes pour construire event_timestamp.
description metadata.description Mappé directement.
conducteur additional.fields.key = "driver", additional.fields.value.string_value Mappé directement.
dst target.ip OR target.hostname OR target.file.full_path Si l'adresse est correctement analysée en tant qu'adresse IP, elle est mappée sur l'adresse IP cible. Si la valeur commence par "/user", elle est mappée au chemin d'accès du fichier cible. Sinon, il est mappé au nom d'hôte cible.
dstport target.port Mappé directement et converti en entier.
Enforcer security_result.rule_name Mappé directement.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Mappé directement et converti en chaîne.
fname src.file.full_path Mappé directement.
heures metadata.event_timestamp.seconds Utilisé avec le mois, le jour, l'année, les minutes et les secondes pour construire event_timestamp.
id additional.fields.key = "id", additional.fields.value.string_value Mappé directement.
ip principal.ip Mappé à l'adresse IP principale après suppression de tout caractère "/" en début de chaîne.
json_data Analysé au format JSON. Les champs extraits sont mappés sur les champs UDM correspondants.
logType additional.fields.key = "logType", additional.fields.value.string_value Mappé directement.
message Utilisé pour extraire différents champs à l'aide de modèles Grok.
méthode network.http.method Mappé directement.
minutes metadata.event_timestamp.seconds Utilisé avec le mois, le jour, l'année, les heures et les secondes pour construire event_timestamp.
mois metadata.event_timestamp.seconds Utilisé avec le jour, l'année, les heures, les minutes et les secondes pour construire event_timestamp.
observateur observer.hostname OU observer.ip Si l'adresse IP est analysée avec succès, elle est mappée sur l'adresse IP de l'observateur. Sinon, mappé au nom d'hôte de l'observateur.
perm additional.fields.key = "perm", additional.fields.value.string_value Mappé directement.
stratégie security_result.rule_id Mappé directement et converti en chaîne.
produit metadata.product_name Mappé directement.
product_event metadata.product_event_type Mappé directement. Si la valeur est "rename", le champ "dst" est mappé sur "target.file.full_path".
proto network.application_protocol Directement mappé et converti en majuscules s'il ne s'agit pas de "webhdfs".
reason security_result.summary Mappé directement.
dépôt additional.fields.key = "repo", additional.fields.value.string_value Mappé directement.
resType additional.fields.key = "resType", additional.fields.value.string_value Mappé directement.
résultat additional.fields.key = "result", additional.fields.value.string_value Mappé directement et converti en chaîne.
Réessayer additional.fields.key = "Retry#", additional.fields.value.string_value Mappé directement.
seconds metadata.event_timestamp.seconds Utilisé avec le mois, le jour, l'année, les heures et les minutes pour construire event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Mappé directement et converti en chaîne.
de gravité, security_result.severity Mappé à différents niveaux de gravité en fonction de la valeur : "INFO", "Info", "info" → "INFORMATIONAL" ; "Low", "low", "LOW" → "LOW" ; "error", "Error", "WARN", "Warn" → "MEDIUM" ; "High", "high", "HIGH" → "HIGH" ; "Critical", "critical", "CRITICAL" → "CRITICAL".
shost principal.hostname Utilisé comme nom d'hôte principal s'il est différent de "src".
src principal.ip OU principal.hostname OU observer.ip Si l'adresse IP a été analysée avec succès, elle est mappée sur l'adresse IP du compte principal et de l'observateur. Sinon, mappé au nom d'hôte principal.
srcport principal.port Mappé directement et converti en entier.
résumé security_result.summary Mappé directement.
suser principal.user.userid Mappé directement.
tags additional.fields.key = "tags", additional.fields.value.string_value Mappé directement.
fil de discussion additional.fields.key = "thread", additional.fields.value.string_value Mappé directement.
tip target.ip Mappé directement.
ugi target.hostname Utilisé comme nom d'hôte cible si le champ"log_data" ne contient pas "·".
url target.url Mappé directement.
vendor metadata.vendor_name Mappé directement.
version metadata.product_version Mappé directement.
an metadata.event_timestamp.seconds Utilisé avec le mois, le jour, les heures, les minutes et les secondes pour construire event_timestamp.
N/A metadata.event_type La valeur par défaut est "NETWORK_CONNECTION". La valeur est remplacée par "STATUS_UPDATE" si aucune cible n'est identifiée.
N/A metadata.log_type Définissez-le sur "HADOOP".
N/A security_result.alert_state Définie sur "ALERTING" si la gravité est "HIGH" ou "CRITICAL".
N/A is_alert Définissez sur "true" si la gravité est "HIGH" (ÉLEVÉE) ou "CRITICAL" (CRITIQUE).
N/A is_significant Définissez sur "true" si la gravité est "HIGH" (ÉLEVÉE) ou "CRITICAL" (CRITIQUE).

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.