Collecter les journaux Elasticsearch

Compatible avec :

Ce document explique comment ingérer des journaux Elasticsearch dans Google Security Operations à l'aide d'Amazon S3. L'analyseur transforme les journaux bruts au format JSON en un modèle de données unifié (UDM). Il extrait les champs des structures JSON imbriquées, les mappe aux champs UDM et enrichit les données avec un contexte pertinent pour la sécurité, comme les niveaux de gravité et les rôles utilisateur.

Avant de commencer

  • Une instance Google SecOps
  • Accès privilégié à l'administration du cluster Elasticsearch
  • Accès privilégié à AWS (S3, IAM, EC2)
  • Instance EC2 ou hôte persistant pour exécuter Logstash

Obtenir les conditions préalables d'Elasticsearch

  1. Connectez-vous à votre cluster Elasticsearch en tant qu'administrateur.
  2. Vérifiez que votre abonnement Elasticsearch inclut les fonctionnalités de sécurité (requises pour la journalisation des audits).
  3. Notez le nom et la version de votre cluster Elasticsearch pour référence.
  4. Identifiez le chemin d'accès où les journaux d'audit seront écrits (par défaut : $ES_HOME/logs/<clustername>_audit.json).

Activer la journalisation d'audit Elasticsearch

  1. Sur chaque nœud Elasticsearch, modifiez le fichier de configuration elasticsearch.yml.
  2. Ajoutez le paramètre suivant :

    xpack.security.audit.enabled: true
    
  3. Effectuez un redémarrage progressif du cluster pour appliquer les modifications :

    • Désactiver l'attribution des partitions : PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Arrêtez et redémarrez chaque nœud un par un.
    • Réactivez l'attribution des partitions : PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Vérifiez que les journaux d'audit sont générés dans le répertoire des journaux, à l'adresse <clustername>_audit.json.

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple, elastic-search-logs).
  3. Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : Ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez la règle AmazonS3FullAccess.
  18. Sélectionnez la règle.
  19. Cliquez sur Suivant.
  20. Cliquez sur Ajouter des autorisations.

Configurer Logstash pour envoyer les journaux d'audit à S3

  1. Installez Logstash sur une instance EC2 ou un hôte persistant pouvant accéder aux fichiers journaux d'audit Elasticsearch.
  2. Installez le plug-in de sortie S3 s'il n'est pas déjà présent :

    bin/logstash-plugin install logstash-output-s3
    
  3. Créez un fichier de configuration Logstash (elastic-to-s3.conf) :

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Démarrez Logstash avec la configuration :

    bin/logstash -f elastic-to-s3.conf
    

Facultatif : Créer un utilisateur IAM en lecture seule pour Google SecOps

  1. Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : saisissez secops-reader.
    • Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
  6. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Définissez le nom sur secops-reader-policy.

  8. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  9. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  10. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux Elasticsearch

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Elasticsearch Logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Elastic Search comme type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://elastic-search-logs/logs/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ du journal Mappage UDM Logique
Niveau security_result.severity La logique vérifie la valeur du champ "Niveau" et la mappe au niveau de gravité UDM correspondant :
 : "INFO", "ALL", "OFF", "TRACE", "DEBUG" sont mappés sur "INFORMATIONAL".
 : "WARN" est associé à "LOW".
 : "ERROR" est associé à "ERROR".
 : "FATAL" est mappé sur "CRITICAL".
message.@timestamp timestamp L'horodatage est analysé à partir du champ "@timestamp" dans le champ "message" du journal brut, au format "aaaa-MM-jjTHH:mm:ss.SSS".
message.action security_result.action_details La valeur est extraite du champ "action" dans le champ "message" du journal brut.
message.event.action security_result.summary La valeur est extraite du champ "event.action" dans le champ "message" du journal brut.
message.event.type metadata.product_event_type La valeur est extraite du champ "event.type" dans le champ "message" du journal brut.
message.host.ip target.ip La valeur est extraite du champ "host.ip" dans le champ "message" du journal brut.
message.host.name target.hostname La valeur est extraite du champ "host.name" dans le champ "message" du journal brut.
message.indices target.labels.value La valeur est extraite du champ "indices" dans le champ "message" du journal brut.
message.mrId target.hostname La valeur est extraite du champ "mrId" dans le champ "message" du journal brut.
message.node.id principal.asset.product_object_id La valeur est extraite du champ "node.id" dans le champ "message" du journal brut.
message.node.name target.asset.hostname La valeur est extraite du champ "node.name" dans le champ "message" du journal brut.
message.origin.address principal.ip L'adresse IP est extraite du champ "origin.address" dans le champ "message" du journal brut, en supprimant le numéro de port.
message.origin.type principal.resource.resource_subtype La valeur est extraite du champ "origin.type" dans le champ "message" du journal brut.
message.properties.host_group principal.hostname La valeur est extraite du champ "properties.host_group" dans le champ "message" du journal brut.
message.properties.host_group target.group.group_display_name La valeur est extraite du champ "properties.host_group" dans le champ "message" du journal brut.
message.request.id target.resource.product_object_id La valeur est extraite du champ "request.id" dans le champ "message" du journal brut.
message.request.name target.resource.name La valeur est extraite du champ "request.name" dans le champ "message" du journal brut.
message.user.name principal.user.userid La valeur est extraite du champ "user.name" dans le champ "message" du journal brut.
message.user.realm principal.user.attribute.permissions.name La valeur est extraite du champ "user.realm" dans le champ "message" du journal brut.
message.user.roles about.user.attribute.roles.name La valeur est extraite du champ "user.roles" dans le champ "message" du journal brut.
metadata.event_type Valeur codée en dur : "USER_RESOURCE_ACCESS"
metadata.log_type Valeur codée en dur : "ELASTIC_SEARCH"
metadata.product_name Valeur codée en dur : "ELASTICSEARCH"
metadata.vendor_name Valeur codée en dur : "ELASTIC"
principal.port Le numéro de port est extrait du champ "origin.address" dans le champ "message" du journal brut.
target.labels.key Valeur codée en dur : "Indice"

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.