Développer le connecteur

Compatible avec :

Ce document explique comment créer un connecteur dans l'environnement de développement intégré (IDE) en créant une intégration, puis en y associant un connecteur.

Créer une intégration de connecteur d'e-mail

Pour créer une intégration de connecteur d'e-mails, procédez comme suit :

  1. Dans la fenêtre Réponse > IDE, cliquez sur Ajouter pour ajouter un élément IDE.
  2. Sélectionnez Intégration et nommez l'intégration Email Connector.
  3. Cliquez sur Créer. L'intégration s'affiche dans la barre latérale avec une icône par défaut.
  4. Cliquez sur more_vert Configurer l'intégration personnalisée et définissez les paramètres suivants :
    • Description
    • Icône
    • Dépendances Python
    • Paramètres d'intégration
  5. Cliquez sur Enregistrer.

Créer un connecteur d'e-mails

Pour créer un connecteur d'e-mails :

  1. Dans la fenêtre Réponse > IDE, cliquez sur Ajouter pour ajouter un élément IDE.
  2. Sélectionnez la case d'option Connecteur et nommez le connecteur My Email Connector.
  3. Dans la liste, sélectionnez l'intégration Email Connector (Connecteur d'e-mails) pour associer le connecteur à l'intégration.
  4. Cliquez sur Créer.

Définir les paramètres du connecteur

Après avoir créé le connecteur, définissez ses paramètres :

  1. Définissez les paramètres du connecteur en fonction du tableau :

  2. Paramètre Description Type
    Username Obligatoire. Nom d'utilisateur IMAP. Adresse e-mail à partir de laquelle le connecteur extrait les e-mails vers la plate-forme Google SecOps. La valeur par défaut est email@gmail.com. chaîne
    Password Obligatoire. Mot de passe IMAP (Internet Message Access Protocol) de l'adresse e-mail utilisée par le connecteur pour importer les e-mails dans la plate-forme Google SecOps. mot de passe
    IMAP Port Obligatoire. Port réseau spécifique utilisé par IMAP pour accéder aux e-mails d'un serveur distant. Par exemple : 993 (valeur par défaut). entier
    IMAP Server Address Obligatoire. Serveur de courrier entrant pour un compte IMAP. Par exemple, imap.gmail.com (valeur par défaut). chaîne
    Folder to check for emails Facultatif. Extrait les e-mails uniquement du dossier spécifié. Par exemple : Inbox (valeur par défaut). inbox
  3. Saisissez les informations suivantes dans les champs :
    1. Product Field Name = device_product : détermine la valeur brute du champ à attribuer au nom du produit de l'alerte. Recherchez le champ associé dans le code, qui a été défini comme Mail (produit).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nom du champ d'événement = event_name : détermine la valeur brute du champ à attribuer au champ du type d'événement. Recherchez le champ associé dans le code, défini comme Suspicious email.
      event["event_name"] = Suspicious email

Modifier le connecteur d'e-mails

Pour modifier les paramètres du connecteur d'e-mails, procédez comme suit :

  1. Copiez le code suivant créé pour My Email Connector, collez-le dans l'IDE, puis suivez les instructions :
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Après avoir copié le code du connecteur, examinez les modules à importer, puis passez à la fonction principale. Chaque méthode appelée à partir de la fonction principale sera abordée plus en détail ultérieurement.

Importations pertinentes

Un module Python comporte un ensemble de fonctions, de classes ou de variables définies et implémentées. Pour implémenter toutes les fonctions suivantes, importez ces modules dans le script :

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Fonction principale

La fonction Main est le point d'entrée du script. L'interpréteur Python exécute le code de manière séquentielle et appelle chaque méthode définie dans le script.

  1. Extrayez les paramètres du connecteur. Utilisez la fonction siemplify.extract_connector_param pour extraire chaque paramètre configuré pour le connecteur (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Utilisez la fonction get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) pour collecter toutes les informations des e-mails non lus.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Une fois que vous avez reçu toutes les informations de l'e-mail, confirmez qu'elles ont été collectées, puis effectuez les actions suivantes sur chaque e-mail :
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Ce code extrait la date du message par datetime_email_message = message['Date'], puis convertit cette date et heure en heure epoch Unix à l'aide des fonctions Google SecOps :
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Recherchez des URL dans le corps du message à l'aide de la fonction find_url_in_email_message_body(siemplify_email_messages_data_list). Si vous trouvez une URL, utilisez d'autres produits de notre guide pour vérifier si elle est malveillante.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Extrayez l'ID unique de chaque message électronique et attribuez-le à la variable alert_id :
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Après avoir extrait tous les détails nécessaires pour ingérer l'alerte dans la plate-forme Google SecOps, créez l'alerte et l'événement :
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Validez l'alerte et l'événement créés. Après la validation, ajoutez l'alerte à la liste des alertes.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. Dans le cas où la boîte de réception de l'utilisateur donné ne contient aucun e-mail non lu, ajoutez le code suivant :
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. La liste des alertes est renvoyée au système, et chaque alerte est ensuite présentée comme une demande dans la file d'attente des demandes :
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Exécutez la fonction Main dans les limites de temps que vous avez définies dans la configuration du connecteur :
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Récupérer l'e-mail non lu

La fonction Get the unread email message (Récupérer le message électronique non lu) se connecte à l'e-mail avec les modules Imap et Email, et récupère les détails du message électronique. Il renvoie également une liste contenant toutes les informations de tous les e-mails non lus.

  1. Dans la classe principale, utilisez la fonction : get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Connectez-vous à l'e-mail à l'aide de imap module :
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Déterminez le dossier de l'e-mail dans lequel rechercher les messages non lus. Dans cet exemple, vous extrayez les e-mails du dossier Boîte de réception (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox") :
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Collectez tous les messages non lus DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN", puis convertissez ces données en liste :
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Créer l'événement

La fonction Créer l'événement crée l'événement en associant chaque composant du message électronique aux champs de l'événement, respectivement.

  1. À partir de la classe principale, créez l'événement à l'aide de la fonction create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time).
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Créez un dictionnaire pour les champs d'événement. Les champs obligatoires sont les suivants : event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Chaque alerte contient un ou plusieurs événements. Dans cet exemple, une alerte contient un seul événement : un message électronique. Par conséquent, après avoir créé l'événement, créez l'alerte qui contient toutes les informations sur l'événement.

Créez les informations sur l'alerte et initialisez les champs de caractéristiques des informations sur l'alerte.

Cette fonction crée l'alerte. Chaque alerte contient un ou plusieurs événements. Dans ce cas, chaque alerte contient un événement, qui correspond à un message électronique.

  1. Dans la classe principale, créez l'alerte :
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Créez l'instance alert_info et initialisez-la :
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Après avoir créé l'alerte, validez l'événement et ajoutez-le aux caractéristiques aert_info :
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Trouver l'URL dans la fonction du corps de l'e-mail

La fonction find the URL in the email body (rechercher l'URL dans le corps de l'e-mail) analyse le corps de l'e-mail pour y trouver des URL. Pour utiliser cette fonction, procédez comme suit :

  1. Pour chaque e-mail, recherchez la partie du corps du message contenant du texte brut :
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Si le corps contient le contenu requis, chargez ces informations avec email_message_body = part.get_payload() et recherchez les URL à l'aide de l'expression régulière :
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. Cet exemple extrait les URL du corps de l'e-mail :
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Après avoir examiné le code du connecteur, vous pouvez configurer un connecteur pour ingérer des demandes dans la plate-forme à partir d'une boîte de réception Gmail sélectionnée.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.