Recopila registros de Elasticsearch

Compatible con:

En este documento, se explica cómo transferir registros de Elasticsearch a Google Security Operations con Amazon S3. El analizador transforma los registros sin procesar con formato JSON en un modelo de datos unificado (UDM). Extrae campos de estructuras JSON anidadas, los asigna a campos de UDM y enriquece los datos con contexto relevante para la seguridad, como niveles de gravedad y roles de usuario.

Antes de comenzar

  • Una instancia de Google SecOps
  • Acceso privilegiado a la administración del clúster de Elasticsearch
  • Acceso privilegiado a AWS (S3, IAM, EC2)
  • Instancia de EC2 o host persistente para ejecutar Logstash

Obtén los requisitos previos de Elasticsearch

  1. Accede a tu clúster de Elasticsearch como administrador.
  2. Verifica que tu suscripción a Elasticsearch incluya las funciones de seguridad (obligatorio para el registro de auditoría).
  3. Anota el nombre y la versión de tu clúster de Elasticsearch para consultarlos.
  4. Identifica la ruta de acceso en la que se escribirán los registros de auditoría (predeterminada: $ES_HOME/logs/<clustername>_audit.json).

Habilita el registro de auditoría de Elasticsearch

  1. En cada nodo de Elasticsearch, edita el archivo de configuración elasticsearch.yml.
  2. Agrega el siguiente parámetro de configuración:

    xpack.security.audit.enabled: true
    
  3. Realiza un reinicio progresivo del clúster para aplicar los cambios:

    • Inhabilita la asignación de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Detén y reinicia cada nodo de a uno por vez.
    • Vuelve a habilitar la asignación de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Verifica que los registros de auditoría se generen en <clustername>_audit.json en el directorio de registros.

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, elastic-search-logs).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para consultarlas en el futuro.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca la política AmazonS3FullAccess.
  18. Selecciona la política.
  19. Haz clic en Siguiente.
  20. Haz clic en Agregar permisos.

Configura Logstash para enviar registros de auditoría a S3

  1. Instala Logstash en una instancia de EC2 o en un host persistente que pueda acceder a los archivos de registro de auditoría de Elasticsearch.
  2. Instala el complemento de salida de S3 si aún no está presente:

    bin/logstash-plugin install logstash-output-s3
    
  3. Crea un archivo de configuración de Logstash (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Inicia Logstash con la configuración:

    bin/logstash -f elastic-to-s3.conf
    

Opcional: Crea un usuario de IAM con acceso de solo lectura para Google SecOps

  1. Ve a Consola de AWS > IAM > Usuarios > Agregar usuarios.
  2. Haz clic en Agregar usuarios.
  3. Proporciona los siguientes detalles de configuración:
    • Usuario: Ingresa secops-reader.
    • Tipo de acceso: Selecciona Clave de acceso: Acceso programático.
  4. Haz clic en Crear usuario.
  5. Adjunta una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
  6. En el editor de JSON, ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Configura el nombre como secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se ingresan en el feed).

Configura un feed en Google SecOps para transferir registros de Elasticsearch

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Agregar feed nuevo.
  3. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, Elasticsearch Logs).
  4. Selecciona Amazon S3 V2 como el Tipo de fuente.
  5. Selecciona Elastic Search como el Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifica valores para los siguientes parámetros de entrada:
    • URI de S3: s3://elastic-search-logs/logs/
    • Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del recurso: Es el espacio de nombres del recurso.
    • Etiquetas de transmisión: Es la etiqueta que se aplica a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
Nivel security_result.severity La lógica verifica el valor del campo "Nivel" y lo asigna al nivel de gravedad del UDM correspondiente:
: "INFO", "ALL", "OFF", "TRACE", "DEBUG" se asignan a "INFORMATIONAL".
: "WARN" se asigna a "LOW".
: "ERROR" se asigna a "ERROR".
: "FATAL" se asigna a "CRITICAL".
message.@timestamp timestamp La marca de tiempo se analiza a partir del campo "@timestamp" dentro del campo "message" del registro sin procesar, con el formato "aaaa-MM-ddTHH:mm:ss.SSS".
message.action security_result.action_details El valor se toma del campo "action" dentro del campo "message" del registro sin procesar.
message.event.action security_result.summary El valor se toma del campo "event.action" dentro del campo "message" del registro sin procesar.
message.event.type metadata.product_event_type El valor se toma del campo "event.type" dentro del campo "message" del registro sin procesar.
message.host.ip target.ip El valor se toma del campo "host.ip" dentro del campo "message" del registro sin procesar.
message.host.name target.hostname El valor se toma del campo "host.name" dentro del campo "message" del registro sin procesar.
message.indices target.labels.value El valor se toma del campo "indices" dentro del campo "message" del registro sin procesar.
message.mrId target.hostname El valor se toma del campo "mrId" dentro del campo "message" del registro sin procesar.
message.node.id principal.asset.product_object_id El valor se toma del campo "node.id" dentro del campo "message" del registro sin procesar.
message.node.name target.asset.hostname El valor se toma del campo "node.name" dentro del campo "message" del registro sin procesar.
message.origin.address principal.ip La dirección IP se extrae del campo "origin.address" dentro del campo "message" del registro sin procesar, quitando el número de puerto.
message.origin.type principal.resource.resource_subtype El valor se toma del campo "origin.type" dentro del campo "message" del registro sin procesar.
message.properties.host_group principal.hostname El valor se toma del campo "properties.host_group" dentro del campo "message" del registro sin procesar.
message.properties.host_group target.group.group_display_name El valor se toma del campo "properties.host_group" dentro del campo "message" del registro sin procesar.
message.request.id target.resource.product_object_id El valor se toma del campo "request.id" dentro del campo "message" del registro sin procesar.
message.request.name target.resource.name El valor se toma del campo "request.name" dentro del campo "message" del registro sin procesar.
message.user.name principal.user.userid El valor se toma del campo "user.name" dentro del campo "message" del registro sin procesar.
message.user.realm principal.user.attribute.permissions.name El valor se toma del campo "user.realm" dentro del campo "message" del registro sin procesar.
message.user.roles about.user.attribute.roles.name El valor se toma del campo "user.roles" dentro del campo "message" del registro sin procesar.
metadata.event_type Valor codificado: "USER_RESOURCE_ACCESS"
metadata.log_type Valor codificado: "ELASTIC_SEARCH"
metadata.product_name Valor codificado: "ELASTICSEARCH"
metadata.vendor_name Valor codificado: "ELASTIC"
principal.port El número de puerto se extrae del campo "origin.address" dentro del campo "message" del registro sin procesar.
target.labels.key Valor codificado: "Indice"

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.