Integra Apache Kafka con Google SecOps

En este documento, se explica cómo integrar Apache Kafka en Google Security Operations (Google SecOps).

Casos de uso

La integración de Apache Kafka puede abordar los siguientes casos de uso:

  • Transferencia de registros de seguridad en tiempo real: Transfiere y procesa automáticamente los eventos de seguridad de los temas de Kafka en Google SecOps. Esto permite la administración centralizada de registros y el análisis en tiempo real para generar alertas basadas en datos de transmisión.

  • Automatización basada en eventos: Activa playbooks automatizados en Google SecOps en función de eventos de seguridad específicos o mensajes transmitidos desde un tema de Kafka. Esto acelera la respuesta a eventos críticos, como el acceso de un usuario desde una ubicación inusual.

  • Enriquecimiento de inteligencia contra amenazas: Extrae feeds de inteligencia contra amenazas personalizados de temas de Kafka para enriquecer las alertas y los casos existentes. Esto proporciona a los analistas un contexto actualizado sobre los indicadores de compromiso (IOC) y mejora la precisión del análisis de amenazas.

Antes de comenzar

Antes de configurar la integración de Apache Kafka en Google SecOps, completa los siguientes requisitos previos:

  • Servidor de Apache Kafka: Asegúrate de tener acceso a un servidor de Apache Kafka en ejecución con los agentes y temas de Kafka necesarios configurados.
  • Imagen de Docker del agente remoto: Cuando crees agentes remotos, debes usar una imagen basada en Debian. Usa la siguiente imagen para garantizar la compatibilidad:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parámetros de integración

La integración de Apache Kafka requiere los siguientes parámetros:

Parámetro Descripción
Kafka brokers

Obligatorio.

Es una lista separada por comas de los agentes de Kafka a los que se debe conectar, con el formato hostname:port.

Use TLS for connection

Es opcional.

Si se selecciona, la integración usa la encriptación TLS para la autenticación.

Este parámetro requiere un certificado de autoridad certificadora (CA).

No está habilitado de forma predeterminada.

Use SASL PLAIN with TLS for connection

Es opcional.

Si se selecciona, la integración usa el mecanismo de autenticación de nombre de usuario y contraseña SASL PLAIN.

Esta opción solo se admite con la encriptación TLS y requiere un nombre de usuario y una contraseña de SASL, así como un certificado de CA.

No está habilitado de forma predeterminada.

CA certificate of Kafka server

Es opcional.

Es el certificado de la CA que se usa para verificar la identidad del servidor de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

Client certificate

Es opcional.

Certificado del cliente para la autenticación TLS mutua con el servidor de Kafka.

Este parámetro es obligatorio si la TLS mutua (mTLS) está habilitada.

Client certificate key

Es opcional.

Es la clave privada que corresponde al certificado del cliente y que se usa para la autenticación TLS mutua.

Este parámetro es obligatorio si la TLS mutua (mTLS) está habilitada.

Client certificate key password

Es opcional.

Es la contraseña que se usa para desencriptar la clave privada del certificado de cliente.

Este parámetro es obligatorio si la TLS mutua (mTLS) está habilitada.

SASL PLAIN Username

Es opcional.

Nombre de usuario para la autenticación SASL PLAIN con agentes de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

SASL PLAIN Password

Es opcional.

Contraseña para la autenticación SASL PLAIN con agentes de Kafka.

Este parámetro es obligatorio si SASL está habilitado.

Si quieres obtener instrucciones para configurar una integración en Google SecOps, consulta Configura integraciones.

Si es necesario, puedes hacer cambios más adelante. Después de configurar una instancia de integración, puedes usarla en los playbooks. Para obtener más información sobre cómo configurar y admitir varias instancias, consulta Compatibilidad con varias instancias.

Acciones

Para obtener más información sobre las acciones, consulta Cómo responder a las acciones pendientes desde Tu escritorio y Cómo realizar una acción manual.

Ping

Usa la acción Ping para probar la conectividad a Apache Kafka.

Esta acción no se ejecuta en las entidades de Google SecOps.

Entradas de acción

Ninguno

Resultados de la acción

La acción Ping proporciona los siguientes resultados:

Tipo de salida de la acción Disponibilidad
Adjunto del muro de casos No disponible
Vínculo al muro de casos No disponible
Tabla del muro de casos No disponible
Tabla de enriquecimiento No disponible
Resultado de JSON No disponible
Mensajes de salida Disponible
Es el resultado de la secuencia de comandos. Disponible
Mensajes de salida

La acción Ping puede mostrar los siguientes mensajes de salida:

Mensaje de salida Descripción del mensaje

Successfully connected to the Apache Kafka server with the provided connection parameters!

La acción se completó correctamente.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

No se pudo realizar la acción.

Verifica la conexión al servidor, los parámetros de entrada o las credenciales.

Resultado de secuencia de comandos

En la siguiente tabla, se indica el valor del resultado de la secuencia de comandos cuando se usa la acción Ping:

