Integrare Apache Kafka con Google SecOps

Questo documento spiega come integrare Apache Kafka con Google Security Operations (Google SecOps).

Casi d'uso

L'integrazione di Apache Kafka può gestire i seguenti casi d'uso:

  • Importazione dei log di sicurezza in tempo reale: importa ed elabora automaticamente gli eventi di sicurezza dagli argomenti Kafka in Google SecOps. Ciò consente la gestione centralizzata dei log e l'analisi in tempo reale per generare avvisi in base ai dati di streaming.

  • Automazione basata su eventi: attiva playbook automatizzati in Google SecOps in base a eventi o messaggi di sicurezza specifici trasmessi in streaming da un argomento Kafka. Ciò accelera la risposta a eventi critici come l'accesso di un utente da una posizione insolita.

  • Arricchimento dell'intelligence sulle minacce: estrai feed personalizzati di intelligence sulle minacce dagli argomenti Kafka per arricchire avvisi e casi esistenti. In questo modo, gli analisti dispongono di un contesto aggiornato sugli indicatori di compromissione (IOC) e migliora l'accuratezza dell'analisi delle minacce.

Prima di iniziare

Prima di configurare l'integrazione di Apache Kafka in Google SecOps, completa i seguenti prerequisiti:

  • Server Apache Kafka: assicurati di avere accesso a un server Apache Kafka in esecuzione con i broker e gli argomenti Kafka necessari configurati.
  • Immagine Docker dell'agente remoto: quando crei agenti remoti, devi utilizzare un'immagine basata su Debian. Utilizza l'immagine seguente per garantire la compatibilità:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parametri di integrazione

L'integrazione di Apache Kafka richiede i seguenti parametri:

Parametro Descrizione
Kafka brokers

Obbligatorio.

Un elenco separato da virgole di broker Kafka a cui connettersi, nel formato hostname:port.

Use TLS for connection

Facoltativo.

Se selezionata, l'integrazione utilizza la crittografia TLS per l'autenticazione.

Questo parametro richiede un certificato dell'autorità di certificazione (CA).

Non è abilitata per impostazione predefinita.

Use SASL PLAIN with TLS for connection

Facoltativo.

Se selezionata, l'integrazione utilizza il meccanismo di autenticazione SASL PLAIN con nome utente e password.

Questa opzione è supportata solo con la crittografia TLS e richiede sia un nome utente e una password SASL sia un certificato CA.

Non è abilitata per impostazione predefinita.

CA certificate of Kafka server

Facoltativo.

Il certificato CA utilizzato per verificare l'identità del server Kafka.

Questo parametro è obbligatorio se SASL è abilitato.

Client certificate

Facoltativo.

Il certificato del client per l'autenticazione TLS reciproca (mTLS) con il server Kafka.

Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS).

Client certificate key

Facoltativo.

La chiave privata corrispondente al certificato del client, utilizzata per l'autenticazione TLS reciproca (mTLS).

Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS).

Client certificate key password

Facoltativo.

La password utilizzata per decriptare la chiave privata del certificato client.

Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS).

SASL PLAIN Username

Facoltativo.

Il nome utente per l'autenticazione SASL PLAIN con i broker Kafka.

Questo parametro è obbligatorio se SASL è abilitato.

SASL PLAIN Password

Facoltativo.

La password per l'autenticazione SASL PLAIN con i broker Kafka.

Questo parametro è obbligatorio se SASL è abilitato.

Per istruzioni su come configurare un'integrazione in Google SecOps, consulta Configurare le integrazioni.

Se necessario, potrai apportare modifiche in un secondo momento. Dopo aver configurato un'istanza di integrazione, puoi utilizzarla nei playbook. Per saperne di più su come configurare e supportare più istanze, consulta Supporto di più istanze.

Azioni

Per ulteriori informazioni sulle azioni, vedi Rispondere alle azioni in attesa dalla tua scrivania e Eseguire un'azione manuale.

Dindin

Utilizza l'azione Ping per testare la connettività ad Apache Kafka.

Questa azione non viene eseguita sulle entità Google SecOps.

Input azione

Nessuno.

Output dell'azione

L'azione Ping fornisce i seguenti output:

Tipo di output dell'azione Disponibilità
Allegato della bacheca casi Non disponibile
Link alla bacheca casi Non disponibile
Tabella della bacheca casi Non disponibile
Tabella di arricchimento Non disponibile
Risultato JSON Non disponibile
Messaggi di output Disponibile
Risultato dello script. Disponibile
Messaggi di output

L'azione Ping può restituire i seguenti messaggi di output:

Messaggio di output Descrizione del messaggio

Successfully connected to the Apache Kafka server with the provided connection parameters!

L'azione è riuscita.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

L'azione non è riuscita.

Controlla la connessione al server, i parametri di input o le credenziali.

Risultato dello script

La seguente tabella elenca il valore dell'output del risultato dello script quando utilizzi l'azione Ping:

