Integrar Apache Kafka con Google SecOps

Versión de la integración: 1.0

En este documento se explica cómo integrar Apache Kafka con Google Security Operations (Google SecOps).

Casos prácticos

La integración de Apache Kafka puede abordar los siguientes casos prácticos:

  • Ingestión de registros de seguridad en tiempo real: ingiere y procesa automáticamente eventos de seguridad de temas de Kafka en Google SecOps. Esto permite gestionar los registros de forma centralizada y analizarlos en tiempo real para generar alertas basadas en datos de streaming.

  • Automatización basada en eventos: activa runbooks automatizados en Google SecOps en función de eventos de seguridad específicos o mensajes transmitidos desde un tema de Kafka. De esta forma, se acelera la respuesta a eventos críticos, como el inicio de sesión de un usuario desde una ubicación inusual.

  • Enriquecimiento de la inteligencia frente a amenazas: extrae feeds de inteligencia frente a amenazas personalizados de temas de Kafka para enriquecer las alertas y los casos. De esta forma, los analistas disponen de contexto actualizado sobre los indicadores de compromiso (IOCs) y se mejora la precisión del análisis de amenazas.

Antes de empezar

Antes de configurar la integración de Apache Kafka en Google SecOps, completa los siguientes requisitos previos:

  • Servidor Apache Kafka: asegúrate de tener acceso a un servidor Apache Kafka en funcionamiento con los brokers y temas de Kafka necesarios configurados.
  • Imagen Docker del agente remoto: al crear agentes remotos, debes usar una imagen basada en Debian. Usa la siguiente imagen para asegurarte de que es compatible:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parámetros de integración

La integración de Apache Kafka requiere los siguientes parámetros:

Parámetro Descripción
Kafka brokers

Obligatorio.

Lista separada por comas de los brokers de Kafka a los que conectarse, con el formato hostname:port.

Use TLS for connection

Opcional.

Si se selecciona esta opción, la integración usará el cifrado TLS para la autenticación.

Este parámetro requiere un certificado de autoridad de certificación (CA).

No está habilitada de forma predeterminada.

Use SASL PLAIN with TLS for connection

Opcional.

Si se selecciona esta opción, la integración usará el mecanismo de nombre de usuario y contraseña SASL PLAIN para la autenticación.

Esta opción solo se admite con el cifrado TLS y requiere tanto un nombre de usuario y una contraseña de SASL como un certificado de autoridad de certificación.

No está habilitada de forma predeterminada.

CA certificate of Kafka server

Opcional.

El certificado de la AC que se usa para verificar la identidad del servidor de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

Client certificate

Opcional.

El certificado del cliente para la autenticación TLS mutua con el servidor de Kafka.

Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS).

Client certificate key

Opcional.

La clave privada que corresponde al certificado del cliente, que se usa para la autenticación TLS mutua.

Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS).

Client certificate key password

Opcional.

Contraseña usada para descifrar la clave privada del certificado de cliente.

Este parámetro es obligatorio si se ha habilitado el protocolo TLS mutuo (mTLS).

SASL PLAIN Username

Opcional.

Nombre de usuario para la autenticación SASL PLAIN con brokers de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

SASL PLAIN Password

Opcional.

Contraseña para la autenticación SASL PLAIN con brokers de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

Para obtener instrucciones sobre cómo configurar una integración en Google SecOps, consulta Configurar integraciones.

Si es necesario, puedes hacer cambios más adelante. Después de configurar una instancia de integración, puedes usarla en los cuadernos de estrategias. Para obtener más información sobre cómo configurar y admitir varias instancias, consulta Admitir varias instancias.

Acciones

Para obtener más información sobre las acciones, consulta Responder a acciones pendientes desde Tu espacio de trabajo y Realizar una acción manual.

Ping

Usa la acción Ping para probar la conectividad con Apache Kafka.

Esta acción no se ejecuta en entidades de Google SecOps.

Entradas de acciones

Ninguno

Resultados de la acción

La acción Ping proporciona las siguientes salidas:

Tipo de salida de la acción Disponibilidad
Adjunto del panel de casos No disponible
Enlace del panel de casos No disponible
Tabla del panel de casos No disponible
Tabla de enriquecimiento No disponible
Resultado de JSON No disponible
Mensajes de salida Disponible
Resultado de la secuencia de comandos. Disponible
Mensajes de salida

La acción Ping puede devolver los siguientes mensajes de salida:

Mensaje resultante Descripción del mensaje

Successfully connected to the Apache Kafka server with the provided connection parameters!

La acción se ha realizado correctamente.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

No se ha podido realizar la acción.

Comprueba la conexión al servidor, los parámetros de entrada o las credenciales.

Resultado de la secuencia de comandos