Nombre del resultado de la secuencia de comandos Valor
is_success True o False

Conectores

Para obtener más información sobre cómo configurar conectores en Google SecOps, consulta Ingiere tus datos (conectores).

Conector de mensajes de Apache Kafka

Usa el conector de mensajes de Apache Kafka para recuperar mensajes de Apache Kafka.

El conector recupera mensajes de un tema de Kafka especificado y puede procesarlos de diferentes maneras según el formato del mensaje. Si un mensaje es un objeto JSON válido, el conector extrae campos específicos para la creación y la asignación de alertas. Si el mensaje es una cadena simple, se ingiere como los datos del evento sin procesar.

El conector controla la asignación de gravedad, la creación de plantillas de nombres de alertas y la generación de IDs únicos según los parámetros que proporciones.

Asignación de gravedad de JSON

Para asignar la gravedad de la alerta, debes especificar qué campo usa el conector de mensajes de Apache Kafka para obtener el valor de gravedad en el parámetro Severity Mapping JSON. La respuesta del conector puede contener tipos de valores, como integer, float y string.

El conector de mensajes de Apache Kafka lee los valores de integer y float y los asigna según la configuración de SecOps de Google. En la siguiente tabla, se muestra la asignación de los valores de integer a la gravedad en Google SecOps:

Valor del número entero Gravedad asignada
100 Critical
Desde 80 hasta 100 High
Desde 60 hasta 80 Medium
Desde 40 hasta 60 Low
Menos de 40 Informational

Si la respuesta contiene el valor string, el conector de Pub/Sub – Messages requiere configuración adicional.

Inicialmente, el valor predeterminado aparece de la siguiente manera:

{
    "Default": 60
}

Si los valores necesarios para la asignación se encuentran en la clave JSON event_severity, los valores pueden ser los siguientes:

  • "Malicious"
  • "Benign"
  • "Unknown"

Para analizar los valores de clave JSON de event_severity y garantizar que el objeto JSON tenga el formato correcto, configura el parámetro Severity Mapping JSON de la siguiente manera:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

El valor de "Default" es obligatorio.

En el caso de que haya varias coincidencias para el mismo objeto JSON, el conector de mensajes de Apache Kafka prioriza la primera clave del objeto JSON.

Para trabajar con campos que contienen valores integer o float, configura la clave y una cadena vacía en el parámetro Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Entradas del conector

El conector de mensajes de Apache Kafka requiere los siguientes parámetros:

Parámetro Descripción
Product Field Name

Obligatorio.

Es el nombre del campo en el que se almacena el nombre del producto.

El nombre del producto afecta principalmente la asignación. Para optimizar y mejorar el proceso de asignación del conector, el valor predeterminado se resuelve en un valor de resguardo al que se hace referencia desde el código. De forma predeterminada, cualquier entrada no válida para este parámetro se resuelve en un valor de resguardo.

El valor predeterminado es Product Name.

Event Field Name

Obligatorio.

Es el nombre del campo que determina el nombre del evento (subtipo).

El valor predeterminado es event_type.

Environment Field Name

Es opcional.

Nombre del campo en el que se almacena el nombre del entorno.

Si falta el campo environment, el conector usa el valor predeterminado.

El valor predeterminado es "".

Environment Regex Pattern

Es opcional.

Es un patrón de expresión regular que se ejecutará en el valor que se encuentra en el campo Environment Field Name. Este parámetro te permite manipular el campo del entorno con la lógica de expresiones regulares.

Usa el valor predeterminado .* para recuperar el valor Environment Field Name sin procesar requerido.

Si el patrón de expresión regular es nulo o está vacío, o si el valor del entorno es nulo, el resultado final del entorno es el entorno predeterminado.

Script Timeout (Seconds)

Obligatorio.

Es el límite de tiempo de espera, en segundos, para el proceso de Python que ejecuta la secuencia de comandos actual.

El valor predeterminado es 180.

Kafka brokers

Obligatorio.

Es una lista separada por comas de los agentes de Kafka a los que se debe conectar, con el formato hostname:port.

Use TLS for connection

Es opcional.

Si se selecciona, la integración usa la encriptación TLS para la autenticación.

Este parámetro requiere un certificado de CA.

No está habilitado de forma predeterminada.

Use SASL PLAIN with TLS for connection

Es opcional.

Si se selecciona, la integración usa el mecanismo de autenticación de nombre de usuario y contraseña SASL PLAIN.

Esta opción requiere que se proporcionen un nombre de usuario y una contraseña de SASL. Solo se admite con la encriptación TLS, que requiere un certificado de CA.

No está habilitado de forma predeterminada.

CA certificate of Kafka server

Es opcional.

Es el certificado de la CA que se usa para verificar la identidad del servidor de Kafka.

Client certificate

Es opcional.

Certificado del cliente para la autenticación TLS mutua con el servidor de Kafka.

Client certificate key

Es opcional.

