Integrar Apache Kafka con Google SecOps
Versión de la integración: 1.0
En este documento se explica cómo integrar Apache Kafka con Google Security Operations (Google SecOps).
Casos prácticos
La integración de Apache Kafka puede abordar los siguientes casos prácticos:
Ingestión de registros de seguridad en tiempo real: ingiere y procesa automáticamente eventos de seguridad de temas de Kafka en Google SecOps. Esto permite gestionar los registros de forma centralizada y analizarlos en tiempo real para generar alertas basadas en datos de streaming.
Automatización basada en eventos: activa runbooks automatizados en Google SecOps en función de eventos de seguridad específicos o mensajes transmitidos desde un tema de Kafka. De esta forma, se acelera la respuesta a eventos críticos, como el inicio de sesión de un usuario desde una ubicación inusual.
Enriquecimiento de la inteligencia frente a amenazas: extrae feeds de inteligencia frente a amenazas personalizados de temas de Kafka para enriquecer las alertas y los casos. De esta forma, los analistas disponen de contexto actualizado sobre los indicadores de compromiso (IOCs) y se mejora la precisión del análisis de amenazas.
Antes de empezar
Antes de configurar la integración de Apache Kafka en Google SecOps, completa los siguientes requisitos previos:
- Servidor Apache Kafka: asegúrate de tener acceso a un servidor Apache Kafka en funcionamiento con los brokers y temas de Kafka necesarios configurados.
Imagen Docker del agente remoto: al crear agentes remotos, debes usar una imagen basada en Debian. Usa la siguiente imagen para asegurarte de que es compatible:
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
Parámetros de integración
La integración de Apache Kafka requiere los siguientes parámetros:
| Parámetro | Descripción |
|---|---|
Kafka brokers |
Obligatorio. Lista separada por comas de los brokers de Kafka a los que conectarse, con el formato
|
Use TLS for connection |
Opcional. Si se selecciona esta opción, la integración usará el cifrado TLS para la autenticación. Este parámetro requiere un certificado de autoridad de certificación (CA). No está habilitada de forma predeterminada. |
Use SASL PLAIN with TLS for connection |
Opcional. Si se selecciona esta opción, la integración usará el mecanismo de nombre de usuario y contraseña SASL PLAIN para la autenticación. Esta opción solo se admite con el cifrado TLS y requiere tanto un nombre de usuario y una contraseña de SASL como un certificado de autoridad de certificación. No está habilitada de forma predeterminada. |
CA certificate of Kafka server |
Opcional. El certificado de la AC que se usa para verificar la identidad del servidor de Kafka. Este parámetro es obligatorio si SASL está habilitado. |
Client certificate |
Opcional. El certificado del cliente para la autenticación TLS mutua con el servidor de Kafka. Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS). |
Client certificate key |
Opcional. La clave privada que corresponde al certificado del cliente, que se usa para la autenticación TLS mutua. Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS). |
Client certificate key password |
Opcional. Contraseña usada para descifrar la clave privada del certificado de cliente. Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS). |
SASL PLAIN Username |
Opcional. Nombre de usuario para la autenticación SASL PLAIN con brokers de Kafka. Este parámetro es obligatorio si SASL está habilitado. |
SASL PLAIN Password |
Opcional. Contraseña para la autenticación SASL PLAIN con brokers de Kafka. Este parámetro es obligatorio si SASL está habilitado. |
Para obtener instrucciones sobre cómo configurar una integración en Google SecOps, consulta Configurar integraciones.
Si es necesario, puedes hacer cambios más adelante. Después de configurar una instancia de integración, puedes usarla en los cuadernos de estrategias. Para obtener más información sobre cómo configurar y admitir varias instancias, consulta Admitir varias instancias.
Acciones
Para obtener más información sobre las acciones, consulta Responder a acciones pendientes desde Tu espacio de trabajo y Realizar una acción manual.
Ping
Usa la acción Ping para probar la conectividad con Apache Kafka.
Esta acción no se ejecuta en entidades de Google SecOps.
Entradas de acciones
Ninguno
Resultados de la acción
La acción Ping proporciona las siguientes salidas:
| Tipo de salida de la acción | Disponibilidad |
|---|---|
| Adjunto del panel de casos | No disponible |
| Enlace del panel de casos | No disponible |
| Tabla del panel de casos | No disponible |
| Tabla de enriquecimiento | No disponible |
| Resultado de JSON | No disponible |
| Mensajes de salida | Disponible |
| Resultado de la secuencia de comandos. | Disponible |
Mensajes de salida
La acción Ping puede devolver los siguientes mensajes de salida:
| Mensaje resultante | Descripción del mensaje |
|---|---|
|
La acción se ha realizado correctamente. |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
No se ha podido realizar la acción. Comprueba la conexión al servidor, los parámetros de entrada o las credenciales. |
Resultado de la secuencia de comandos
En la siguiente tabla se muestra el valor de la salida del resultado de la secuencia de comandos al usar la acción Ping:
| Nombre del resultado del script | Valor |
|---|---|
is_success |
True o False |
Conectores
Para obtener más información sobre cómo configurar conectores en Google SecOps, consulta el artículo Ingerir datos (conectores).
Apache Kafka - Messages Connector
Usa el conector de mensajes de Apache Kafka para recuperar mensajes de Apache Kafka.
El conector obtiene mensajes de un tema de Kafka especificado y puede procesarlos de diferentes formas en función del formato del mensaje. Si un mensaje es un objeto JSON válido, el conector extrae campos específicos para crear alertas y asignaciones. Si el mensaje es una cadena de texto sin formato, se ingiere como datos de eventos sin procesar.
El conector gestiona la asignación de gravedad, las plantillas de nombres de alertas y la generación de IDs únicos en función de los parámetros que proporciones.
Asignación de gravedad de JSON
Para asignar la gravedad de la alerta, debe especificar qué campo usa el conector de mensajes de Apache Kafka para obtener el valor de la gravedad en el parámetro Severity Mapping JSON. La respuesta del conector puede contener tipos de valores, como integer, float y string.
El conector de mensajes de Apache Kafka lee los valores integer y float y los asigna según la configuración de Google SecOps. En la siguiente tabla se muestra la asignación de los valores de integer a la gravedad en Google SecOps:
| Valor entero | Gravedad asignada |
|---|---|
100 |
Critical |
De 80 a 100 |
High |
De 60 a 80 |
Medium |
De 40 a 60 |
Low |
Menos de 40 |
Informational |
Si la respuesta contiene el valor string, el conector Pub/Sub - Mensajes requiere una configuración adicional.
Inicialmente, el valor predeterminado aparece de la siguiente manera:
{
"Default": 60
}
Si los valores necesarios para la asignación se encuentran en la clave JSON event_severity, pueden ser los siguientes:
"Malicious""Benign""Unknown"
Para analizar los valores de clave JSON de event_severity y asegurarte de que el objeto JSON tenga el formato correcto, configura el parámetro Severity Mapping JSON de la siguiente manera:
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
El valor "Default" es obligatorio.
Si hay varias coincidencias para el mismo objeto JSON, el conector de mensajes de Apache Kafka prioriza la primera clave del objeto JSON.
Para trabajar con campos que contengan valores integer o float, configure la clave y una cadena vacía en el parámetro Severity Mapping JSON:
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
Entradas de conectores
El conector de mensajes de Apache Kafka requiere los siguientes parámetros:
| Parámetro | Descripción |
|---|---|
Product Field Name |
Obligatorio. Nombre del campo en el que se almacena el nombre del producto. El nombre del producto influye principalmente en la asignación. Para optimizar y mejorar el proceso de asignación del conector, el valor predeterminado se resuelve en un valor de reserva al que se hace referencia desde el código. Cualquier entrada no válida de este parámetro se resuelve en un valor de reserva de forma predeterminada. El valor predeterminado es |
Event Field Name |
Obligatorio. Nombre del campo que determina el nombre del evento (subtipo). El valor predeterminado es |
Environment Field Name |
Opcional. Nombre del campo en el que se almacena el nombre del entorno. Si falta el campo de entorno, el conector usa el valor predeterminado. El valor predeterminado es |
Environment Regex Pattern |
Opcional. Patrón de expresión regular que se va a ejecutar en el valor encontrado en el campo Usa el valor predeterminado Si el patrón de expresión regular es nulo o está vacío, o si el valor del entorno es nulo, el resultado final del entorno es el entorno predeterminado. |
Script Timeout (Seconds) |
Obligatorio. El límite de tiempo de espera, en segundos, del proceso de Python que ejecuta la secuencia de comandos actual. El valor predeterminado es |
Kafka brokers |
Obligatorio. Lista separada por comas de los brokers de Kafka a los que conectarse, con el formato
|
Use TLS for connection |
Opcional. Si se selecciona esta opción, la integración usará el cifrado TLS para la autenticación. Este parámetro requiere un certificado de CA. No está habilitada de forma predeterminada. |
Use SASL PLAIN with TLS for connection |
Opcional. Si se selecciona esta opción, la integración usará el mecanismo de nombre de usuario y contraseña SASL PLAIN para la autenticación. Esta opción requiere que se proporcione un nombre de usuario y una contraseña de SASL. Solo se admite con el cifrado TLS, que requiere un certificado de CA. No está habilitada de forma predeterminada. |
CA certificate of Kafka server |
Opcional. El certificado de la AC que se usa para verificar la identidad del servidor de Kafka. |
Client certificate |
Opcional. El certificado del cliente para la autenticación TLS mutua con el servidor de Kafka. |
Client certificate key |
Opcional. La clave privada que corresponde al certificado del cliente, que se usa para la autenticación TLS mutua. |
Client certificate key password |
Opcional. Contraseña usada para descifrar la clave privada del certificado de cliente. |
SASL PLAIN Username |
Opcional. Nombre de usuario para la autenticación SASL PLAIN con brokers de Kafka. |
SASL PLAIN Password |
Opcional. Contraseña para la autenticación SASL PLAIN con brokers de Kafka. |
Topic |
Obligatorio. Tema de Kafka del que se recuperan los incidentes. |
Consumer Group ID |
Opcional. Identificador del grupo de consumidores usado al recuperar incidentes. Si no se proporciona ningún valor, se genera un ID único. |
Partitions |
Opcional. Lista CSV de particiones de las que se deben obtener los mensajes. |
Initial Offset |
Opcional. Indica dónde empieza el conector a obtener mensajes de una partición de Kafka. Puede especificar un número entero positivo para empezar en un desplazamiento concreto o usar los valores |
Poll Timeout |
Opcional. Tiempo de espera de la petición para consumir un mensaje de Kafka, en segundos. |
Case Name Template |
Opcional. Plantilla para definir un nombre de caso personalizado. El conector añade una clave Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento. Ejemplo: |
Alert Name Template |
Obligatorio. Plantilla para definir el nombre de la alerta. Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento. Ejemplo: Si no se proporciona ningún valor o la plantilla no es válida, el conector utiliza un nombre de alerta predeterminado. |
Rule Generator Template |
Obligatorio. Una plantilla para definir el generador de reglas. Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento. Ejemplo: Si no se proporciona ningún valor o la plantilla no es válida, el conector usa un nombre de generador de reglas predeterminado. |
Timestamp Field |
Obligatorio. Nombre del campo del mensaje de Kafka que contiene la marca de tiempo de la alerta de Google SecOps. Si la marca de tiempo no está en formato de época de Unix, su formato debe definirse en el parámetro |
Timestamp Format |
Opcional. El formato de la marca de tiempo del mensaje, necesario para las marcas de tiempo que no sean de la época de Unix. Usa códigos de formato estándar de Python Si la marca de tiempo no está en formato de época de Unix y este parámetro no está configurado, el conector falla. |
Severity Mapping JSON |
Obligatorio. El objeto JSON que usa el conector para extraer y asignar el nivel de gravedad del mensaje a la escala de prioridad de Google SecOps. El valor predeterminado es |
Unique ID Field |
Opcional. Nombre del campo que se va a usar como identificador único del mensaje. Si no se proporciona ningún valor, el conector genera y usa un hash SHA-256 del contenido del mensaje como identificador del mensaje. |
Max Messages To Fetch |
Obligatorio. El número máximo de mensajes que procesa el conector en cada iteración. El valor predeterminado es |
Disable Overflow |
Opcional. Si se selecciona, el conector ignora el mecanismo de desbordamiento de Google SecOps. Esta opción está habilitada de forma predeterminada. |
Verify SSL |
Obligatorio. Si se selecciona esta opción, la integración valida el certificado SSL al conectarse al servidor Apache Kafka. Esta opción está habilitada de forma predeterminada. |
Proxy Server Address |
Opcional. Dirección del servidor proxy que se va a usar. |
Proxy Username |
Opcional. Nombre de usuario del proxy para autenticarse. |
Proxy Password |
Opcional. La contraseña del proxy para autenticarte. |
Reglas de conectores
El conector admite proxies.
Alertas de conectores
En la siguiente tabla se describe la asignación de campos de mensajes de Apache Kafka a campos de alertas de SecOps de Google:
| Campo de alerta de Siemplify | Campo de mensaje de Apache Kafka |
|---|---|
SourceSystemName |
Rellenado por el framework. |
TicketId |
El valor del campo de ID único o un hash SHA-256 del mensaje. |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
El valor generado por Alert Name Template. |
Reason |
N/A |
Description |
N/A |
DeviceVendor |
Codificado: Apache Kafka |
DeviceProduct |
Valor alternativo: Message |
Priority |
Se asigna a partir del parámetro Severity Mapping JSON. |
RuleGenerator |
El valor generado por Rule Generator Template. |
SourceGroupingIdentifier |
N/A |
StartTime |
Convertida del Timestamp Field. |
EndTime |
Convertida del Timestamp Field. |
Siemplify Alert - Extensions |
N/A |
Siemplify Alert - Attachments |
N/A |
Eventos del conector
A continuación, se muestra un ejemplo de evento de conector:
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.