Nome del risultato dello script Valore
is_success True o False

Connettori

Per scoprire di più sulla configurazione dei connettori in Google SecOps, consulta Importare i dati (connettori).

Apache Kafka - Messages Connector

Utilizza il connettore Apache Kafka - Messaggi per recuperare i messaggi da Apache Kafka.

Il connettore recupera i messaggi da un argomento Kafka specificato e può elaborarli in modi diversi in base al formato del messaggio. Se un messaggio è un oggetto JSON valido, il connettore estrae campi specifici per la creazione e la mappatura degli avvisi. Se il messaggio è una stringa semplice, viene importato come dati sugli eventi non elaborati.

Il connettore gestisce la mappatura della gravità, la creazione di modelli di nomi degli avvisi e la generazione di ID univoci in base ai parametri forniti.

Mapping della gravità JSON

Per mappare la gravità dell'avviso, devi specificare il campo utilizzato dal connettore Apache Kafka - Messaggi per ottenere il valore della gravità nel parametro Severity Mapping JSON. La risposta del connettore può contenere tipi di valori, ad esempio integer, float e string.

Il connettore Apache Kafka - Messaggi legge i valori integer e float e li mappa in base alle impostazioni di Google SecOps. La tabella seguente mostra il mapping dei valori integer alla gravità in Google SecOps:

Valore Integer Gravità mappata
100 Critical
Da 80 a 100 High
Da 60 a 80 Medium
Da 40 a 60 Low
Meno di 40 Informational

Se la risposta contiene il valore string, il connettore Pub/Sub - Messaggi richiede una configurazione aggiuntiva.

Inizialmente, il valore predefinito viene visualizzato nel seguente modo:

{
    "Default": 60
}

Se i valori richiesti per la mappatura si trovano nella chiave JSON event_severity, i valori possono essere i seguenti:

  • "Malicious"
  • "Benign"
  • "Unknown"

Per analizzare i valori delle chiavi JSON event_severity e assicurarti che l'oggetto JSON abbia un formato corretto, configura il parametro Severity Mapping JSON come segue:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

Il valore di "Default" è obbligatorio.

Nel caso in cui ci siano più corrispondenze per lo stesso oggetto JSON, il connettore Apache Kafka - Messaggi assegna la priorità alla prima chiave dell'oggetto JSON.

Per utilizzare i campi che contengono valori integer o float, configura la chiave e una stringa vuota nel parametro Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Ingressi del connettore

Il connettore Apache Kafka - Messaggi richiede i seguenti parametri:

Parametro Descrizione
Product Field Name

Obbligatorio.

Il nome del campo in cui è memorizzato il nome del prodotto.

Il nome del prodotto influisce principalmente sulla mappatura. Per semplificare e migliorare il processo di mappatura per il connettore, il valore predefinito viene risolto in un valore di riserva a cui viene fatto riferimento dal codice. Per impostazione predefinita, qualsiasi input non valido per questo parametro viene risolto in un valore di riserva.

Il valore predefinito è Product Name.

Event Field Name

Obbligatorio.

Il nome del campo che determina il nome (sottotipo) dell'evento.

Il valore predefinito è event_type.

Environment Field Name

Facoltativo.

Il nome del campo in cui è memorizzato il nome dell'ambiente.

Se il campo ambiente non è presente, il connettore utilizza il valore predefinito.

Il valore predefinito è "".

Environment Regex Pattern

Facoltativo.

Un pattern di espressione regolare da eseguire sul valore trovato nel campo Environment Field Name. Questo parametro ti consente di manipolare il campo dell'ambiente utilizzando la logica delle espressioni regolari.

Utilizza il valore predefinito .* per recuperare il valore Environment Field Name grezzo richiesto.

Se il pattern dell'espressione regolare è nullo o vuoto oppure il valore dell'ambiente è nullo, il risultato finale dell'ambiente è l'ambiente predefinito.

Script Timeout (Seconds)

Obbligatorio.

Il limite di timeout, in secondi, per il processo Python che esegue lo script corrente.

Il valore predefinito è 180.

Kafka brokers

Obbligatorio.

Un elenco separato da virgole di broker Kafka a cui connettersi, nel formato hostname:port.

Use TLS for connection

Facoltativo.

Se selezionata, l'integrazione utilizza la crittografia TLS per l'autenticazione.

Questo parametro richiede un certificato CA.

Non è abilitata per impostazione predefinita.

Use SASL PLAIN with TLS for connection

Facoltativo.

Se selezionata, l'integrazione utilizza il meccanismo di autenticazione SASL PLAIN con nome utente e password.

Questa opzione richiede la fornitura di un nome utente e una password SASL. È supportato solo con la crittografia TLS, che richiede un certificato CA.

Non è abilitata per impostazione predefinita.

CA certificate of Kafka server

Facoltativo.

Il certificato CA utilizzato per verificare l'identità del server Kafka.

Client certificate

Facoltativo.

