Collecter les journaux Elasticsearch
Ce document explique comment ingérer des journaux Elasticsearch dans Google Security Operations à l'aide d'Amazon S3. L'analyseur transforme les journaux bruts au format JSON en un modèle de données unifié (UDM). Il extrait les champs des structures JSON imbriquées, les mappe aux champs UDM et enrichit les données avec un contexte pertinent pour la sécurité, comme les niveaux de gravité et les rôles utilisateur.
Avant de commencer
- Une instance Google SecOps
- Accès privilégié à l'administration du cluster Elasticsearch
- Accès privilégié à AWS (S3, IAM, EC2)
- Instance EC2 ou hôte persistant pour exécuter Logstash
Obtenir les conditions préalables d'Elasticsearch
- Connectez-vous à votre cluster Elasticsearch en tant qu'administrateur.
- Vérifiez que votre abonnement Elasticsearch inclut les fonctionnalités de sécurité (requises pour la journalisation des audits).
- Notez le nom et la version de votre cluster Elasticsearch pour référence.
- Identifiez le chemin d'accès où les journaux d'audit seront écrits (par défaut :
$ES_HOME/logs/<clustername>_audit.json
).
Activer la journalisation d'audit Elasticsearch
- Sur chaque nœud Elasticsearch, modifiez le fichier de configuration elasticsearch.yml.
Ajoutez le paramètre suivant :
xpack.security.audit.enabled: true
Effectuez un redémarrage progressif du cluster pour appliquer les modifications :
- Désactiver l'attribution des partitions :
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Arrêtez et redémarrez chaque nœud un par un.
- Réactivez l'attribution des partitions :
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Désactiver l'attribution des partitions :
Vérifiez que les journaux d'audit sont générés dans le répertoire des journaux, à l'adresse
<clustername>_audit.json
.
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
elastic-search-logs
). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer Logstash pour envoyer les journaux d'audit à S3
- Installez Logstash sur une instance EC2 ou un hôte persistant pouvant accéder aux fichiers journaux d'audit Elasticsearch.
Installez le plug-in de sortie S3 s'il n'est pas déjà présent :
bin/logstash-plugin install logstash-output-s3
Créez un fichier de configuration Logstash (
elastic-to-s3.conf
) :input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Démarrez Logstash avec la configuration :
bin/logstash -f elastic-to-s3.conf
Facultatif : Créer un utilisateur IAM en lecture seule pour Google SecOps
- Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader
. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux Elasticsearch
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Elasticsearch Logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Elastic Search comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://elastic-search-logs/logs/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
Champ du journal | Mappage UDM | Logique |
---|---|---|
Niveau | security_result.severity | La logique vérifie la valeur du champ "Niveau" et la mappe au niveau de gravité UDM correspondant : : "INFO", "ALL", "OFF", "TRACE", "DEBUG" sont mappés sur "INFORMATIONAL". : "WARN" est associé à "LOW". : "ERROR" est associé à "ERROR". : "FATAL" est mappé sur "CRITICAL". |
message.@timestamp | timestamp | L'horodatage est analysé à partir du champ "@timestamp" dans le champ "message" du journal brut, au format "aaaa-MM-jjTHH:mm:ss.SSS". |
message.action | security_result.action_details | La valeur est extraite du champ "action" dans le champ "message" du journal brut. |
message.event.action | security_result.summary | La valeur est extraite du champ "event.action" dans le champ "message" du journal brut. |
message.event.type | metadata.product_event_type | La valeur est extraite du champ "event.type" dans le champ "message" du journal brut. |
message.host.ip | target.ip | La valeur est extraite du champ "host.ip" dans le champ "message" du journal brut. |
message.host.name | target.hostname | La valeur est extraite du champ "host.name" dans le champ "message" du journal brut. |
message.indices | target.labels.value | La valeur est extraite du champ "indices" dans le champ "message" du journal brut. |
message.mrId | target.hostname | La valeur est extraite du champ "mrId" dans le champ "message" du journal brut. |
message.node.id | principal.asset.product_object_id | La valeur est extraite du champ "node.id" dans le champ "message" du journal brut. |
message.node.name | target.asset.hostname | La valeur est extraite du champ "node.name" dans le champ "message" du journal brut. |
message.origin.address | principal.ip | L'adresse IP est extraite du champ "origin.address" dans le champ "message" du journal brut, en supprimant le numéro de port. |
message.origin.type | principal.resource.resource_subtype | La valeur est extraite du champ "origin.type" dans le champ "message" du journal brut. |
message.properties.host_group | principal.hostname | La valeur est extraite du champ "properties.host_group" dans le champ "message" du journal brut. |
message.properties.host_group | target.group.group_display_name | La valeur est extraite du champ "properties.host_group" dans le champ "message" du journal brut. |
message.request.id | target.resource.product_object_id | La valeur est extraite du champ "request.id" dans le champ "message" du journal brut. |
message.request.name | target.resource.name | La valeur est extraite du champ "request.name" dans le champ "message" du journal brut. |
message.user.name | principal.user.userid | La valeur est extraite du champ "user.name" dans le champ "message" du journal brut. |
message.user.realm | principal.user.attribute.permissions.name | La valeur est extraite du champ "user.realm" dans le champ "message" du journal brut. |
message.user.roles | about.user.attribute.roles.name | La valeur est extraite du champ "user.roles" dans le champ "message" du journal brut. |
metadata.event_type | Valeur codée en dur : "USER_RESOURCE_ACCESS" | |
metadata.log_type | Valeur codée en dur : "ELASTIC_SEARCH" | |
metadata.product_name | Valeur codée en dur : "ELASTICSEARCH" | |
metadata.vendor_name | Valeur codée en dur : "ELASTIC" | |
principal.port | Le numéro de port est extrait du champ "origin.address" dans le champ "message" du journal brut. | |
target.labels.key | Valeur codée en dur : "Indice" |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.