Desarrolla el conector

Compatible con:

En este documento, se explica cómo compilar un conector nuevo en el entorno de desarrollo integrado (IDE) creando una integración y, luego, vinculando un conector a ella.

Crea una integración de conector de correo electrónico

Para crear una integración del conector de correo electrónico, sigue estos pasos:

  1. En la ventana Response > IDE, haz clic en Agregar para agregar un nuevo elemento de IDE.
  2. Selecciona Integración y asígnale el nombre Email Connector.
  3. Haz clic en Crear. La integración aparece en la barra lateral con un ícono predeterminado.
  4. Haz clic en more_vert Configurar integración personalizada y define estos parámetros de configuración:
    • Descripción
    • Ícono
    • Dependencias de Python
    • Parámetros de integración
  5. Haz clic en Guardar.

Crea un conector de correo electrónico

Para crear un conector de correo electrónico, sigue estos pasos:

  1. En la ventana Response > IDE, haz clic en Agregar para agregar un nuevo elemento de IDE.
  2. Selecciona el botón de selección Connector y asigna el nombre My Email Connector al conector.
  3. En la lista, selecciona la integración Email Connector para asociar el conector con la integración.
  4. Haz clic en Crear.

Establece los parámetros del conector

Después de crear el conector, establece sus parámetros:

  1. Establece los parámetros del conector según la siguiente tabla:

  2. Parámetro Descripción Tipo
    Username Obligatorio. Nombre de usuario de IMAP. Es la dirección de correo electrónico desde la que el conector extrae los correos electrónicos a la plataforma de Google SecOps. El valor predeterminado es email@gmail.com. cadena
    Password Obligatorio. Contraseña del protocolo de acceso a mensajes de Internet (IMAP) para la dirección de correo electrónico que usa el conector para extraer correos electrónicos a la plataforma de Google SecOps. contraseña
    IMAP Port Obligatorio. Es el puerto de red específico que usa IMAP para acceder a los correos electrónicos desde un servidor remoto. Por ejemplo, 993 (valor predeterminado). int
    IMAP Server Address Obligatorio. Es el servidor de correo entrante de una cuenta de IMAP. Por ejemplo, imap.gmail.com (valor predeterminado). cadena
    Folder to check for emails Opcional. Extrae correos electrónicos solo de la carpeta especificada. Por ejemplo, Inbox (valor predeterminado). Recibidos
  3. Ingresa los siguientes detalles del campo:
    1. Nombre del campo del producto = device_product: Determina el valor del campo sin procesar que se asignará al nombre del producto de la alerta. Busca el campo relacionado en el código, que se definió como Mail (producto).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nombre del campo del evento = event_name: Determina el valor del campo sin procesar que se asignará al campo del tipo de evento. Busca el campo relacionado en el código, definido como Suspicious email.
      event["event_name"] = Suspicious email.

Edita el conector de correo electrónico

Para editar los parámetros del conector de correo electrónico, sigue estos pasos:

  1. Copia el siguiente código creado para My Email Connector, pégalo en el IDE y sigue las instrucciones:
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Después de copiar el código del conector, revisa los módulos necesarios que se deben importar y, luego, continúa con la función principal. Cada método llamado desde la función principal se analizará más adelante con mayor detalle.

Importaciones pertinentes

Un módulo de Python tiene un conjunto de funciones, clases o variables definidas e implementadas. Para implementar todas las siguientes funciones, importa esos módulos en la secuencia de comandos:

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Función principal

La función Main es el punto de entrada de la secuencia de comandos. El intérprete de Python ejecuta el código de forma secuencial y llama a cada método definido en la secuencia de comandos.

  1. Extrae los parámetros del conector. Usa la función siemplify.extract_connector_param para extraer cada parámetro configurado para el conector (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Usa la función get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) para recopilar toda la información de los correos electrónicos no leídos.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Después de recibir toda la información del correo electrónico, confirma que se recopiló y, luego, realiza las siguientes acciones en cada correo electrónico:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Este código extrae la fecha del mensaje con datetime_email_message = message['Date']y, luego, convierte esta fecha y hora en hora de época de Unix con las funciones de Google SecOps:
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Busca URLs en el cuerpo del mensaje de correo electrónico con la función find_url_in_email_message_body(siemplify_email_messages_data_list). Si se encuentra una URL, usa otros productos de nuestro manual para verificar si es maliciosa.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Extrae el ID único de cada mensaje de correo electrónico y asígnalo a la variable alert_id:
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Después de extraer todos los detalles necesarios para transferir la alerta a la plataforma de Google SecOps, crea la alerta y el evento:
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Valida la alerta y el evento creados. Después de la validación, agrega la alerta a la lista de alertas.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. En una situación en la que la carpeta Recibidos del usuario determinado no tenga correos electrónicos no leídos, agrega el siguiente código:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. Devuelve la lista de alertas al sistema, y cada alerta se presenta como un caso en la cola de casos:
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Ejecuta la función Main dentro de los horarios que estableciste en la configuración del conector:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Obtén el mensaje de correo electrónico no leído

La función Get the unread email message se conecta al correo electrónico con los módulos Imap y Email, y recupera los detalles del mensaje de correo electrónico. También devuelve una lista que contiene toda la información de todos los mensajes de correo electrónico no leídos.

  1. Desde la clase principal, usa la función get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Conéctate al correo electrónico con imap module:
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Determina la carpeta del correo electrónico en la que se deben buscar mensajes no leídos. En este ejemplo, extraerás correos electrónicos de la carpeta Inbox (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"):
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Recopila todos los mensajes no leídos DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" y, luego, convierte estos datos en una lista:
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Crea el evento

La función Create the event crea el evento asociando cada componente del mensaje de correo electrónico a los campos del evento, respectivamente.

  1. Desde la clase principal, crea el evento con la función: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Crea un diccionario para los campos del evento. Los campos obligatorios son los siguientes: event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Cada alerta contiene uno o más eventos. En este ejemplo, una alerta contiene un solo evento: un mensaje de correo electrónico. Por lo tanto, después de crear el evento, crea la alerta que contenga toda la información del evento.

Crea la información de la alerta y, luego, inicializa los campos de características de la información de la alerta.

Esta función crea la alerta. Cada alerta contiene uno o más eventos. En este caso, cada alerta contiene un evento, que es básicamente un mensaje de correo electrónico.

  1. Desde la clase principal, crea la alerta:
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Crea la instancia de alert_info y la inicializa:
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Después de crear la alerta, valida el evento y agrégalo a las características de aert_info:
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Busca la URL en la función del cuerpo del correo electrónico

La función find the URL in the email body analiza el cuerpo del correo electrónico en busca de URLs. Para usar esta función, sigue estos pasos:

  1. Para cada mensaje de correo electrónico, ubica la parte del cuerpo del mensaje con contenido de texto sin formato:
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Si el cuerpo contiene el contenido requerido, carga esta información con email_message_body = part.get_payload() y busca URLs con la siguiente expresión regular:
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. En este ejemplo, se extraen URLs del cuerpo del correo electrónico:
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Después de revisar el código del conector, puedes configurarlo para que ingiera casos en la plataforma desde una carpeta Recibidos de Gmail seleccionada.

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.