Intégrer Apache Kafka à Google SecOps
Version de l'intégration : 1.0
Ce document explique comment intégrer Apache Kafka à Google Security Operations (Google SecOps).
Cas d'utilisation
L'intégration Apache Kafka peut répondre aux cas d'utilisation suivants :
Ingestion des journaux de sécurité en temps réel : ingérez et traitez automatiquement les événements de sécurité des sujets Kafka dans Google SecOps. Cela permet une gestion centralisée des journaux et une analyse en temps réel pour générer des alertes basées sur les données de flux.
Automatisation basée sur des événements : déclenchez des playbooks automatisés dans Google SecOps en fonction d'événements ou de messages de sécurité spécifiques diffusés à partir d'un thème Kafka. Cela accélère la réponse aux événements critiques, comme la connexion d'un utilisateur depuis un lieu inhabituel.
Enrichissement des renseignements sur les menaces : extrayez des flux de renseignements personnalisés sur les menaces à partir de sujets Kafka pour enrichir les alertes et les cas existants. Cela fournit aux analystes un contexte à jour sur les indicateurs de compromission (IOC, Indicators of Compromise) et améliore la précision de l'analyse des menaces.
Avant de commencer
Avant de configurer l'intégration Apache Kafka dans Google SecOps, remplissez les conditions préalables suivantes :
- Serveur Apache Kafka : assurez-vous d'avoir accès à un serveur Apache Kafka en cours d'exécution avec les courtiers et les thèmes Kafka nécessaires configurés.
Image Docker de l'agent à distance : lorsque vous créez des agents à distance, vous devez utiliser une image basée sur Debian. Utilisez l'image suivante pour vérifier la compatibilité :
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
Paramètres d'intégration
L'intégration Apache Kafka nécessite les paramètres suivants :
| Paramètre | Description |
|---|---|
Kafka brokers |
Obligatoire. Liste de courtiers Kafka auxquels se connecter, séparés par une virgule, au format |
Use TLS for connection |
Facultatif. Si cette option est sélectionnée, l'intégration utilise le chiffrement TLS pour l'authentification. Ce paramètre nécessite un certificat d'autorité de certification (CA). Cette option n'est pas activée par défaut. |
Use SASL PLAIN with TLS for connection |
Facultatif. Si cette option est sélectionnée, l'intégration utilise le mécanisme d'authentification SASL/PLAIN avec nom d'utilisateur et mot de passe. Cette option n'est compatible qu'avec le chiffrement TLS. Elle nécessite un nom d'utilisateur et un mot de passe SASL, ainsi qu'un certificat d'autorité de certification. Cette option n'est pas activée par défaut. |
CA certificate of Kafka server |
Facultatif. Certificat de l'autorité de certification utilisé pour vérifier l'identité du serveur Kafka. Ce paramètre est obligatoire si SASL est activé. |
Client certificate |
Facultatif. Certificat du client pour l'authentification TLS mutuelle avec le serveur Kafka. Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée. |
Client certificate key |
Facultatif. Clé privée correspondant au certificat du client, utilisée pour l'authentification TLS mutuelle. Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée. |
Client certificate key password |
Facultatif. Mot de passe utilisé pour déchiffrer la clé privée du certificat client. Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée. |
SASL PLAIN Username |
Facultatif. Nom d'utilisateur pour l'authentification SASL/PLAIN avec les courtiers Kafka. Ce paramètre est obligatoire si SASL est activé. |
SASL PLAIN Password |
Facultatif. Mot de passe pour l'authentification SASL/PLAIN avec les courtiers Kafka. Ce paramètre est obligatoire si SASL est activé. |
Pour obtenir des instructions sur la configuration d'une intégration dans Google SecOps, consultez Configurer des intégrations.
Vous pourrez apporter des modifications ultérieurement, si nécessaire. Une fois que vous avez configuré une instance d'intégration, vous pouvez l'utiliser dans des playbooks. Pour savoir comment configurer et prendre en charge plusieurs instances, consultez Prise en charge de plusieurs instances.
Actions
Pour en savoir plus sur les actions, consultez Répondre aux actions en attente dans Votre bureau et Effectuer une action manuelle.
Ping
Utilisez l'action Ping pour tester la connectivité à Apache Kafka.
Cette action ne s'applique pas aux entités Google SecOps.
Entrées d'action
Aucune.
Sorties d'action
L'action Ping fournit les résultats suivants :
| Type de sortie de l'action | Disponibilité |
|---|---|
| Pièce jointe au mur des cas | Non disponible |
| Lien vers le mur des cas | Non disponible |
| Tableau du mur des cas | Non disponible |
| Table d'enrichissement | Non disponible |
| Résultat JSON | Non disponible |
| Messages de sortie | Disponible |
| Résultat du script. | Disponible |
Messages de sortie
L'action Ping peut renvoyer les messages de résultat suivants :
| Message de sortie | Description du message |
|---|---|
|
L'action a réussi. |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
Échec de l'action. Vérifiez la connexion au serveur, les paramètres d'entrée ou les identifiants. |
Résultat du script
Le tableau suivant répertorie la valeur du résultat du script lorsque vous utilisez l'action Ping :
| Nom du résultat du script | Valeur |
|---|---|
is_success |
True ou False |
Connecteurs
Pour en savoir plus sur la configuration des connecteurs dans Google SecOps, consultez Ingérer vos données (connecteurs).
Connecteur Apache Kafka – Messages
Utilisez le connecteur Apache Kafka – Messages pour récupérer les messages depuis Apache Kafka.
Le connecteur récupère les messages d'un sujet Kafka spécifié et peut les traiter de différentes manières en fonction du format du message. Si un message est un objet JSON valide, le connecteur extrait des champs spécifiques pour la création et le mappage des alertes. Si le message est une chaîne simple, il est ingéré en tant que données d'événement brutes.
Le connecteur gère le mappage de la gravité, la création de modèles de noms d'alertes et la génération d'ID uniques en fonction des paramètres que vous fournissez.
Mappage de la gravité JSON
Pour mapper la gravité de l'alerte, vous devez spécifier le champ que le connecteur Apache Kafka – Messages utilise pour obtenir la valeur de gravité dans le paramètre Severity Mapping JSON. La réponse du connecteur peut contenir des types de valeurs, tels que integer, float et string.
Le connecteur Apache Kafka – Messages lit les valeurs integer et float et les mappe en fonction des paramètres Google SecOps. Le tableau suivant montre le mappage des valeurs integer à la gravité dans Google SecOps :
| Valeur entière | Gravité mappée |
|---|---|
100 |
Critical |
Du 80 au 100 |
High |
Du 60 au 80 |
Medium |
Du 40 au 60 |
Low |
Moins de 40 |
Informational |
Si la réponse contient la valeur string, le connecteur Pub/Sub – Messages nécessite une configuration supplémentaire.
Au départ, la valeur par défaut s'affiche comme suit :
{
"Default": 60
}
Si les valeurs requises pour le mappage se trouvent dans la clé JSON event_severity, elles peuvent être les suivantes :
"Malicious""Benign""Unknown"
Pour analyser les valeurs de clé JSON event_severity et vous assurer que l'objet JSON est au bon format, configurez le paramètre Severity Mapping JSON comme suit :
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
La valeur "Default" est obligatoire.
Dans le cas où plusieurs correspondances sont trouvées pour le même objet JSON, le connecteur Apache Kafka – Messages donne la priorité à la première clé d'objet JSON.
Pour utiliser des champs contenant des valeurs integer ou float, configurez la clé et une chaîne vide dans le paramètre Severity Mapping JSON :
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
Entrées du connecteur
Le connecteur Apache Kafka – Messages nécessite les paramètres suivants :
| Paramètre | Description |
|---|---|
Product Field Name |
Obligatoire. Nom du champ dans lequel le nom du produit est stocké. Le nom du produit a un impact majeur sur le mappage. Pour simplifier et améliorer le processus de mappage du connecteur, la valeur par défaut est résolue en une valeur de remplacement référencée dans le code. Toute entrée non valide pour ce paramètre est résolue par défaut sur une valeur de remplacement. La valeur par défaut est |
Event Field Name |
Obligatoire. Nom du champ qui détermine le nom (sous-type) de l'événement. La valeur par défaut est |
Environment Field Name |
Facultatif. Nom du champ dans lequel le nom de l'environnement est stocké. Si le champ "environment" est manquant, le connecteur utilise la valeur par défaut. La valeur par défaut est |
Environment Regex Pattern |
Facultatif. Modèle d'expression régulière à exécuter sur la valeur trouvée dans le champ Utilisez la valeur par défaut Si le modèle d'expression régulière est nul ou vide, ou si la valeur de l'environnement est nulle, l'environnement par défaut est sélectionné comme résultat final. |
Script Timeout (Seconds) |
Obligatoire. Délai avant expiration, en secondes, du processus Python qui exécute le script actuel. La valeur par défaut est |
Kafka brokers |
Obligatoire. Liste de courtiers Kafka auxquels se connecter, séparés par une virgule, au format |
Use TLS for connection |
Facultatif. Si cette option est sélectionnée, l'intégration utilise le chiffrement TLS pour l'authentification. Ce paramètre nécessite un certificat d'autorité de certification. Cette option n'est pas activée par défaut. |
Use SASL PLAIN with TLS for connection |
Facultatif. Si cette option est sélectionnée, l'intégration utilise le mécanisme d'authentification SASL/PLAIN avec nom d'utilisateur et mot de passe. Cette option nécessite de fournir un nom d'utilisateur et un mot de passe SASL. Il n'est compatible qu'avec le chiffrement TLS, qui nécessite un certificat d'autorité de certification. Cette option n'est pas activée par défaut. |
CA certificate of Kafka server |
Facultatif. Certificat de l'autorité de certification utilisé pour vérifier l'identité du serveur Kafka. |
Client certificate |
Facultatif. Certificat du client pour l'authentification TLS mutuelle avec le serveur Kafka. |
Client certificate key |
Facultatif. Clé privée correspondant au certificat du client, utilisée pour l'authentification TLS mutuelle. |
Client certificate key password |
Facultatif. Mot de passe utilisé pour déchiffrer la clé privée du certificat client. |
SASL PLAIN Username |
Facultatif. Nom d'utilisateur pour l'authentification SASL/PLAIN avec les courtiers Kafka. |
SASL PLAIN Password |
Facultatif. Mot de passe pour l'authentification SASL/PLAIN avec les courtiers Kafka. |
Topic |
Obligatoire. Sujet Kafka à partir duquel les incidents sont récupérés. |
Consumer Group ID |
Facultatif. Identifiant du groupe de consommateurs utilisé lors de la récupération des incidents. Si aucune valeur n'est fournie, un ID unique est généré. |
Partitions |
Facultatif. Liste CSV des partitions à partir desquelles récupérer les messages. |
Initial Offset |
Facultatif. Point de départ de l'extraction des messages d'une partition Kafka par le connecteur. Vous pouvez spécifier un entier positif pour commencer à un décalage particulier, ou utiliser les valeurs |
Poll Timeout |
Facultatif. Délai avant expiration de l'interrogation pour consommer un message de Kafka, en secondes. |
Case Name Template |
Facultatif. Modèle permettant de définir un nom de demande personnalisé. Le connecteur ajoute une clé Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement. Exemple : |
Alert Name Template |
Obligatoire. Modèle permettant de définir le nom de l'alerte. Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement. Exemple : Si aucune valeur n'est fournie ou si le modèle n'est pas valide, le connecteur utilise un nom d'alerte par défaut. |
Rule Generator Template |
Obligatoire. Modèle permettant de définir le générateur de règles. Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement. Exemple : Si aucune valeur n'est fournie ou si le modèle n'est pas valide, le connecteur utilise un nom de générateur de règles par défaut. |
Timestamp Field |
Obligatoire. Nom du champ du message Kafka contenant le code temporel de l'alerte Google SecOps. Si l'horodatage n'est pas au format epoch Unix, son format doit être défini dans le paramètre |
Timestamp Format |
Facultatif. Format du code temporel du message, requis pour les codes temporels non epoch Unix. Utilisez les codes de format Si le code temporel n'est pas au format epoch Unix et que ce paramètre n'est pas configuré, le connecteur échoue. |
Severity Mapping JSON |
Obligatoire. Objet JSON utilisé par le connecteur pour extraire et mapper le niveau de gravité du message à l'échelle de priorité Google SecOps. La valeur par défaut est |
Unique ID Field |
Facultatif. Nom du champ à utiliser comme identifiant unique du message. Si aucune valeur n'est fournie, le connecteur génère et utilise un hachage SHA-256 du contenu du message comme identifiant de message. |
Max Messages To Fetch |
Obligatoire. Nombre maximal de messages traités par le connecteur à chaque itération. La valeur par défaut est |
Disable Overflow |
Facultatif. Si cette option est sélectionnée, le connecteur ignore le mécanisme de dépassement de capacité Google SecOps. Cette option est activée par défaut. |
Verify SSL |
Obligatoire. Si cette option est sélectionnée, l'intégration valide le certificat SSL lors de la connexion au serveur Apache Kafka. Cette option est activée par défaut. |
Proxy Server Address |
Facultatif. Adresse du serveur proxy à utiliser. |
Proxy Username |
Facultatif. Nom d'utilisateur du proxy pour l'authentification. |
Proxy Password |
Facultatif. Mot de passe du proxy pour l'authentification. |
Règles du connecteur
Le connecteur est compatible avec les proxys.
Alertes de connecteur
Le tableau suivant décrit le mappage des champs de message Apache Kafka avec les champs d'alerte Google SecOps :
| Champ d'alerte Siemplify | Champ de message Apache Kafka |
|---|---|
SourceSystemName |
Rempli par le framework. |
TicketId |
Valeur du champ d'ID unique ou hachage SHA-256 du message. |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
Valeur générée par Alert Name Template. |
Reason |
N/A |
Description |
N/A |
DeviceVendor |
Codé en dur : Apache Kafka |
DeviceProduct |
Valeur de remplacement : Message |
Priority |
Mappé à partir du paramètre Severity Mapping JSON. |
RuleGenerator |
Valeur générée par Rule Generator Template. |
SourceGroupingIdentifier |
N/A |
StartTime |
Converti à partir de Timestamp Field. |
EndTime |
Converti à partir de Timestamp Field. |
Siemplify Alert - Extensions |
N/A |
Siemplify Alert - Attachments |
N/A |
Événements de connecteur
Voici un exemple d'événement de connecteur :
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.