Integrar o Apache Kafka ao Google SecOps

Este documento explica como integrar o Apache Kafka ao Google Security Operations (Google SecOps).

Casos de uso

A integração do Apache Kafka pode abordar os seguintes casos de uso:

  • Ingestão de registros de segurança em tempo real: ingere e processa automaticamente eventos de segurança de tópicos do Kafka no Google SecOps. Isso permite o gerenciamento centralizado de registros e a análise em tempo real para gerar alertas com base em dados de streaming.

  • Automação baseada em eventos: acione playbooks automatizados no Google SecOps com base em eventos de segurança específicos ou mensagens transmitidas de um tópico do Kafka. Isso acelera a resposta a eventos críticos, como o login de um usuário em um local incomum.

  • Aperfeiçoamento da inteligência contra ameaças: extraia feeds personalizados de inteligência contra ameaças de tópicos do Kafka para aperfeiçoar alertas e casos atuais. Isso fornece aos analistas um contexto atualizado sobre indicadores de comprometimento (IOCs) e melhora a precisão da análise de ameaças.

Antes de começar

Antes de configurar a integração do Apache Kafka no Google SecOps, conclua os seguintes pré-requisitos:

  • Servidor do Apache Kafka: verifique se você tem acesso a um servidor do Apache Kafka em execução com os agentes e tópicos necessários configurados.
  • Imagem Docker do agente remoto: ao criar agentes remotos, use uma imagem baseada em Debian. Use a imagem a seguir para garantir a compatibilidade:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parâmetros de integração

A integração do Apache Kafka exige os seguintes parâmetros:

Parâmetro Descrição
Kafka brokers

Obrigatório.

Uma lista separada por vírgulas de agentes do Kafka a serem conectados, no formato hostname:port.

Use TLS for connection

Opcional.

Se selecionada, a integração usa criptografia TLS para autenticação.

Esse parâmetro exige um certificado de autoridade de certificação (CA).

Não ativado por padrão.

Use SASL PLAIN with TLS for connection

Opcional.

Se selecionada, a integração usa o mecanismo de nome de usuário e senha SASL PLAIN para autenticação.

Essa opção só é compatível com criptografia TLS e exige um nome de usuário e uma senha SASL, além de um certificado de CA.

Não ativado por padrão.

CA certificate of Kafka server

Opcional.

O certificado de CA usado para verificar a identidade do servidor Kafka.

Esse parâmetro é obrigatório se o SASL estiver ativado.

Client certificate

Opcional.

O certificado do cliente para autenticação TLS mútua com o servidor Kafka.

Esse parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

Client certificate key

Opcional.

A chave privada que corresponde ao certificado do cliente, usada para autenticação TLS mútua.

Esse parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

Client certificate key password

Opcional.

A senha usada para descriptografar a chave privada do certificado do cliente.

Esse parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

SASL PLAIN Username

Opcional.

O nome de usuário para autenticação SASL PLAIN com agentes Kafka.

Esse parâmetro é obrigatório se o SASL estiver ativado.

SASL PLAIN Password

Opcional.

A senha para autenticação SASL PLAIN com agentes do Kafka.

Esse parâmetro é obrigatório se o SASL estiver ativado.

Para instruções sobre como configurar uma integração no Google SecOps, consulte Configurar integrações.

É possível fazer mudanças mais tarde, se necessário. Depois de configurar uma instância de integração, você pode usá-la em playbooks. Para mais informações sobre como configurar e oferecer suporte a várias instâncias, consulte Suporte a várias instâncias.

Ações

Para mais informações sobre ações, consulte Responder a ações pendentes da sua mesa de trabalho e Realizar uma ação manual.

Ping

Use a ação Ping para testar a conectividade com o Apache Kafka.

Essa ação não é executada em entidades do Google SecOps.

Entradas de ação

Nenhuma.

Saídas de ação

A ação Ping fornece as seguintes saídas:

Tipo de saída da ação Disponibilidade
Anexo do Painel de Casos Indisponível
Link do Bloqueio de Casos Indisponível
Tabela do painel de casos Indisponível
Tabela de enriquecimento Indisponível
Resultado JSON Indisponível
Mensagens de saída Disponível
Resultado do script. Disponível
Mensagens de saída

A ação Ping pode retornar as seguintes mensagens de saída:

Mensagem de resposta Descrição da mensagem

Successfully connected to the Apache Kafka server with the provided connection parameters!

A ação foi concluída.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

A ação falhou.

Verifique a conexão com o servidor, os parâmetros de entrada ou as credenciais.

Resultado do script

A tabela a seguir lista o valor da saída do resultado do script ao usar a ação Ping:

