Collecter les journaux Apache Hadoop
Ce document explique comment ingérer des journaux Apache Hadoop dans Google Security Operations à l'aide de Bindplane. Le parseur extrait d'abord les champs des journaux Hadoop bruts à l'aide de modèles Grok basés sur les formats de journaux Hadoop courants. Il mappe ensuite les champs extraits aux champs correspondants du schéma UDM (Unified Data Model), effectue des conversions de type de données et enrichit les données avec un contexte supplémentaire.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps
- Un hôte Windows 2016 ou version ultérieure, ou Linux avec
systemd - Si vous exécutez l'agent derrière un proxy, assurez-vous que les ports de pare-feu sont ouverts conformément aux exigences de l'agent Bindplane.
- Accès privilégié aux fichiers de configuration du cluster Apache Hadoop
Obtenir le fichier d'authentification d'ingestion Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres du SIEM > Agents de collecte.
- Téléchargez le fichier d'authentification d'ingestion. Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.
Obtenir l'ID client Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres SIEM> Profil.
- Copiez et enregistrez le numéro client de la section Informations sur l'organisation.
Installer l'agent Bindplane
Installez l'agent Bindplane sur votre système d'exploitation Windows ou Linux en suivant les instructions ci-dessous.
Installation de fenêtres
- Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
Exécutez la commande suivante :
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Installation de Linux
- Ouvrez un terminal avec les droits root ou sudo.
Exécutez la commande suivante :
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Autres ressources d'installation
- Pour plus d'options d'installation, consultez ce guide d'installation.
Configurer l'agent Bindplane pour ingérer Syslog et l'envoyer à Google SecOps
Accédez au fichier de configuration :
- Recherchez le fichier
config.yaml. Il se trouve généralement dans le répertoire/etc/bindplane-agent/sous Linux ou dans le répertoire d'installation sous Windows. - Ouvrez le fichier à l'aide d'un éditeur de texte (par exemple,
nano,viou le Bloc-notes).
- Recherchez le fichier
Modifiez le fichier
config.yamlcomme suit :receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Remplacez le port et l'adresse IP selon les besoins de votre infrastructure.
- Remplacez
<CUSTOMER_ID>par le numéro client réel. - Mettez à jour
/path/to/ingestion-authentication-file.jsonen indiquant le chemin d'accès où le fichier d'authentification a été enregistré dans la section Obtenir le fichier d'authentification pour l'ingestion Google SecOps.
Redémarrez l'agent Bindplane pour appliquer les modifications.
Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :
sudo systemctl restart bindplane-agentPour redémarrer l'agent Bindplane sous Windows, vous pouvez utiliser la console Services ou saisir la commande suivante :
net stop BindPlaneAgent && net start BindPlaneAgent
Configurer le transfert Syslog sur Apache Hadoop
Apache Hadoop utilise Log4j pour la journalisation. Configurez l'appender Syslog approprié en fonction de votre version de Log4j afin que les daemons Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) transfèrent les journaux directement vers votre récepteur syslog (hôte Bindplane). Log4j est configuré via des fichiers (pas d'interface utilisateur Web).
Option 1 : Configuration de Log4j 1.x
- Recherchez le fichier log4j.properties (généralement dans
$HADOOP_CONF_DIR/log4j.properties). Ajoutez la configuration SyslogAppender suivante au fichier :
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGRemplacez
<BINDPLANE_HOST_IP>par l'adresse IP de votre hôte Bindplane.Enregistrez le fichier.
Redémarrez les daemons Hadoop pour appliquer les modifications de configuration.
Option 2 : Configuration Log4j 2.x
- Recherchez le fichier log4j2.xml (généralement dans
$HADOOP_CONF_DIR/log4j2.xml). Ajoutez la configuration Syslog appender suivante au fichier :
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Remplacez
<BINDPLANE_HOST_IP>par l'adresse IP de votre hôte Bindplane.
- Remplacez
Enregistrez le fichier.
Redémarrez les daemons Hadoop pour appliquer les modifications de configuration.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
| Autorisés | security_result.action | Si la valeur est "false", l'action est "BLOCK". Si la valeur est "true", l'action est "ALLOW". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Extrait du champ "ugi" à l'aide du modèle Grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Les parenthèses et "auth:" sont supprimés. |
| call | additional.fields.key = "Call#", additional.fields.value.string_value | Mappé directement. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Mappé directement. |
| cliIP | principal.ip | Mappé uniquement lorsque le champ "json_data" existe et est correctement analysé au format JSON. |
| cmd | principal.process.command_line | Mappé directement. |
| cluster_name | target.hostname | Utilisé comme nom d'hôte cible, le cas échéant. |
| jour | metadata.event_timestamp.seconds | Utilisé avec le mois, l'année, les heures, les minutes et les secondes pour construire event_timestamp. |
| description | metadata.description | Mappé directement. |
| conducteur | additional.fields.key = "driver", additional.fields.value.string_value | Mappé directement. |
| dst | target.ip OR target.hostname OR target.file.full_path | Si l'adresse est correctement analysée en tant qu'adresse IP, elle est mappée sur l'adresse IP cible. Si la valeur commence par "/user", elle est mappée au chemin d'accès du fichier cible. Sinon, il est mappé au nom d'hôte cible. |
| dstport | target.port | Mappé directement et converti en entier. |
| Enforcer | security_result.rule_name | Mappé directement. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Mappé directement et converti en chaîne. |
| fname | src.file.full_path | Mappé directement. |
| heures | metadata.event_timestamp.seconds | Utilisé avec le mois, le jour, l'année, les minutes et les secondes pour construire event_timestamp. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Mappé directement. |
| ip | principal.ip | Mappé à l'adresse IP principale après suppression de tout caractère "/" en début de chaîne. |
| json_data | Analysé au format JSON. Les champs extraits sont mappés sur les champs UDM correspondants. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Mappé directement. |
| message | Utilisé pour extraire différents champs à l'aide de modèles Grok. | |
| méthode | network.http.method | Mappé directement. |
| minutes | metadata.event_timestamp.seconds | Utilisé avec le mois, le jour, l'année, les heures et les secondes pour construire event_timestamp. |
| mois | metadata.event_timestamp.seconds | Utilisé avec le jour, l'année, les heures, les minutes et les secondes pour construire event_timestamp. |
| observateur | observer.hostname OU observer.ip | Si l'adresse IP est analysée avec succès, elle est mappée sur l'adresse IP de l'observateur. Sinon, mappé au nom d'hôte de l'observateur. |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | Mappé directement. |
| stratégie | security_result.rule_id | Mappé directement et converti en chaîne. |
| produit | metadata.product_name | Mappé directement. |
| product_event | metadata.product_event_type | Mappé directement. Si la valeur est "rename", le champ "dst" est mappé sur "target.file.full_path". |
| proto | network.application_protocol | Directement mappé et converti en majuscules s'il ne s'agit pas de "webhdfs". |
| reason | security_result.summary | Mappé directement. |
| dépôt | additional.fields.key = "repo", additional.fields.value.string_value | Mappé directement. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Mappé directement. |
| résultat | additional.fields.key = "result", additional.fields.value.string_value | Mappé directement et converti en chaîne. |
| Réessayer | additional.fields.key = "Retry#", additional.fields.value.string_value | Mappé directement. |
| seconds | metadata.event_timestamp.seconds | Utilisé avec le mois, le jour, l'année, les heures et les minutes pour construire event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Mappé directement et converti en chaîne. |
| de gravité, | security_result.severity | Mappé à différents niveaux de gravité en fonction de la valeur : "INFO", "Info", "info" → "INFORMATIONAL" ; "Low", "low", "LOW" → "LOW" ; "error", "Error", "WARN", "Warn" → "MEDIUM" ; "High", "high", "HIGH" → "HIGH" ; "Critical", "critical", "CRITICAL" → "CRITICAL". |
| shost | principal.hostname | Utilisé comme nom d'hôte principal s'il est différent de "src". |
| src | principal.ip OU principal.hostname OU observer.ip | Si l'adresse IP a été analysée avec succès, elle est mappée sur l'adresse IP du compte principal et de l'observateur. Sinon, mappé au nom d'hôte principal. |
| srcport | principal.port | Mappé directement et converti en entier. |
| résumé | security_result.summary | Mappé directement. |
| suser | principal.user.userid | Mappé directement. |
| tags | additional.fields.key = "tags", additional.fields.value.string_value | Mappé directement. |
| fil de discussion | additional.fields.key = "thread", additional.fields.value.string_value | Mappé directement. |
| tip | target.ip | Mappé directement. |
| ugi | target.hostname | Utilisé comme nom d'hôte cible si le champ"log_data" ne contient pas "·". |
| url | target.url | Mappé directement. |
| vendor | metadata.vendor_name | Mappé directement. |
| version | metadata.product_version | Mappé directement. |
| an | metadata.event_timestamp.seconds | Utilisé avec le mois, le jour, les heures, les minutes et les secondes pour construire event_timestamp. |
| N/A | metadata.event_type | La valeur par défaut est "NETWORK_CONNECTION". La valeur est remplacée par "STATUS_UPDATE" si aucune cible n'est identifiée. |
| N/A | metadata.log_type | Définissez-le sur "HADOOP". |
| N/A | security_result.alert_state | Définie sur "ALERTING" si la gravité est "HIGH" ou "CRITICAL". |
| N/A | is_alert | Définissez sur "true" si la gravité est "HIGH" (ÉLEVÉE) ou "CRITICAL" (CRITIQUE). |
| N/A | is_significant | Définissez sur "true" si la gravité est "HIGH" (ÉLEVÉE) ou "CRITICAL" (CRITIQUE). |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.