Intégrer Apache Kafka à Google SecOps

Version de l'intégration : 1.0

Ce document explique comment intégrer Apache Kafka à Google Security Operations (Google SecOps).

Cas d'utilisation

L'intégration Apache Kafka peut répondre aux cas d'utilisation suivants :

  • Ingestion des journaux de sécurité en temps réel : ingérez et traitez automatiquement les événements de sécurité des sujets Kafka dans Google SecOps. Cela permet une gestion centralisée des journaux et une analyse en temps réel pour générer des alertes basées sur les données de flux.

  • Automatisation basée sur des événements : déclenchez des playbooks automatisés dans Google SecOps en fonction d'événements ou de messages de sécurité spécifiques diffusés à partir d'un thème Kafka. Cela accélère la réponse aux événements critiques, comme la connexion d'un utilisateur depuis un lieu inhabituel.

  • Enrichissement des renseignements sur les menaces : extrayez des flux de renseignements personnalisés sur les menaces à partir de sujets Kafka pour enrichir les alertes et les cas existants. Cela fournit aux analystes un contexte à jour sur les indicateurs de compromission (IOC, Indicators of Compromise) et améliore la précision de l'analyse des menaces.

Avant de commencer

Avant de configurer l'intégration Apache Kafka dans Google SecOps, remplissez les conditions préalables suivantes :

  • Serveur Apache Kafka : assurez-vous d'avoir accès à un serveur Apache Kafka en cours d'exécution avec les courtiers et les thèmes Kafka nécessaires configurés.
  • Image Docker de l'agent à distance : lorsque vous créez des agents à distance, vous devez utiliser une image basée sur Debian. Utilisez l'image suivante pour vérifier la compatibilité :

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Paramètres d'intégration

L'intégration Apache Kafka nécessite les paramètres suivants :

Paramètre Description
Kafka brokers

Obligatoire.

Liste de courtiers Kafka auxquels se connecter, séparés par une virgule, au format hostname:port.

Use TLS for connection

Facultatif.

Si cette option est sélectionnée, l'intégration utilise le chiffrement TLS pour l'authentification.

Ce paramètre nécessite un certificat d'autorité de certification (CA).

Cette option n'est pas activée par défaut.

Use SASL PLAIN with TLS for connection

Facultatif.

Si cette option est sélectionnée, l'intégration utilise le mécanisme d'authentification SASL/PLAIN avec nom d'utilisateur et mot de passe.

Cette option n'est compatible qu'avec le chiffrement TLS. Elle nécessite un nom d'utilisateur et un mot de passe SASL, ainsi qu'un certificat d'autorité de certification.

Cette option n'est pas activée par défaut.

CA certificate of Kafka server

Facultatif.

Certificat de l'autorité de certification utilisé pour vérifier l'identité du serveur Kafka.

Ce paramètre est obligatoire si SASL est activé.

Client certificate

Facultatif.

Certificat du client pour l'authentification TLS mutuelle avec le serveur Kafka.

Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée.

Client certificate key

Facultatif.

Clé privée correspondant au certificat du client, utilisée pour l'authentification TLS mutuelle.

Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée.

Client certificate key password

Facultatif.

Mot de passe utilisé pour déchiffrer la clé privée du certificat client.

Ce paramètre est obligatoire si l'authentification TLS mutuelle (mTLS) est activée.

SASL PLAIN Username

Facultatif.

Nom d'utilisateur pour l'authentification SASL/PLAIN avec les courtiers Kafka.

Ce paramètre est obligatoire si SASL est activé.

SASL PLAIN Password

Facultatif.

Mot de passe pour l'authentification SASL/PLAIN avec les courtiers Kafka.

Ce paramètre est obligatoire si SASL est activé.

Pour obtenir des instructions sur la configuration d'une intégration dans Google SecOps, consultez Configurer des intégrations.

Vous pourrez apporter des modifications ultérieurement, si nécessaire. Une fois que vous avez configuré une instance d'intégration, vous pouvez l'utiliser dans des playbooks. Pour savoir comment configurer et prendre en charge plusieurs instances, consultez Prise en charge de plusieurs instances.

Actions

Pour en savoir plus sur les actions, consultez Répondre aux actions en attente dans Votre bureau et Effectuer une action manuelle.

Ping

Utilisez l'action Ping pour tester la connectivité à Apache Kafka.

Cette action ne s'applique pas aux entités Google SecOps.

Entrées d'action

Aucune.

Sorties d'action

L'action Ping fournit les résultats suivants :

Type de sortie de l'action Disponibilité
Pièce jointe au mur des cas Non disponible
Lien vers le mur des cas Non disponible
Tableau du mur des cas Non disponible
Table d'enrichissement Non disponible
Résultat JSON Non disponible
Messages de sortie Disponible
Résultat du script. Disponible
Messages de sortie

L'action Ping peut renvoyer les messages de résultat suivants :

Message de sortie Description du message

Successfully connected to the Apache Kafka server with the provided connection parameters!

L'action a réussi.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

Échec de l'action.

Vérifiez la connexion au serveur, les paramètres d'entrée ou les identifiants.

