Integre o Apache Kafka com o Google SecOps

Versão da integração: 1.0

Este documento explica como integrar o Apache Kafka com o Google Security Operations (Google SecOps).

Exemplos de utilização

A integração do Apache Kafka pode resolver os seguintes exemplos de utilização:

  • Ingestão de registos de segurança em tempo real: ingira e processe automaticamente eventos de segurança de tópicos do Kafka no Google SecOps. Isto permite uma gestão de registos centralizada e uma análise em tempo real para gerar alertas com base em dados de streaming.

  • Automatização orientada por eventos: acione manuais de procedimentos automatizados no Google SecOps com base em eventos de segurança específicos ou mensagens transmitidas a partir de um tópico do Kafka. Isto acelera a resposta a eventos críticos, como o início de sessão de um utilizador a partir de uma localização invulgar.

  • Enriquecimento de informações sobre ameaças: extraia feeds de informações sobre ameaças personalizadas de tópicos do Kafka para enriquecer os alertas e os registos existentes. Isto fornece aos analistas um contexto atualizado sobre os indicadores de comprometimento (IOCs) e melhora a precisão da análise de ameaças.

Antes de começar

Antes de configurar a integração do Apache Kafka no Google SecOps, conclua os seguintes pré-requisitos:

  • Servidor Apache Kafka: certifique-se de que tem acesso a um servidor Apache Kafka em execução com os mediadores e os tópicos do Kafka necessários configurados.
  • Imagem Docker do agente remoto: quando criar agentes remotos, tem de usar uma imagem baseada no Debian. Use a seguinte imagem para garantir a compatibilidade:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parâmetros de integração

A integração do Apache Kafka requer os seguintes parâmetros:

Parâmetro Descrição
Kafka brokers

Obrigatório.

Uma lista de agentes Kafka separados por vírgulas aos quais estabelecer ligação, no formato hostname:port.

Use TLS for connection

Opcional.

Se estiver selecionada, a integração usa a encriptação TLS para a autenticação.

Este parâmetro requer um certificado de autoridade de certificação (CA).

Não está ativada por predefinição.

Use SASL PLAIN with TLS for connection

Opcional.

Se estiver selecionada, a integração usa o mecanismo de nome de utilizador e palavra-passe SASL PLAIN para autenticação.

Esta opção só é suportada com encriptação TLS e requer um nome de utilizador e uma palavra-passe SASL, bem como um certificado de AC.

Não está ativada por predefinição.

CA certificate of Kafka server

Opcional.

O certificado da AC usado para validar a identidade do servidor Kafka.

Este parâmetro é obrigatório se o SASL estiver ativado.

Client certificate

Opcional.

O certificado do cliente para a autenticação TLS mútua com o servidor Kafka.

Este parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

Client certificate key

Opcional.

A chave privada que corresponde ao certificado do cliente, usada para a autenticação TLS mútua.

Este parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

Client certificate key password

Opcional.

A palavra-passe usada para desencriptar a chave privada do certificado do cliente.

Este parâmetro é obrigatório se o TLS mútuo (mTLS) estiver ativado.

SASL PLAIN Username

Opcional.

O nome de utilizador para a autenticação SASL PLAIN com agentes Kafka.

Este parâmetro é obrigatório se o SASL estiver ativado.

SASL PLAIN Password

Opcional.

A palavra-passe para a autenticação SASL PLAIN com agentes Kafka.

Este parâmetro é obrigatório se o SASL estiver ativado.

Para ver instruções sobre como configurar uma integração no Google SecOps, consulte o artigo Configurar integrações.

Se necessário, pode fazer alterações numa fase posterior. Depois de configurar uma instância de integração, pode usá-la em manuais de procedimentos. Para mais informações sobre como configurar e suportar várias instâncias, consulte o artigo Suporte de várias instâncias.

Ações

Para mais informações sobre ações, consulte os artigos Responda a ações pendentes da sua mesa de trabalho e Realize uma ação manual.

Tchim-tchim

Use a ação Ping para testar a conetividade ao Apache Kafka.

Esta ação não é executada em entidades do Google SecOps.

Dados de ações

Nenhum.

Resultados da ação

A ação Ping fornece os seguintes resultados:

Tipo de saída da ação Disponibilidade
Fixação à parede da caixa Não disponível
Link da parede da caixa Não disponível
Mesa de parede para capas Não disponível
Tabela de enriquecimento Não disponível
Resultado JSON Não disponível
Mensagens de saída Disponível
Resultado do script. Disponível
Mensagens de saída

A ação Ping pode devolver as seguintes mensagens de saída:

Mensagem de saída Descrição da mensagem

Successfully connected to the Apache Kafka server with the provided connection parameters!

A ação foi bem-sucedida.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

A ação falhou.

Verifique a ligação ao servidor, os parâmetros de entrada ou as credenciais.