Es la clave privada que corresponde al certificado del cliente y que se usa para la autenticación TLS mutua.

Client certificate key password

Es opcional.

Es la contraseña que se usa para desencriptar la clave privada del certificado de cliente.

SASL PLAIN Username

Es opcional.

Nombre de usuario para la autenticación SASL PLAIN con agentes de Kafka.

SASL PLAIN Password

Es opcional.

Contraseña para la autenticación SASL PLAIN con agentes de Kafka.

Topic

Obligatorio.

Es el tema de Kafka desde el que se recuperan los incidentes.

Consumer Group ID

Es opcional.

Es el identificador del grupo de consumidores que se usa cuando se recuperan incidentes.

Si no se proporciona ningún valor, se genera un ID único.

Partitions

Es opcional.

Es una lista en formato CSV de las particiones desde las que se recuperarán los mensajes.

Initial Offset

Es opcional.

Es el punto en el que el conector comienza a recuperar mensajes de una partición de Kafka.

Puedes especificar un número entero positivo para comenzar en un desplazamiento determinado o usar los valores earliest o latest para comenzar a recuperar datos desde el principio o el final de la partición.

Poll Timeout

Es opcional.

Es un tiempo de espera de sondeo para consumir un mensaje de Kafka, en segundos.

Case Name Template

Es opcional.

Es una plantilla para definir un nombre de caso personalizado. El conector agrega una clave custom_case_name al evento.

Puedes usar marcadores de posición en el formato FIELD_NAME, que se completan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX.

Alert Name Template

Obligatorio.

Es una plantilla para definir el nombre de la alerta.

Puedes usar marcadores de posición en el formato FIELD_NAME, que se completan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX.

Si no se proporciona un valor o la plantilla no es válida, el conector usa un nombre de alerta predeterminado.

Rule Generator Template

Obligatorio.

Es una plantilla para definir el generador de reglas.

Puedes usar marcadores de posición en el formato FIELD_NAME, que se completan con los valores de cadena del primer evento.

Ejemplo: Phishing - EVENT_MAILBOX.

Si no se proporciona un valor o la plantilla no es válida, el conector usa un nombre predeterminado para el generador de reglas.

Timestamp Field

Obligatorio.

Nombre del campo en el mensaje de Kafka que contiene la marca de tiempo de la alerta de SecOps de Google.

Si la marca de tiempo no está en formato de época de Unix, su formato debe definirse en el parámetro Timestamp Format.

Timestamp Format

Es opcional.

Es el formato de la marca de tiempo del mensaje, que se requiere para las marcas de tiempo que no son de época Unix. Usa códigos de formato strftime estándar de Python.

Si la marca de tiempo no está en formato de época de Unix y este parámetro no está configurado, el conector fallará.

Severity Mapping JSON

Obligatorio.

Objeto JSON que usa el conector para extraer y asignar el nivel de gravedad del mensaje a la escala de prioridad de Google SecOps.

El valor predeterminado es {"Default": "60"}.

Unique ID Field

Es opcional.

Nombre del campo que se usará como identificador único del mensaje.

Si no se proporciona ningún valor, el conector genera y usa un hash SHA-256 del contenido del mensaje como identificador del mensaje.

Max Messages To Fetch

Obligatorio.

Es la cantidad máxima de mensajes que el conector procesa en cada iteración.

El valor predeterminado es 100.

Disable Overflow

Es opcional.

Si se selecciona, el conector ignora el mecanismo de desbordamiento de SecOps de Google.

Habilitada de forma predeterminada.

Verify SSL

Obligatorio.

Si se selecciona esta opción, la integración valida el certificado SSL cuando se conecta al servidor de Apache Kafka.

Habilitada de forma predeterminada.

Proxy Server Address

Es opcional.

Es la dirección del servidor proxy que se usará.

Proxy Username

Es opcional.

Nombre de usuario del proxy con el que se realizará la autenticación.

Proxy Password

Es opcional.

Es la contraseña del proxy con la que se realizará la autenticación.

Reglas del conector

El conector admite proxies.

Alertas de conector

En la siguiente tabla, se describe la asignación de los campos de mensajes de Apache Kafka a los campos de alertas de SecOps de Google:

Campo de alerta de Siemplify Campo de mensaje de Apache Kafka
SourceSystemName El framework lo completa.
TicketId Es el valor del campo de ID único o un hash SHA-256 del mensaje.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Es el valor que genera el Alert Name Template.
Reason N/A
Description N/A
DeviceVendor Codificado: Apache Kafka
DeviceProduct Valor de resguardo: Message
Priority Se asigna desde el parámetro Severity Mapping JSON.
RuleGenerator Es el valor que genera el Rule Generator Template.
SourceGroupingIdentifier N/A
StartTime Se convirtió desde el Timestamp Field.
EndTime Se convirtió desde el Timestamp Field.
Siemplify Alert - Extensions N/A
Siemplify Alert - Attachments N/A

Eventos del conector

El siguiente es un ejemplo de un evento de conector:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.