Développer le connecteur
Ce document explique comment créer un connecteur dans l'environnement de développement intégré (IDE) en créant une intégration, puis en y associant un connecteur.
Créer une intégration de connecteur d'e-mail
Pour créer une intégration de connecteur d'e-mails, procédez comme suit :
- Dans la fenêtre Réponse > IDE, cliquez sur Ajouter pour ajouter un élément IDE.
- Sélectionnez Intégration et nommez l'intégration
Email Connector
. - Cliquez sur Créer. L'intégration s'affiche dans la barre latérale avec une icône par défaut.
- Cliquez sur more_vert Configurer l'intégration personnalisée et définissez les paramètres suivants :
- Description
- Icône
- Dépendances Python
- Paramètres d'intégration
- Cliquez sur Enregistrer.
Créer un connecteur d'e-mails
Pour créer un connecteur d'e-mails :
- Dans la fenêtre Réponse > IDE, cliquez sur Ajouter pour ajouter un élément IDE.
- Sélectionnez la case d'option Connecteur et nommez le connecteur
My Email Connector
. - Dans la liste, sélectionnez l'intégration Email Connector (Connecteur d'e-mails) pour associer le connecteur à l'intégration.
- Cliquez sur Créer.
Définir les paramètres du connecteur
Après avoir créé le connecteur, définissez ses paramètres :
- Définissez les paramètres du connecteur en fonction du tableau :
- Saisissez les informations suivantes dans les champs :
- Product Field Name = device_product : détermine la valeur brute du champ à attribuer au nom du produit de l'alerte. Recherchez le champ associé dans le code, qui a été défini comme
Mail
(produit).event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
- Nom du champ d'événement = event_name : détermine la valeur brute du champ à attribuer au champ du type d'événement. Recherchez le champ associé dans le code, défini comme
Suspicious email
.event["event_name"] = Suspicious email
Paramètre | Description | Type |
---|---|---|
Username |
Obligatoire. Nom d'utilisateur IMAP. Adresse e-mail à partir de laquelle le connecteur extrait les e-mails vers la plate-forme Google SecOps. La valeur par défaut est email@gmail.com . |
chaîne |
Password |
Obligatoire. Mot de passe IMAP (Internet Message Access Protocol) de l'adresse e-mail utilisée par le connecteur pour importer les e-mails dans la plate-forme Google SecOps. | mot de passe |
IMAP Port |
Obligatoire. Port réseau spécifique utilisé par IMAP pour accéder aux e-mails d'un serveur distant. Par exemple : 993 (valeur par défaut). |
entier |
IMAP Server Address |
Obligatoire. Serveur de courrier entrant pour un compte IMAP. Par exemple, imap.gmail.com (valeur par défaut). |
chaîne |
Folder to check for emails |
Facultatif. Extrait les e-mails uniquement du dossier spécifié. Par exemple : Inbox (valeur par défaut). |
inbox |
Modifier le connecteur d'e-mails
Pour modifier les paramètres du connecteur d'e-mails, procédez comme suit :
- Copiez le code suivant créé pour My Email Connector, collez-le dans l'IDE, puis suivez les instructions :
from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
- Après avoir copié le code du connecteur, examinez les modules à importer, puis passez à la fonction principale. Chaque méthode appelée à partir de la fonction principale sera abordée plus en détail ultérieurement.
Importations pertinentes
Un module Python comporte un ensemble de fonctions, de classes ou de variables définies et implémentées. Pour implémenter toutes les fonctions suivantes, importez ces modules dans le script :
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
Fonction principale
La fonction Main est le point d'entrée du script. L'interpréteur Python exécute le code de manière séquentielle et appelle chaque méthode définie dans le script.
- Extrayez les paramètres du connecteur. Utilisez la fonction
siemplify.extract_connector_param
pour extraire chaque paramètre configuré pour le connecteur (username
,password
,imap_host
,imap_port
,folder_to_check
).# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
- Utilisez la fonction
get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check)
pour collecter toutes les informations des e-mails non lus.# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- Une fois que vous avez reçu toutes les informations de l'e-mail, confirmez qu'elles ont été collectées, puis effectuez les actions suivantes sur chaque e-mail :
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
Ce code extrait la date du message pardatetime_email_message = message['Date']
, puis convertit cette date et heure en heure epoch Unix à l'aide des fonctions Google SecOps :
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
- Recherchez des URL dans le corps du message à l'aide de la fonction
find_url_in_email_message_body(siemplify_email_messages_data_list)
. Si vous trouvez une URL, utilisez d'autres produits de notre guide pour vérifier si elle est malveillante. - Extrayez l'ID unique de chaque message électronique et attribuez-le à la variable
alert_id
:# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- Après avoir extrait tous les détails nécessaires pour ingérer l'alerte dans la plate-forme Google SecOps, créez l'alerte et l'événement :
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
- Validez l'alerte et l'événement créés. Après la validation, ajoutez l'alerte à la liste des alertes.
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
- Dans le cas où la boîte de réception de l'utilisateur donné ne contient aucun e-mail non lu, ajoutez le code suivant :
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- La liste des alertes est renvoyée au système, et chaque alerte est ensuite présentée comme une demande dans la file d'attente des demandes :
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- Exécutez la fonction
Main
dans les limites de temps que vous avez définies dans la configuration du connecteur :if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
Récupérer l'e-mail non lu
La fonction Get the unread email message (Récupérer le message électronique non lu) se connecte à l'e-mail avec les modules Imap
et Email
, et récupère les détails du message électronique. Il renvoie également une liste contenant toutes les informations de tous les e-mails non lus.
- Dans la classe principale, utilisez la fonction :
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
.def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
- Connectez-vous à l'e-mail à l'aide de
imap module
:# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- Déterminez le dossier de l'e-mail dans lequel rechercher les messages non lus.
Dans cet exemple, vous extrayez les e-mails du dossier Boîte de réception
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
: - Collectez tous les messages non lus
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
, puis convertissez ces données en liste :# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
Créer l'événement
La fonction Créer l'événement crée l'événement en associant chaque composant du message électronique aux champs de l'événement, respectivement.
- À partir de la classe principale, créez l'événement à l'aide de la fonction
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
. -
Créez un dictionnaire pour les champs d'événement. Les champs obligatoires sont les suivants :
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
- Chaque alerte contient un ou plusieurs événements. Dans cet exemple, une alerte contient un seul événement : un message électronique. Par conséquent, après avoir créé l'événement, créez l'alerte qui contient toutes les informations sur l'événement.
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
Créez les informations sur l'alerte et initialisez les champs de caractéristiques des informations sur l'alerte.
Cette fonction crée l'alerte. Chaque alerte contient un ou plusieurs événements. Dans ce cas, chaque alerte contient un événement, qui correspond à un message électronique.
- Dans la classe principale, créez l'alerte :
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
- Créez l'instance
alert_info
et initialisez-la :# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
- Après avoir créé l'alerte, validez l'événement et ajoutez-le aux caractéristiques
aert_info
:
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
Trouver l'URL dans la fonction du corps de l'e-mail
La fonction find the URL in the email body (rechercher l'URL dans le corps de l'e-mail) analyse le corps de l'e-mail pour y trouver des URL. Pour utiliser cette fonction, procédez comme suit :
- Pour chaque e-mail, recherchez la partie du corps du message contenant du texte brut :
- Si le corps contient le contenu requis, chargez ces informations avec
email_message_body = part.get_payload()
et recherchez les URL à l'aide de l'expression régulière : - Cet exemple extrait les URL du corps de l'e-mail :
- Après avoir examiné le code du connecteur, vous pouvez configurer un connecteur pour ingérer des demandes dans la plate-forme à partir d'une boîte de réception Gmail sélectionnée.
def find_url_in_email_message_body(siemplify, email_messages_data_list):
"""
Search for a url in the email body,
"""
all_found_url_in_emails_body_list = []
for message in email_messages_data_list:
for part in message.walk():
if part.get_content_maintype() == 'text\plain':
continue
URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.