Résultat du script

Le tableau suivant répertorie la valeur du résultat du script lorsque vous utilisez l'action Ping :

Nom du résultat du script Valeur
is_success True ou False

Connecteurs

Pour en savoir plus sur la configuration des connecteurs dans Google SecOps, consultez Ingérer vos données (connecteurs).

Connecteur Apache Kafka – Messages

Utilisez le connecteur Apache Kafka – Messages pour récupérer les messages depuis Apache Kafka.

Le connecteur récupère les messages d'un sujet Kafka spécifié et peut les traiter de différentes manières en fonction du format du message. Si un message est un objet JSON valide, le connecteur extrait des champs spécifiques pour la création et le mappage des alertes. Si le message est une chaîne simple, il est ingéré en tant que données d'événement brutes.

Le connecteur gère le mappage de la gravité, la création de modèles de noms d'alertes et la génération d'ID uniques en fonction des paramètres que vous fournissez.

Mappage de la gravité JSON

Pour mapper la gravité de l'alerte, vous devez spécifier le champ que le connecteur Apache Kafka – Messages utilise pour obtenir la valeur de gravité dans le paramètre Severity Mapping JSON. La réponse du connecteur peut contenir des types de valeurs, tels que integer, float et string.

Le connecteur Apache Kafka – Messages lit les valeurs integer et float et les mappe en fonction des paramètres Google SecOps. Le tableau suivant montre le mappage des valeurs integer à la gravité dans Google SecOps :

Valeur entière Gravité mappée
100 Critical
Du 80 au 100 High
Du 60 au 80 Medium
Du 40 au 60 Low
Moins de 40 Informational

Si la réponse contient la valeur string, le connecteur Pub/Sub – Messages nécessite une configuration supplémentaire.

Au départ, la valeur par défaut s'affiche comme suit :

{
    "Default": 60
}

Si les valeurs requises pour le mappage se trouvent dans la clé JSON event_severity, elles peuvent être les suivantes :

  • "Malicious"
  • "Benign"
  • "Unknown"

Pour analyser les valeurs de clé JSON event_severity et vous assurer que l'objet JSON est au bon format, configurez le paramètre Severity Mapping JSON comme suit :

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

La valeur "Default" est obligatoire.

Dans le cas où plusieurs correspondances sont trouvées pour le même objet JSON, le connecteur Apache Kafka – Messages donne la priorité à la première clé d'objet JSON.

Pour utiliser des champs contenant des valeurs integer ou float, configurez la clé et une chaîne vide dans le paramètre Severity Mapping JSON :

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Entrées du connecteur

Le connecteur Apache Kafka – Messages nécessite les paramètres suivants :

Paramètre Description
Product Field Name

Obligatoire.

Nom du champ dans lequel le nom du produit est stocké.

Le nom du produit a un impact majeur sur le mappage. Pour simplifier et améliorer le processus de mappage du connecteur, la valeur par défaut est résolue en une valeur de remplacement référencée dans le code. Toute entrée non valide pour ce paramètre est résolue par défaut sur une valeur de remplacement.

La valeur par défaut est Product Name.

Event Field Name

Obligatoire.

Nom du champ qui détermine le nom (sous-type) de l'événement.

La valeur par défaut est event_type.

Environment Field Name

Facultatif.

Nom du champ dans lequel le nom de l'environnement est stocké.

Si le champ "environment" est manquant, le connecteur utilise la valeur par défaut.

La valeur par défaut est "".

Environment Regex Pattern

Facultatif.

Modèle d'expression régulière à exécuter sur la valeur trouvée dans le champ Environment Field Name. Ce paramètre vous permet de manipuler le champ "environment" à l'aide de la logique d'expression régulière.

Utilisez la valeur par défaut .* pour récupérer la valeur Environment Field Name brute requise.

Si le modèle d'expression régulière est nul ou vide, ou si la valeur de l'environnement est nulle, l'environnement par défaut est sélectionné comme résultat final.

Script Timeout (Seconds)

Obligatoire.

Délai avant expiration, en secondes, du processus Python qui exécute le script actuel.

La valeur par défaut est 180.

Kafka brokers

Obligatoire.

Liste de courtiers Kafka auxquels se connecter, séparés par une virgule, au format hostname:port.

Use TLS for connection

Facultatif.

Si cette option est sélectionnée, l'intégration utilise le chiffrement TLS pour l'authentification.

Ce paramètre nécessite un certificat d'autorité de certification.

Cette option n'est pas activée par défaut.

Use SASL PLAIN with TLS for connection

Facultatif.

Si cette option est sélectionnée, l'intégration utilise le mécanisme d'authentification SASL/PLAIN avec nom d'utilisateur et mot de passe.

Cette option nécessite de fournir un nom d'utilisateur et un mot de passe SASL. Il n'est compatible qu'avec le chiffrement TLS, qui nécessite un certificat d'autorité de certification.

Cette option n'est pas activée par défaut.

CA certificate of Kafka server

Facultatif.

Certificat de l'autorité de certification utilisé pour vérifier l'identité du serveur Kafka.

Client certificate