En la siguiente tabla se muestra el valor de la salida del resultado de la secuencia de comandos al usar la acción Ping:

Nombre del resultado del script Valor
is_success True o False

Conectores

Para obtener más información sobre cómo configurar conectores en Google SecOps, consulta el artículo Ingerir datos (conectores).

Apache Kafka - Messages Connector

Usa el conector de mensajes de Apache Kafka para recuperar mensajes de Apache Kafka.

El conector obtiene mensajes de un tema de Kafka especificado y puede procesarlos de diferentes formas en función del formato del mensaje. Si un mensaje es un objeto JSON válido, el conector extrae campos específicos para crear alertas y asignaciones. Si el mensaje es una cadena de texto sin formato, se ingiere como datos de eventos sin procesar.

El conector gestiona la asignación de gravedad, las plantillas de nombres de alertas y la generación de IDs únicos en función de los parámetros que proporciones.

Asignación de gravedad de JSON

Para asignar la gravedad de la alerta, debe especificar qué campo usa el conector de mensajes de Apache Kafka para obtener el valor de la gravedad en el parámetro Severity Mapping JSON. La respuesta del conector puede contener tipos de valores, como integer, float y string.

El conector de mensajes de Apache Kafka lee los valores integer y float y los asigna según la configuración de Google SecOps. En la siguiente tabla se muestra la asignación de los valores de integer a la gravedad en Google SecOps:

Valor entero Gravedad asignada
100 Critical
De 80 a 100 High
De 60 a 80 Medium
De 40 a 60 Low
Menos de 40 Informational

Si la respuesta contiene el valor string, el conector Pub/Sub - Mensajes requiere una configuración adicional.

Inicialmente, el valor predeterminado aparece de la siguiente manera:

{
    "Default": 60
}

Si los valores necesarios para la asignación se encuentran en la clave JSON event_severity, pueden ser los siguientes:

  • "Malicious"
  • "Benign"
  • "Unknown"

Para analizar los valores de clave JSON de event_severity y asegurarte de que el objeto JSON tenga el formato correcto, configura el parámetro Severity Mapping JSON de la siguiente manera:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

El valor "Default" es obligatorio.

Si hay varias coincidencias para el mismo objeto JSON, el conector de mensajes de Apache Kafka prioriza la primera clave del objeto JSON.

Para trabajar con campos que contengan valores integer o float, configure la clave y una cadena vacía en el parámetro Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Entradas de conectores

El conector de mensajes de Apache Kafka requiere los siguientes parámetros:

Parámetro Descripción
Product Field Name

Obligatorio.

Nombre del campo en el que se almacena el nombre del producto.

El nombre del producto influye principalmente en la asignación. Para optimizar y mejorar el proceso de asignación del conector, el valor predeterminado se resuelve en un valor de reserva al que se hace referencia desde el código. Cualquier entrada no válida de este parámetro se resuelve en un valor de reserva de forma predeterminada.

El valor predeterminado es Product Name.

Event Field Name

Obligatorio.

Nombre del campo que determina el nombre del evento (subtipo).

El valor predeterminado es event_type.

Environment Field Name

Opcional.

Nombre del campo en el que se almacena el nombre del entorno.

Si falta el campo de entorno, el conector usa el valor predeterminado.

El valor predeterminado es "".

Environment Regex Pattern

Opcional.

Patrón de expresión regular que se va a ejecutar en el valor encontrado en el campo Environment Field Name. Este parámetro te permite manipular el campo de entorno mediante la lógica de expresiones regulares.

Usa el valor predeterminado .* para obtener el valor sin formato Environment Field Name necesario.

Si el patrón de expresión regular es nulo o está vacío, o si el valor del entorno es nulo, el resultado final del entorno es el entorno predeterminado.

Script Timeout (Seconds)

Obligatorio.

El límite de tiempo de espera, en segundos, del proceso de Python que ejecuta la secuencia de comandos actual.

El valor predeterminado es 180.

Kafka brokers

Obligatorio.

Lista separada por comas de los brokers de Kafka a los que conectarse, con el formato hostname:port.

Use TLS for connection

Opcional.

Si se selecciona esta opción, la integración usará el cifrado TLS para la autenticación.

Este parámetro requiere un certificado de CA.

No está habilitada de forma predeterminada.

Use SASL PLAIN with TLS for connection

Opcional.

Si se selecciona esta opción, la integración usará el mecanismo de nombre de usuario y contraseña SASL PLAIN para la autenticación.

Esta opción requiere que se proporcione un nombre de usuario y una contraseña de SASL. Solo se admite con el cifrado TLS, que requiere un certificado de CA.

No está habilitada de forma predeterminada.

CA certificate of Kafka server

Opcional.

El certificado de la AC que se usa para verificar la identidad del servidor de Kafka.

