Integrare Apache Kafka con Google SecOps
Questo documento spiega come integrare Apache Kafka con Google Security Operations (Google SecOps).
Casi d'uso
L'integrazione di Apache Kafka può gestire i seguenti casi d'uso:
Importazione dei log di sicurezza in tempo reale: importa ed elabora automaticamente gli eventi di sicurezza dagli argomenti Kafka in Google SecOps. Ciò consente la gestione centralizzata dei log e l'analisi in tempo reale per generare avvisi in base ai dati di streaming.
Automazione basata su eventi: attiva playbook automatizzati in Google SecOps in base a eventi o messaggi di sicurezza specifici trasmessi in streaming da un argomento Kafka. Ciò accelera la risposta a eventi critici come l'accesso di un utente da una posizione insolita.
Arricchimento dell'intelligence sulle minacce: estrai feed personalizzati di intelligence sulle minacce dagli argomenti Kafka per arricchire avvisi e casi esistenti. In questo modo, gli analisti dispongono di un contesto aggiornato sugli indicatori di compromissione (IOC) e migliora l'accuratezza dell'analisi delle minacce.
Prima di iniziare
Prima di configurare l'integrazione di Apache Kafka in Google SecOps, completa i seguenti prerequisiti:
- Server Apache Kafka: assicurati di avere accesso a un server Apache Kafka in esecuzione con i broker e gli argomenti Kafka necessari configurati.
Immagine Docker dell'agente remoto: quando crei agenti remoti, devi utilizzare un'immagine basata su Debian. Utilizza l'immagine seguente per garantire la compatibilità:
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
Parametri di integrazione
L'integrazione di Apache Kafka richiede i seguenti parametri:
| Parametro | Descrizione |
|---|---|
Kafka brokers |
Obbligatorio. Un elenco separato da virgole di broker Kafka a cui connettersi, nel formato
|
Use TLS for connection |
Facoltativo. Se selezionata, l'integrazione utilizza la crittografia TLS per l'autenticazione. Questo parametro richiede un certificato dell'autorità di certificazione (CA). Non è abilitata per impostazione predefinita. |
Use SASL PLAIN with TLS for connection |
Facoltativo. Se selezionata, l'integrazione utilizza il meccanismo di autenticazione SASL PLAIN con nome utente e password. Questa opzione è supportata solo con la crittografia TLS e richiede sia un nome utente e una password SASL sia un certificato CA. Non è abilitata per impostazione predefinita. |
CA certificate of Kafka server |
Facoltativo. Il certificato CA utilizzato per verificare l'identità del server Kafka. Questo parametro è obbligatorio se SASL è abilitato. |
Client certificate |
Facoltativo. Il certificato del client per l'autenticazione TLS reciproca (mTLS) con il server Kafka. Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS). |
Client certificate key |
Facoltativo. La chiave privata corrispondente al certificato del client, utilizzata per l'autenticazione TLS reciproca (mTLS). Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS). |
Client certificate key password |
Facoltativo. La password utilizzata per decriptare la chiave privata del certificato client. Questo parametro è obbligatorio se è abilitata l'autenticazione TLS reciproca (mTLS). |
SASL PLAIN Username |
Facoltativo. Il nome utente per l'autenticazione SASL PLAIN con i broker Kafka. Questo parametro è obbligatorio se SASL è abilitato. |
SASL PLAIN Password |
Facoltativo. La password per l'autenticazione SASL PLAIN con i broker Kafka. Questo parametro è obbligatorio se SASL è abilitato. |
Per istruzioni su come configurare un'integrazione in Google SecOps, consulta Configurare le integrazioni.
Se necessario, potrai apportare modifiche in un secondo momento. Dopo aver configurato un'istanza di integrazione, puoi utilizzarla nei playbook. Per saperne di più su come configurare e supportare più istanze, consulta Supporto di più istanze.
Azioni
Per ulteriori informazioni sulle azioni, vedi Rispondere alle azioni in attesa dalla tua scrivania e Eseguire un'azione manuale.
Dindin
Utilizza l'azione Ping per testare la connettività ad Apache Kafka.
Questa azione non viene eseguita sulle entità Google SecOps.
Input azione
Nessuno.
Output dell'azione
L'azione Ping fornisce i seguenti output:
| Tipo di output dell'azione | Disponibilità |
|---|---|
| Allegato della bacheca casi | Non disponibile |
| Link alla bacheca casi | Non disponibile |
| Tabella della bacheca casi | Non disponibile |
| Tabella di arricchimento | Non disponibile |
| Risultato JSON | Non disponibile |
| Messaggi di output | Disponibile |
| Risultato dello script. | Disponibile |
Messaggi di output
L'azione Ping può restituire i seguenti messaggi di output:
| Messaggio di output | Descrizione del messaggio |
|---|---|
|
L'azione è riuscita. |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
L'azione non è riuscita. Controlla la connessione al server, i parametri di input o le credenziali. |
Risultato dello script
La seguente tabella elenca il valore dell'output del risultato dello script quando utilizzi l'azione Ping:
| Nome del risultato dello script | Valore |
|---|---|
is_success |
True o False |
Connettori
Per scoprire di più sulla configurazione dei connettori in Google SecOps, consulta Importare i dati (connettori).
Apache Kafka - Messages Connector
Utilizza il connettore Apache Kafka - Messaggi per recuperare i messaggi da Apache Kafka.
Il connettore recupera i messaggi da un argomento Kafka specificato e può elaborarli in modi diversi in base al formato del messaggio. Se un messaggio è un oggetto JSON valido, il connettore estrae campi specifici per la creazione e la mappatura degli avvisi. Se il messaggio è una stringa semplice, viene importato come dati sugli eventi non elaborati.
Il connettore gestisce la mappatura della gravità, la creazione di modelli di nomi degli avvisi e la generazione di ID univoci in base ai parametri forniti.
Mapping della gravità JSON
Per mappare la gravità dell'avviso, devi specificare il campo utilizzato dal
connettore Apache Kafka - Messaggi per
ottenere il valore della gravità nel parametro Severity Mapping JSON. La risposta del connettore può contenere tipi di valori, ad esempio integer, float e string.
Il connettore Apache Kafka - Messaggi legge i valori integer e float
e li mappa in base alle impostazioni di Google SecOps. La
tabella seguente mostra il mapping dei valori integer alla gravità in
Google SecOps:
| Valore Integer | Gravità mappata |
|---|---|
100 |
Critical |
Da 80 a 100 |
High |
Da 60 a 80 |
Medium |
Da 40 a 60 |
Low |
Meno di 40 |
Informational |
Se la risposta contiene il valore string, il connettore Pub/Sub - Messaggi richiede una configurazione aggiuntiva.
Inizialmente, il valore predefinito viene visualizzato nel seguente modo:
{
"Default": 60
}
Se i valori richiesti per la mappatura si trovano nella chiave JSON event_severity, i valori possono essere i seguenti:
"Malicious""Benign""Unknown"
Per analizzare i valori delle chiavi JSON event_severity e assicurarti che l'oggetto JSON
abbia un formato corretto, configura il parametro Severity Mapping JSON come segue:
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
Il valore di "Default" è obbligatorio.
Nel caso in cui ci siano più corrispondenze per lo stesso oggetto JSON, il connettore Apache Kafka - Messaggi assegna la priorità alla prima chiave dell'oggetto JSON.
Per utilizzare i campi che contengono valori integer o float, configura la chiave
e una stringa vuota nel parametro Severity Mapping JSON:
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
Ingressi del connettore
Il connettore Apache Kafka - Messaggi richiede i seguenti parametri:
| Parametro | Descrizione |
|---|---|
Product Field Name |
Obbligatorio. Il nome del campo in cui è memorizzato il nome del prodotto. Il nome del prodotto influisce principalmente sulla mappatura. Per semplificare e migliorare il processo di mappatura per il connettore, il valore predefinito viene risolto in un valore di riserva a cui viene fatto riferimento dal codice. Per impostazione predefinita, qualsiasi input non valido per questo parametro viene risolto in un valore di riserva. Il valore predefinito è |
Event Field Name |
Obbligatorio. Il nome del campo che determina il nome (sottotipo) dell'evento. Il valore predefinito è |
Environment Field Name |
Facoltativo. Il nome del campo in cui è memorizzato il nome dell'ambiente. Se il campo ambiente non è presente, il connettore utilizza il valore predefinito. Il valore predefinito è |
Environment Regex Pattern |
Facoltativo. Un pattern di espressione regolare da eseguire sul valore trovato nel campo
Utilizza il valore predefinito Se il pattern dell'espressione regolare è nullo o vuoto oppure il valore dell'ambiente è nullo, il risultato finale dell'ambiente è l'ambiente predefinito. |
Script Timeout (Seconds) |
Obbligatorio. Il limite di timeout, in secondi, per il processo Python che esegue lo script corrente. Il valore predefinito è |
Kafka brokers |
Obbligatorio. Un elenco separato da virgole di broker Kafka a cui connettersi, nel formato
|
Use TLS for connection |
Facoltativo. Se selezionata, l'integrazione utilizza la crittografia TLS per l'autenticazione. Questo parametro richiede un certificato CA. Non è abilitata per impostazione predefinita. |
Use SASL PLAIN with TLS for connection |
Facoltativo. Se selezionata, l'integrazione utilizza il meccanismo di autenticazione SASL PLAIN con nome utente e password. Questa opzione richiede la fornitura di un nome utente e una password SASL. È supportato solo con la crittografia TLS, che richiede un certificato CA. Non è abilitata per impostazione predefinita. |
CA certificate of Kafka server |
Facoltativo. Il certificato CA utilizzato per verificare l'identità del server Kafka. |
Client certificate |
Facoltativo. Il certificato del client per l'autenticazione TLS reciproca (mTLS) con il server Kafka. |
Client certificate key |
Facoltativo. La chiave privata corrispondente al certificato del client, utilizzata per l'autenticazione TLS reciproca (mTLS). |
Client certificate key password |
Facoltativo. La password utilizzata per decriptare la chiave privata del certificato client. |
SASL PLAIN Username |
Facoltativo. Il nome utente per l'autenticazione SASL PLAIN con i broker Kafka. |
SASL PLAIN Password |
Facoltativo. La password per l'autenticazione SASL PLAIN con i broker Kafka. |
Topic |
Obbligatorio. L'argomento Kafka da cui vengono recuperati gli incidenti. |
Consumer Group ID |
Facoltativo. L'identificatore del gruppo di consumatori utilizzato per recuperare gli incidenti. Se non viene fornito alcun valore, viene generato un ID univoco. |
Partitions |
Facoltativo. Un elenco CSV di partizioni da cui recuperare i messaggi. |
Initial Offset |
Facoltativo. Il punto in cui il connettore inizia a recuperare i messaggi da una partizione Kafka. Puoi specificare un numero intero positivo per iniziare da un offset specifico oppure utilizzare
i valori |
Poll Timeout |
Facoltativo. Timeout di polling per consumare un messaggio da Kafka, in secondi. |
Case Name Template |
Facoltativo. Un modello per definire un nome personalizzato per la richiesta. Il connettore aggiunge una
chiave Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento. Esempio: |
Alert Name Template |
Obbligatorio. Un modello per definire il nome dell'avviso. Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento. Esempio: Se non viene fornito un valore o il modello non è valido, il connettore utilizza un nome di avviso predefinito. |
Rule Generator Template |
Obbligatorio. Un modello per definire il generatore di regole. Puoi utilizzare i segnaposto nel formato FIELD_NAME, che vengono compilati con i valori stringa del primo evento. Esempio: Se non viene fornito un valore o il modello non è valido, il connettore utilizza un nome predefinito per il generatore di regole. |
Timestamp Field |
Obbligatorio. Il nome del campo nel messaggio Kafka che contiene il timestamp dell'avviso Google SecOps. Se il timestamp non è nel formato Unix epoch, il suo formato deve essere definito
nel parametro |
Timestamp Format |
Facoltativo. Il formato del timestamp del messaggio, richiesto per i timestamp non Unix epoch. Utilizza i codici di formato Se il timestamp non è nel formato epoca Unix e questo parametro non è configurato, il connettore non funziona. |
Severity Mapping JSON |
Obbligatorio. L'oggetto JSON utilizzato dal connettore per estrarre e mappare il livello di gravità dal messaggio alla scala di priorità di Google SecOps. Il valore predefinito è |
Unique ID Field |
Facoltativo. Il nome del campo da utilizzare come identificatore univoco del messaggio. Se non viene fornito alcun valore, il connettore genera e utilizza un hash SHA-256 del contenuto del messaggio come identificatore del messaggio. |
Max Messages To Fetch |
Obbligatorio. Il numero massimo di messaggi elaborati dal connettore per ogni iterazione. Il valore predefinito è |
Disable Overflow |
Facoltativo. Se selezionato, il connettore ignora il meccanismo di overflow di Google SecOps. Abilitato per impostazione predefinita. |
Verify SSL |
Obbligatorio. Se selezionata, l'integrazione convalida il certificato SSL quando si connette al server Apache Kafka. Abilitato per impostazione predefinita. |
Proxy Server Address |
Facoltativo. L'indirizzo del server proxy da utilizzare. |
Proxy Username |
Facoltativo. Il nome utente del proxy per l'autenticazione. |
Proxy Password |
Facoltativo. La password del proxy per l'autenticazione. |
Regole del connettore
Il connettore supporta i proxy.
Avvisi sui connettori
La tabella seguente descrive la mappatura dei campi dei messaggi Apache Kafka ai campi degli avvisi di Google SecOps:
| Campo di avviso Siemplify | Campo del messaggio Apache Kafka |
|---|---|
SourceSystemName |
Compilato dal framework. |
TicketId |
Il valore del campo ID univoco o un hash SHA-256 del messaggio. |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
Il valore generato da Alert Name Template. |
Reason |
N/D |
Description |
N/D |
DeviceVendor |
Hardcoded: Apache Kafka |
DeviceProduct |
Valore di riserva: Message |
Priority |
Mappato dal parametro Severity Mapping JSON. |
RuleGenerator |
Il valore generato da Rule Generator Template. |
SourceGroupingIdentifier |
N/D |
StartTime |
Convertito da Timestamp Field. |
EndTime |
Convertito da Timestamp Field. |
Siemplify Alert - Extensions |
N/D |
Siemplify Alert - Attachments |
N/D |
Eventi connettore
Di seguito è riportato un esempio di evento connettore:
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.