Resultado do script

A tabela seguinte lista o valor do resultado do script quando usa a ação Ping:

Nome do resultado do script Valor
is_success True ou False

Conetores

Para mais informações sobre como configurar conetores no Google SecOps, consulte o artigo Carregue os seus dados (conetores).

Apache Kafka – Conetor de mensagens

Use o conetor Apache Kafka – Mensagens para obter mensagens do Apache Kafka.

O conector obtém mensagens de um tópico do Kafka especificado e pode processá-las de diferentes formas com base no formato da mensagem. Se uma mensagem for um objeto JSON válido, o conetor extrai campos específicos para a criação e o mapeamento de alertas. Se a mensagem for uma string simples, é carregada como os dados de eventos não processados.

O conector processa o mapeamento da gravidade, a criação de modelos de nomes de alertas e a geração de IDs únicos com base nos parâmetros que fornece.

Mapeamento da gravidade do JSON

Para mapear a gravidade do alerta, tem de especificar o campo que o conetor de mensagens do Apache Kafka usa para obter o valor da gravidade no parâmetro Severity Mapping JSON. A resposta do conetor pode conter tipos de valores, como integer, float e string.

O Apache Kafka - Messages Connector lê os valores integer e float e mapeia-os de acordo com as definições do Google SecOps. A tabela seguinte mostra o mapeamento dos valores integer para a gravidade no Google SecOps:

Valor de número inteiro Gravidade mapeada
100 Critical
Da(s) 80 à(s) 100 High
Da(s) 60 à(s) 80 Medium
Da(s) 40 à(s) 60 Low
Menor que 40 Informational

Se a resposta contiver o valor string, o Pub/Sub – Messages Connector requer uma configuração adicional.

Inicialmente, o valor predefinido é apresentado da seguinte forma:

{
    "Default": 60
}

Se os valores necessários para o mapeamento estiverem localizados na chave JSON, os valores podem ser os seguintes:event_severity

  • "Malicious"
  • "Benign"
  • "Unknown"

Para analisar os valores-chave JSON event_severity e garantir que o objeto JSON tem um formato correto, configure o parâmetro Severity Mapping JSON da seguinte forma:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

O valor "Default" é obrigatório.

No caso de existirem várias correspondências para o mesmo objeto JSON, o conetor de mensagens do Apache Kafka dá prioridade à primeira chave do objeto JSON.

Para trabalhar com campos que contêm valores integer ou float, configure a chave e uma string vazia no parâmetro Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Entradas do conetor

O conetor Apache Kafka – Mensagens requer os seguintes parâmetros:

Parâmetro Descrição
Product Field Name

Obrigatório.

O nome do campo onde o nome do produto está armazenado.

O nome do produto afeta principalmente o mapeamento. Para simplificar e melhorar o processo de mapeamento do conector, o valor predefinido é resolvido para um valor alternativo referenciado a partir do código. Qualquer entrada inválida para este parâmetro é resolvida para um valor alternativo por predefinição.

O valor predefinido é Product Name.

Event Field Name

Obrigatório.

O nome do campo que determina o nome do evento (subtipo).

O valor predefinido é event_type.

Environment Field Name

Opcional.

O nome do campo onde o nome do ambiente está armazenado.

Se o campo de ambiente estiver em falta, o conector usa o valor predefinido.

O valor predefinido é "".

Environment Regex Pattern

Opcional.

Um padrão de expressão regular a executar no valor encontrado no campo Environment Field Name. Este parâmetro permite-lhe manipular o campo de ambiente através da lógica de expressão regular.

Use o valor predefinido .* para obter o valor bruto Environment Field Name necessário.

Se o padrão de expressão regular for nulo ou estiver vazio, ou o valor do ambiente for nulo, o resultado do ambiente final é o ambiente predefinido.

Script Timeout (Seconds)

Obrigatório.

O limite de tempo limite, em segundos, para o processo Python que executa o script atual.

O valor predefinido é 180.

Kafka brokers

Obrigatório.

Uma lista de agentes Kafka separados por vírgulas aos quais estabelecer ligação, no formato hostname:port.

Use TLS for connection

Opcional.

Se estiver selecionada, a integração usa a encriptação TLS para a autenticação.

Este parâmetro requer um certificado da CA.

Não está ativada por predefinição.

Use SASL PLAIN with TLS for connection

Opcional.

Se estiver selecionada, a integração usa o mecanismo de nome de utilizador e palavra-passe SASL PLAIN para autenticação.

Esta opção requer o fornecimento de um nome de utilizador e uma palavra-passe SASL. Só é compatível com a encriptação TLS, que requer um certificado de AC.

Não está ativada por predefinição.

CA certificate of Kafka server

Opcional.

O certificado da AC usado para validar a identidade do servidor Kafka.

Client certificate

Opcional.

O certificado do cliente para a autenticação TLS mútua com o servidor Kafka.