Client certificate

Opcional.

El certificado del cliente para la autenticación TLS mutua con el servidor de Kafka.

Client certificate key

Opcional.

La clave privada que corresponde al certificado del cliente, que se usa para la autenticación TLS mutua.

Client certificate key password

Opcional.

Contraseña usada para descifrar la clave privada del certificado de cliente.

SASL PLAIN Username

Opcional.

Nombre de usuario para la autenticación SASL PLAIN con brokers de Kafka.

SASL PLAIN Password

Opcional.

Contraseña para la autenticación SASL PLAIN con brokers de Kafka.

Topic

Obligatorio.

Tema de Kafka del que se recuperan los incidentes.

Consumer Group ID

Opcional.

Identificador del grupo de consumidores usado al recuperar incidentes.

Si no se proporciona ningún valor, se genera un ID único.

Partitions

Opcional.

Lista CSV de particiones de las que se deben obtener los mensajes.

Initial Offset

Opcional.

Indica dónde empieza el conector a obtener mensajes de una partición de Kafka.

Puede especificar un número entero positivo para empezar en un desplazamiento concreto o usar los valores earliest o latest para empezar a obtener datos desde el principio o el final de la partición.

Poll Timeout

Opcional.

Tiempo de espera de la petición para consumir un mensaje de Kafka, en segundos.

Case Name Template

Opcional.

Plantilla para definir un nombre de caso personalizado. El conector añade una clave custom_case_name al evento.

Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX

Alert Name Template

Obligatorio.

Plantilla para definir el nombre de la alerta.

Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX

Si no se proporciona ningún valor o la plantilla no es válida, el conector utiliza un nombre de alerta predeterminado.

Rule Generator Template

Obligatorio.

Una plantilla para definir el generador de reglas.

Puede usar marcadores de posición con el formato FIELD_NAME, que se rellenan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX

Si no se proporciona ningún valor o la plantilla no es válida, el conector usa un nombre de generador de reglas predeterminado.

Timestamp Field

Obligatorio.

Nombre del campo del mensaje de Kafka que contiene la marca de tiempo de la alerta de Google SecOps.

Si la marca de tiempo no está en formato de época de Unix, su formato debe definirse en el parámetro Timestamp Format.

Timestamp Format

Opcional.

El formato de la marca de tiempo del mensaje, necesario para las marcas de tiempo que no sean de la época de Unix. Usa códigos de formato estándar de Python strftime.

Si la marca de tiempo no está en formato de época de Unix y este parámetro no está configurado, el conector falla.

Severity Mapping JSON

Obligatorio.

El objeto JSON que usa el conector para extraer y asignar el nivel de gravedad del mensaje a la escala de prioridad de Google SecOps.

El valor predeterminado es {"Default": "60"}.

Unique ID Field

Opcional.

Nombre del campo que se va a usar como identificador único del mensaje.

Si no se proporciona ningún valor, el conector genera y usa un hash SHA-256 del contenido del mensaje como identificador del mensaje.

Max Messages To Fetch

Obligatorio.

El número máximo de mensajes que procesa el conector en cada iteración.

El valor predeterminado es 100.

Disable Overflow

Opcional.

Si se selecciona, el conector ignora el mecanismo de desbordamiento de Google SecOps.

Esta opción está habilitada de forma predeterminada.

Verify SSL

Obligatorio.

Si se selecciona esta opción, la integración valida el certificado SSL al conectarse al servidor Apache Kafka.

Esta opción está habilitada de forma predeterminada.

Proxy Server Address

Opcional.

Dirección del servidor proxy que se va a usar.

Proxy Username

Opcional.

Nombre de usuario del proxy para autenticarse.

Proxy Password

Opcional.

La contraseña del proxy para autenticarte.

Reglas de conectores

El conector admite proxies.

Alertas de conectores

En la siguiente tabla se describe la asignación de campos de mensajes de Apache Kafka a campos de alertas de SecOps de Google:

Campo de alerta de Siemplify Campo de mensaje de Apache Kafka
SourceSystemName Rellenado por el framework.
TicketId El valor del campo de ID único o un hash SHA-256 del mensaje.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name El valor generado por Alert Name Template.
Reason N/A
Description N/A
DeviceVendor Codificado: Apache Kafka
DeviceProduct Valor alternativo: Message
Priority Se asigna a partir del parámetro Severity Mapping JSON.
RuleGenerator El valor generado por Rule Generator Template.
SourceGroupingIdentifier N/A
StartTime Convertida del Timestamp Field.
EndTime Convertida del Timestamp Field.
Siemplify Alert - Extensions N/A
Siemplify Alert - Attachments N/A

Eventos del conector

A continuación, se muestra un ejemplo de evento de conector:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.