Nome do resultado do script Valor
is_success True ou False

Conectores

Para saber mais sobre como configurar conectores no Google SecOps, consulte Ingerir seus dados (conectores).

Conector de mensagens do Apache Kafka

Use o Conector de mensagens do Apache Kafka para recuperar mensagens do Apache Kafka.

O conector recupera mensagens de um tópico especificado do Kafka e pode processá-las de diferentes maneiras com base no formato da mensagem. Se uma mensagem for um objeto JSON válido, o conector vai extrair campos específicos para criação e mapeamento de alertas. Se a mensagem for uma string simples, ela será ingerida como os dados de eventos brutos.

O conector processa o mapeamento de gravidade, a criação de modelos de nomes de alertas e a geração de ID exclusivo com base nos parâmetros fornecidos.

Mapeamento de gravidade JSON

Para mapear a gravidade do alerta, especifique qual campo o Conector de mensagens do Apache Kafka usa para receber o valor da gravidade no parâmetro Severity Mapping JSON. A resposta do conector pode conter tipos de valores, como integer, float e string.

O conector de mensagens do Apache Kafka lê os valores integer e float e os mapeia de acordo com as configurações do Google SecOps. A tabela a seguir mostra o mapeamento dos valores de integer para a gravidade no Google SecOps:

Número inteiro Gravidade mapeada
100 Critical
De 80 a 100 High
De 60 a 80 Medium
De 40 a 60 Low
Menos de 40 Informational

Se a resposta contiver o valor string, o conector do Pub/Sub – Messages vai exigir mais configuração.

Inicialmente, o valor padrão aparece assim:

{
    "Default": 60
}

Se os valores necessários para o mapeamento estiverem localizados na chave JSON event_severity, eles poderão ser os seguintes:

  • "Malicious"
  • "Benign"
  • "Unknown"

Para analisar os valores de chave JSON event_severity e garantir que o objeto JSON tenha um formato correto, configure o parâmetro Severity Mapping JSON da seguinte maneira:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

O valor de "Default" é obrigatório.

Quando há várias correspondências para o mesmo objeto JSON, o conector de mensagens do Apache Kafka prioriza a primeira chave do objeto JSON.

Para trabalhar com campos que contêm valores integer ou float, configure a chave e uma string vazia no parâmetro Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Entradas do conector

O conector de mensagens do Apache Kafka exige os seguintes parâmetros:

Parâmetro Descrição
Product Field Name

Obrigatório.

O nome do campo em que o nome do produto é armazenado.

O nome do produto afeta principalmente o mapeamento. Para simplificar e melhorar o processo de mapeamento do conector, o valor padrão é resolvido como um valor substituto referenciado no código. Qualquer entrada inválida para esse parâmetro é resolvida como um valor de substituição por padrão.

O valor padrão é Product Name.

Event Field Name

Obrigatório.

O nome do campo que determina o nome do evento (subtipo).

O valor padrão é event_type.

Environment Field Name

Opcional.

O nome do campo em que o nome do ambiente é armazenado.

Se o campo "environment" estiver ausente, o conector usará o valor padrão.

O valor padrão é "".

Environment Regex Pattern

Opcional.

Um padrão de expressão regular a ser executado no valor encontrado no campo Environment Field Name. Com esse parâmetro, é possível manipular o campo "environment" usando a lógica de expressão regular.

Use o valor padrão .* para extrair o valor Environment Field Name bruto necessário.

Se o padrão de expressão regular for nulo ou vazio, ou se o valor do ambiente for nulo, o resultado final será o ambiente padrão.

Script Timeout (Seconds)

Obrigatório.

O limite de tempo limite, em segundos, para o processo do Python que executa o script atual.

O valor padrão é 180.

Kafka brokers

Obrigatório.

Uma lista separada por vírgulas de agentes do Kafka a serem conectados, no formato hostname:port.

Use TLS for connection

Opcional.

Se selecionada, a integração usa criptografia TLS para autenticação.

Esse parâmetro exige um certificado de CA.

Não ativado por padrão.

Use SASL PLAIN with TLS for connection

Opcional.

Se selecionada, a integração usa o mecanismo de nome de usuário e senha SASL PLAIN para autenticação.

Essa opção exige que um nome de usuário e uma senha SASL sejam fornecidos. Ele só é compatível com criptografia TLS, que exige um certificado de CA.

Não ativado por padrão.

CA certificate of Kafka server

Opcional.

O certificado de CA usado para verificar a identidade do servidor Kafka.

Client certificate

Opcional.

O certificado do cliente para autenticação TLS mútua com o servidor Kafka.

Client certificate key

Opcional.