Facultatif.

Certificat du client pour l'authentification TLS mutuelle avec le serveur Kafka.

Client certificate key

Facultatif.

Clé privée correspondant au certificat du client, utilisée pour l'authentification TLS mutuelle.

Client certificate key password

Facultatif.

Mot de passe utilisé pour déchiffrer la clé privée du certificat client.

SASL PLAIN Username

Facultatif.

Nom d'utilisateur pour l'authentification SASL/PLAIN avec les courtiers Kafka.

SASL PLAIN Password

Facultatif.

Mot de passe pour l'authentification SASL/PLAIN avec les courtiers Kafka.

Topic

Obligatoire.

Sujet Kafka à partir duquel les incidents sont récupérés.

Consumer Group ID

Facultatif.

Identifiant du groupe de consommateurs utilisé lors de la récupération des incidents.

Si aucune valeur n'est fournie, un ID unique est généré.

Partitions

Facultatif.

Liste CSV des partitions à partir desquelles récupérer les messages.

Initial Offset

Facultatif.

Point de départ de l'extraction des messages d'une partition Kafka par le connecteur.

Vous pouvez spécifier un entier positif pour commencer à un décalage particulier, ou utiliser les valeurs earliest ou latest pour commencer à récupérer les données au début ou à la fin de la partition.

Poll Timeout

Facultatif.

Délai avant expiration de l'interrogation pour consommer un message de Kafka, en secondes.

Case Name Template

Facultatif.

Modèle permettant de définir un nom de demande personnalisé. Le connecteur ajoute une clé custom_case_name à l'événement.

Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement.

Exemple : Phishing - EVENT_MAILBOX.

Alert Name Template

Obligatoire.

Modèle permettant de définir le nom de l'alerte.

Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement.

Exemple : Phishing - EVENT_MAILBOX.

Si aucune valeur n'est fournie ou si le modèle n'est pas valide, le connecteur utilise un nom d'alerte par défaut.

Rule Generator Template

Obligatoire.

Modèle permettant de définir le générateur de règles.

Vous pouvez utiliser des espaces réservés au format FIELD_NAME, qui sont renseignés à partir des valeurs de chaîne du premier événement.

Exemple : Phishing - EVENT_MAILBOX.

Si aucune valeur n'est fournie ou si le modèle n'est pas valide, le connecteur utilise un nom de générateur de règles par défaut.

Timestamp Field

Obligatoire.

Nom du champ du message Kafka contenant le code temporel de l'alerte Google SecOps.

Si l'horodatage n'est pas au format epoch Unix, son format doit être défini dans le paramètre Timestamp Format.

Timestamp Format

Facultatif.

Format du code temporel du message, requis pour les codes temporels non epoch Unix. Utilisez les codes de format strftime Python standards.

Si le code temporel n'est pas au format epoch Unix et que ce paramètre n'est pas configuré, le connecteur échoue.

Severity Mapping JSON

Obligatoire.

Objet JSON utilisé par le connecteur pour extraire et mapper le niveau de gravité du message à l'échelle de priorité Google SecOps.

La valeur par défaut est {"Default": "60"}.

Unique ID Field

Facultatif.

Nom du champ à utiliser comme identifiant unique du message.

Si aucune valeur n'est fournie, le connecteur génère et utilise un hachage SHA-256 du contenu du message comme identifiant de message.

Max Messages To Fetch

Obligatoire.

Nombre maximal de messages traités par le connecteur à chaque itération.

La valeur par défaut est 100.

Disable Overflow

Facultatif.

Si cette option est sélectionnée, le connecteur ignore le mécanisme de dépassement de capacité Google SecOps.

Cette option est activée par défaut.

Verify SSL

Obligatoire.

Si cette option est sélectionnée, l'intégration valide le certificat SSL lors de la connexion au serveur Apache Kafka.

Cette option est activée par défaut.

Proxy Server Address

Facultatif.

Adresse du serveur proxy à utiliser.

Proxy Username

Facultatif.

Nom d'utilisateur du proxy pour l'authentification.

Proxy Password

Facultatif.

Mot de passe du proxy pour l'authentification.

Règles du connecteur

Le connecteur est compatible avec les proxys.

Alertes de connecteur

Le tableau suivant décrit le mappage des champs de message Apache Kafka avec les champs d'alerte Google SecOps :

Champ d'alerte Siemplify Champ de message Apache Kafka
SourceSystemName Rempli par le framework.
TicketId Valeur du champ d'ID unique ou hachage SHA-256 du message.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Valeur générée par Alert Name Template.
Reason N/A
Description N/A
DeviceVendor Codé en dur : Apache Kafka
DeviceProduct Valeur de remplacement : Message
Priority Mappé à partir du paramètre Severity Mapping JSON.
RuleGenerator Valeur générée par Rule Generator Template.
SourceGroupingIdentifier N/A
StartTime Converti à partir de Timestamp Field.
EndTime Converti à partir de Timestamp Field.
Siemplify Alert - Extensions N/A
Siemplify Alert - Attachments N/A

Événements de connecteur

Voici un exemple d'événement de connecteur :

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.