Il certificato del client per l'autenticazione TLS reciproca (mTLS) con il server Kafka.

Client certificate key

Facoltativo.

La chiave privata corrispondente al certificato del client, utilizzata per l'autenticazione TLS reciproca (mTLS).

Client certificate key password

Facoltativo.

La password utilizzata per decriptare la chiave privata del certificato client.

SASL PLAIN Username

Facoltativo.

Il nome utente per l'autenticazione SASL PLAIN con i broker Kafka.

SASL PLAIN Password

Facoltativo.

La password per l'autenticazione SASL PLAIN con i broker Kafka.

Topic

Obbligatorio.

L'argomento Kafka da cui vengono recuperati gli incidenti.

Consumer Group ID

Facoltativo.

L'identificatore del gruppo di consumatori utilizzato per recuperare gli incidenti.

Se non viene fornito alcun valore, viene generato un ID univoco.

Partitions

Facoltativo.

Un elenco CSV di partizioni da cui recuperare i messaggi.

Initial Offset

Facoltativo.

Il punto in cui il connettore inizia a recuperare i messaggi da una partizione Kafka.

Puoi specificare un numero intero positivo per iniziare da un offset specifico oppure utilizzare i valori earliest o latest per iniziare il recupero dall'inizio o dalla fine della partizione.

Poll Timeout

Facoltativo.

Timeout di polling per consumare un messaggio da Kafka, in secondi.

Case Name Template

Facoltativo.

Un modello per definire un nome personalizzato per la richiesta. Il connettore aggiunge una chiave custom_case_name all'evento.

Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento.

Esempio: Phishing - EVENT_MAILBOX.

Alert Name Template

Obbligatorio.

Un modello per definire il nome dell'avviso.

Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento.

Esempio: Phishing - EVENT_MAILBOX.

Se non viene fornito un valore o il modello non è valido, il connettore utilizza un nome di avviso predefinito.

Rule Generator Template

Obbligatorio.

Un modello per definire il generatore di regole.

Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento.

Esempio: Phishing - EVENT_MAILBOX.

Se non viene fornito un valore o il modello non è valido, il connettore utilizza un nome predefinito per il generatore di regole.

Timestamp Field

Obbligatorio.

Il nome del campo nel messaggio Kafka che contiene il timestamp dell'avviso Google SecOps.

Se il timestamp non è nel formato Unix epoch, il suo formato deve essere definito nel parametro Timestamp Format.

Timestamp Format

Facoltativo.

Il formato del timestamp del messaggio, richiesto per i timestamp non Unix epoch. Utilizza i codici di formato strftime standard di Python.

Se il timestamp non è nel formato epoca Unix e questo parametro non è configurato, il connettore non funziona.

Severity Mapping JSON

Obbligatorio.

L'oggetto JSON utilizzato dal connettore per estrarre e mappare il livello di gravità dal messaggio alla scala di priorità di Google SecOps.

Il valore predefinito è {"Default": "60"}.

Unique ID Field

Facoltativo.

Il nome del campo da utilizzare come identificatore univoco del messaggio.

Se non viene fornito alcun valore, il connettore genera e utilizza un hash SHA-256 del contenuto del messaggio come identificatore del messaggio.

Max Messages To Fetch

Obbligatorio.

Il numero massimo di messaggi elaborati dal connettore per ogni iterazione.

Il valore predefinito è 100.

Disable Overflow

Facoltativo.

Se selezionato, il connettore ignora il meccanismo di overflow di Google SecOps.

Abilitato per impostazione predefinita.

Verify SSL

Obbligatorio.

Se selezionata, l'integrazione convalida il certificato SSL quando si connette al server Apache Kafka.

Abilitato per impostazione predefinita.

Proxy Server Address

Facoltativo.

L'indirizzo del server proxy da utilizzare.

Proxy Username

Facoltativo.

Il nome utente del proxy per l'autenticazione.

Proxy Password

Facoltativo.

La password del proxy per l'autenticazione.

Regole del connettore

Il connettore supporta i proxy.

Avvisi sui connettori

La tabella seguente descrive la mappatura dei campi dei messaggi Apache Kafka ai campi degli avvisi di Google SecOps:

Campo di avviso Siemplify Campo del messaggio Apache Kafka
SourceSystemName Compilato dal framework.
TicketId Il valore del campo ID univoco o un hash SHA-256 del messaggio.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Il valore generato da Alert Name Template.
Reason N/D
Description N/D
DeviceVendor Hardcoded: Apache Kafka
DeviceProduct Valore di riserva: Message
Priority Mappato dal parametro Severity Mapping JSON.
RuleGenerator Il valore generato da Rule Generator Template.
SourceGroupingIdentifier N/D
StartTime Convertito da Timestamp Field.
EndTime Convertito da Timestamp Field.
Siemplify Alert - Extensions N/D
Siemplify Alert - Attachments N/D

Eventi connettore

Di seguito è riportato un esempio di evento connettore:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.