Recoger registros de Elasticsearch
En este documento se explica cómo ingerir registros de Elasticsearch en Google Security Operations mediante Amazon S3. El analizador transforma los registros sin procesar con formato JSON en un modelo de datos unificado (UDM). Extrae campos de estructuras JSON anidadas, los asigna a campos de UDM y enriquece los datos con contexto relevante para la seguridad, como niveles de gravedad y roles de usuario.
Antes de empezar
- Una instancia de Google SecOps
- Acceso privilegiado a la administración del clúster de Elasticsearch
- Acceso privilegiado a AWS (S3, IAM, EC2)
- Instancia de EC2 o host persistente para ejecutar Logstash
Obtener los requisitos previos de Elasticsearch
- Inicia sesión en tu clúster de Elasticsearch como administrador.
- Verifica que tu suscripción a Elasticsearch incluya las funciones de seguridad (necesarias para el registro de auditoría).
- Anota el nombre y la versión del clúster de Elasticsearch para consultarlos más adelante.
- Identifica la ruta en la que se escribirán los registros de auditoría (valor predeterminado:
$ES_HOME/logs/<clustername>_audit.json
).
Habilitar el registro de auditoría de Elasticsearch
- En cada nodo de Elasticsearch, edita el archivo de configuración elasticsearch.yml.
Añade el siguiente ajuste:
xpack.security.audit.enabled: true
Realiza un reinicio gradual del clúster para aplicar los cambios:
- Inhabilitar la asignación de particiones:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Detén y reinicia cada nodo de uno en uno.
- Volver a habilitar la asignación de particiones:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Inhabilitar la asignación de particiones:
Comprueba que se estén generando registros de auditoría en
<clustername>_audit.json
en el directorio de registros.
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
elastic-search-logs
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros en Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca la política AmazonS3FullAccess.
- Selecciona la política.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar Logstash para enviar registros de auditoría a S3
- Instala Logstash en una instancia EC2 o en un host persistente que pueda acceder a los archivos de registro de auditoría de Elasticsearch.
Instala el complemento de salida de S3 si aún no lo tienes:
bin/logstash-plugin install logstash-output-s3
Crea un archivo de configuración de Logstash (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Inicia Logstash con la configuración:
bin/logstash -f elastic-to-s3.conf
Opcional: Crear un usuario de IAM de solo lectura para Google SecOps
- Ve a Consola de AWS > IAM > Usuarios > Añadir usuarios.
- Haz clic en Add users (Añadir usuarios).
- Proporcione los siguientes detalles de configuración:
- Usuario: introduce
secops-reader
. - Tipo de acceso: selecciona Clave de acceso – Acceso programático.
- Usuario: introduce
- Haz clic en Crear usuario.
- Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Asigna el nombre
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se introducen en el feed).
Configurar un feed en Google SecOps para ingerir registros de Elasticsearch
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Añadir nuevo feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
Elasticsearch Logs
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Seleccione Búsqueda elástica como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://elastic-search-logs/logs/
- Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
Nivel | security_result.severity | La lógica comprueba el valor del campo "Level" y lo asigna al nivel de gravedad de UDM correspondiente: - "INFO", "ALL", "OFF", "TRACE" y "DEBUG" se asignan a "INFORMATIONAL". : "WARN" se ha asignado a "LOW". : "ERROR" se asigna a "ERROR". : "FATAL" se asigna a "CRITICAL". |
message.@timestamp | timestamp | La marca de tiempo se analiza a partir del campo "@timestamp" del campo "message" del registro sin procesar, con el formato "yyyy-MM-ddTHH:mm:ss.SSS". |
message.action | security_result.action_details | El valor se toma del campo "action" del campo "message" del registro sin procesar. |
message.event.action | security_result.summary | El valor se toma del campo "event.action" del campo "message" del registro sin procesar. |
message.event.type | metadata.product_event_type | El valor se toma del campo "event.type" del campo "message" del registro sin procesar. |
message.host.ip | target.ip | El valor se toma del campo "host.ip" del campo "message" del registro sin procesar. |
message.host.name | target.hostname | El valor se toma del campo "host.name" del campo "message" del registro sin procesar. |
message.indices | target.labels.value | El valor se toma del campo "indices" del campo "message" del registro sin procesar. |
message.mrId | target.hostname | El valor se toma del campo "mrId" del campo "message" del registro sin procesar. |
message.node.id | principal.asset.product_object_id | El valor se toma del campo "node.id" del campo "message" del registro sin procesar. |
message.node.name | target.asset.hostname | El valor se toma del campo "node.name" del campo "message" del registro sin procesar. |
message.origin.address | principal.ip | La dirección IP se extrae del campo "origin.address" del campo "message" del registro sin procesar. Para ello, se elimina el número de puerto. |
message.origin.type | principal.resource.resource_subtype | El valor se toma del campo "origin.type" del campo "message" del registro sin procesar. |
message.properties.host_group | principal.hostname | El valor se toma del campo "properties.host_group" del campo "message" del registro sin procesar. |
message.properties.host_group | target.group.group_display_name | El valor se toma del campo "properties.host_group" del campo "message" del registro sin procesar. |
message.request.id | target.resource.product_object_id | El valor se toma del campo "request.id" del campo "message" del registro sin procesar. |
message.request.name | target.resource.name | El valor se toma del campo "request.name" del campo "message" del registro sin procesar. |
message.user.name | principal.user.userid | El valor se toma del campo "user.name" del campo "message" del registro sin procesar. |
message.user.realm | principal.user.attribute.permissions.name | El valor se toma del campo "user.realm" del campo "message" del registro sin procesar. |
message.user.roles | about.user.attribute.roles.name | El valor se toma del campo "user.roles" del campo "message" del registro sin procesar. |
metadata.event_type | Valor codificado: "USER_RESOURCE_ACCESS" | |
metadata.log_type | Valor codificado: "ELASTIC_SEARCH" | |
metadata.product_name | Valor codificado: "ELASTICSEARCH" | |
metadata.vendor_name | Valor codificado: "ELASTIC" | |
principal.port | El número de puerto se extrae del campo "origin.address" del campo "message" del registro sin procesar. | |
target.labels.key | Valor codificado: "Indice" |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.