Client certificate key

Opcional.

A chave privada que corresponde ao certificado do cliente, usada para a autenticação TLS mútua.

Client certificate key password

Opcional.

A palavra-passe usada para desencriptar a chave privada do certificado do cliente.

SASL PLAIN Username

Opcional.

O nome de utilizador para a autenticação SASL PLAIN com agentes Kafka.

SASL PLAIN Password

Opcional.

A palavra-passe para a autenticação SASL PLAIN com agentes Kafka.

Topic

Obrigatório.

O tópico do Kafka a partir do qual os incidentes são obtidos.

Consumer Group ID

Opcional.

O identificador do grupo de consumidores usado ao obter incidentes.

Se não for indicado nenhum valor, é gerado um ID exclusivo.

Partitions

Opcional.

Uma lista CSV de partições a partir das quais obter mensagens.

Initial Offset

Opcional.

Onde o conector começa a obter mensagens de uma partição do Kafka.

Pode especificar um número inteiro positivo para começar num desvio específico ou usar os valores earliest ou latest para começar a obter dados do início ou do fim da partição.

Poll Timeout

Opcional.

Um limite de tempo de sondagem para consumir uma mensagem do Kafka, em segundos.

Case Name Template

Opcional.

Um modelo para definir um nome de registo personalizado. O conector adiciona uma chave custom_case_name ao evento.

Pode usar marcadores de posição no formato FIELD_NAME, que são preenchidos a partir dos valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Alert Name Template

Obrigatório.

Um modelo para definir o nome do alerta.

Pode usar marcadores de posição no formato FIELD_NAME, que são preenchidos a partir dos valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Se não for fornecido um valor ou o modelo for inválido, o conector usa um nome de alerta predefinido.

Rule Generator Template

Obrigatório.

Um modelo para definir o gerador de regras.

Pode usar marcadores de posição no formato FIELD_NAME, que são preenchidos a partir dos valores de string do primeiro evento.

Exemplo: Phishing - EVENT_MAILBOX.

Se não for fornecido um valor ou o modelo for inválido, o conector usa um nome do gerador de regras predefinido.

Timestamp Field

Obrigatório.

O nome do campo na mensagem Kafka que contém a indicação de tempo do alerta do Google SecOps.

Se a data/hora não estiver no formato epoch Unix, o respetivo formato tem de ser definido no parâmetro Timestamp Format.

Timestamp Format

Opcional.

O formato da data/hora da mensagem, necessário para datas/horas que não sejam de época Unix. Use códigos de formato strftime padrão do Python.

Se a data/hora não estiver no formato de época Unix e este parâmetro não estiver configurado, o conetor falha.

Severity Mapping JSON

Obrigatório.

O objeto JSON usado pelo conetor para extrair e mapear o nível de gravidade da mensagem para a escala de prioridade do Google SecOps.

O valor predefinido é {"Default": "60"}.

Unique ID Field

Opcional.

O nome do campo a usar como um identificador de mensagem exclusivo.

Se não for fornecido nenhum valor, o conector gera e usa um hash SHA-256 do conteúdo da mensagem como identificador da mensagem.

Max Messages To Fetch

Obrigatório.

O número máximo de mensagens que o conector processa para cada iteração.

O valor predefinido é 100.

Disable Overflow

Opcional.

Se selecionado, o conector ignora o mecanismo de overflow do Google SecOps.

Ativada por predefinição.

Verify SSL

Obrigatório.

Se estiver selecionada, a integração valida o certificado SSL quando se liga ao servidor do Apache Kafka.

Ativada por predefinição.

Proxy Server Address

Opcional.

O endereço do servidor proxy a usar.

Proxy Username

Opcional.

O nome de utilizador do proxy para autenticação.

Proxy Password

Opcional.

A palavra-passe do proxy para autenticação.

Regras de conector

O conetor suporta proxies.

Alertas de conetores

A tabela seguinte descreve o mapeamento dos campos de mensagens do Apache Kafka para os campos de alertas do Google SecOps:

Campo de alerta do Siemplify Campo de mensagem do Apache Kafka
SourceSystemName Preenchidos pelo framework.
TicketId O valor do campo de ID exclusivo ou um hash SHA-256 da mensagem.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name O valor gerado pelo Alert Name Template.
Reason N/A
Description N/A
DeviceVendor Codificado: Apache Kafka
DeviceProduct Valor alternativo: Message
Priority Mapeado a partir do parâmetro Severity Mapping JSON.
RuleGenerator O valor gerado pelo Rule Generator Template.
SourceGroupingIdentifier N/A
StartTime Convertida a partir do Timestamp Field.
EndTime Convertida a partir do Timestamp Field.
Siemplify Alert - Extensions N/A
Siemplify Alert - Attachments N/A

Eventos de conetores

Segue-se um exemplo de um evento de conector:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.