A chave privada que corresponde ao certificado do cliente, usada para autenticação TLS mútua.

Client certificate key password

Opcional.

A senha usada para descriptografar a chave privada do certificado do cliente.

SASL PLAIN Username

Opcional.

O nome de usuário para autenticação SASL PLAIN com agentes Kafka.

SASL PLAIN Password

Opcional.

A senha para autenticação SASL PLAIN com agentes do Kafka.

Topic

Obrigatório.

O tópico do Kafka de onde os incidentes são extraídos.

Consumer Group ID

Opcional.

O identificador do grupo de consumidores usado ao recuperar incidentes.

Se nenhum valor for fornecido, um ID exclusivo será gerado.

Partitions

Opcional.

Uma lista CSV de partições de onde buscar mensagens.

Initial Offset

Opcional.

Onde o conector começa a buscar mensagens de uma partição do Kafka.

É possível especificar um número inteiro positivo para começar em um determinado deslocamento ou usar os valores earliest ou latest para começar a buscar do início ou do fim da partição.

Poll Timeout

Opcional.

Um tempo limite de pesquisa para consumir uma mensagem do Kafka, em segundos.

Case Name Template

Opcional.

Um modelo para definir um nome de caso personalizado. O conector adiciona uma chave custom_case_name ao evento.

É possível usar marcadores de posição no formato FIELD_NAME, que são preenchidos com os valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Alert Name Template

Obrigatório.

Um modelo para definir o nome do alerta.

É possível usar marcadores de posição no formato FIELD_NAME, que são preenchidos com os valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Se um valor não for fornecido ou o modelo for inválido, o conector usará um nome de alerta padrão.

Rule Generator Template

Obrigatório.

Um modelo para definir o gerador de regras.

É possível usar marcadores de posição no formato FIELD_NAME, que são preenchidos com os valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Se um valor não for fornecido ou o modelo for inválido, o conector usará um nome padrão de gerador de regras.

Timestamp Field

Obrigatório.

O nome do campo na mensagem do Kafka que contém o carimbo de data/hora do alerta do Google SecOps.

Se o carimbo de data/hora não estiver no formato de época Unix, o formato dele precisará ser definido no parâmetro Timestamp Format.

Timestamp Format

Opcional.

O formato do carimbo de data/hora da mensagem, obrigatório para carimbos de data/hora que não são da época Unix. Use códigos de formato strftime padrão do Python.

Se o carimbo de data/hora não estiver no formato de época Unix e esse parâmetro não estiver configurado, o conector vai falhar.

Severity Mapping JSON

Obrigatório.

O objeto JSON usado pelo conector para extrair e mapear o nível de gravidade da mensagem para a escala de prioridade do Google SecOps.

O valor padrão é {"Default": "60"}.

Unique ID Field

Opcional.

O nome do campo a ser usado como um identificador exclusivo de mensagem.

Se nenhum valor for fornecido, o conector vai gerar e usar um hash SHA-256 do conteúdo da mensagem como identificador.

Max Messages To Fetch

Obrigatório.

O número máximo de mensagens que o conector processa em cada iteração.

O valor padrão é 100.

Disable Overflow

Opcional.

Se selecionado, o conector ignora o mecanismo de estouro do Google SecOps.

Ativado por padrão.

Verify SSL

Obrigatório.

Se selecionada, a integração valida o certificado SSL ao se conectar ao servidor Apache Kafka.

Ativado por padrão.

Proxy Server Address

Opcional.

O endereço do servidor proxy a ser usado.

Proxy Username

Opcional.

O nome de usuário do proxy para autenticação.

Proxy Password

Opcional.

A senha do proxy para autenticação.

Regras do conector

O conector é compatível com proxies.

Alertas de conector

A tabela a seguir descreve o mapeamento dos campos de mensagens do Apache Kafka para os campos de alertas do Google SecOps:

Campo de alerta da Siemplify Campo de mensagem do Apache Kafka
SourceSystemName Preenchido pela estrutura.
TicketId O valor do campo de ID exclusivo ou um hash SHA-256 da mensagem.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name O valor gerado pelo Alert Name Template.
Reason N/A
Description N/A
DeviceVendor Fixado no código: Apache Kafka
DeviceProduct Valor substituto: Message
Priority Mapeado do parâmetro Severity Mapping JSON.
RuleGenerator O valor gerado pelo Rule Generator Template.
SourceGroupingIdentifier N/A
StartTime Convertido do Timestamp Field.
EndTime Convertido do Timestamp Field.
Siemplify Alert - Extensions N/A
Siemplify Alert - Attachments N/A

Eventos do conector

Confira um exemplo de evento de conector:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.