Desarrollar el conector

Disponible en:

En este documento se explica cómo crear un conector en el entorno de desarrollo integrado (IDE) creando una integración y, a continuación, vinculando un conector a ella.

Crear una integración de conector de correo electrónico

Para crear una integración de conector de correo electrónico, sigue estos pasos:

  1. En la ventana Respuesta > IDE, haz clic en Añadir para añadir un nuevo elemento del IDE.
  2. Seleccione Integración y asigne un nombre a la integración Email Connector.
  3. Haz clic en Crear. La integración aparece en la barra lateral con un icono predeterminado.
  4. Haga clic en more_vert Configurar integración personalizada y defina estos ajustes:
    • Descripción
    • Icono
    • Dependencias de Python
    • Parámetros de integración
  5. Haz clic en Guardar.

Crear un conector de correo

Para crear un conector de correo electrónico, sigue estos pasos:

  1. En la ventana Respuesta > IDE, haz clic en Añadir para añadir un nuevo elemento del IDE.
  2. Seleccione el botón de radio Conector y asigne el nombre My Email Connector al conector.
  3. En la lista, selecciona la integración Email Connector para asociar el conector a la integración.
  4. Haz clic en Crear.

Definir los parámetros del conector

Una vez que hayas creado el conector, define sus parámetros:

  1. Define los parámetros del conector según la tabla:

  2. Parámetro Descripción Tipo
    Username Obligatorio. Nombre de usuario de IMAP. La dirección de correo desde la que el conector extrae los correos a la plataforma Google SecOps. El valor predeterminado es email@gmail.com. cadena
    Password Obligatorio. Contraseña del protocolo de acceso a mensajes de Internet (IMAP) de la dirección de correo que usa el conector para extraer correos a la plataforma de Google SecOps. contraseña
    IMAP Port Obligatorio. El puerto de red específico que usa IMAP para acceder a los correos de un servidor remoto. Por ejemplo, 993 (valor predeterminado). int
    IMAP Server Address Obligatorio. El servidor de correo entrante de una cuenta IMAP. Por ejemplo, imap.gmail.com (valor predeterminado). cadena
    Folder to check for emails Opcional. Extrae correos solo de la carpeta especificada. Por ejemplo, Inbox (valor predeterminado). inbox
  3. Introduce los detalles de los siguientes campos:
    1. Nombre del campo de producto = device_product: determina el valor del campo sin formato que se asignará al nombre del producto de la alerta. Busca el campo relacionado en el código, que se ha definido como Mail (producto).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nombre del campo de evento = event_name: determina el valor del campo sin procesar que se asignará al campo de tipo de evento. Busque el campo relacionado en el código, definido como Suspicious email.
      event["event_name"] = Suspicious email.

Editar el conector de correo

Para editar los parámetros del conector de correo electrónico, sigue estos pasos:

  1. Copia el siguiente código creado para Mi conector de correo electrónico, pégalo en el IDE y sigue las instrucciones:
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Después de copiar el código del conector, revisa los módulos que debes importar y, a continuación, ve a la función principal. Más adelante, hablaremos con más detalle de cada método llamado desde la función principal.

Importaciones relevantes

Un módulo de Python tiene un conjunto de funciones, clases o variables definidas e implementadas. Para implementar todas las funciones siguientes, importe esos módulos en la secuencia de comandos:

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Función principal

La función Main es el punto de entrada de la secuencia de comandos. El intérprete de Python ejecuta el código de forma secuencial y llama a cada método definido en la secuencia de comandos.

  1. Extrae los parámetros del conector. Usa la función siemplify.extract_connector_param para extraer cada parámetro configurado del conector (username, password, imap_host, imap_port y folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Usa la función get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) para recoger toda la información de los correos no leídos.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Cuando hayas recibido toda la información del correo, confirma que se ha recogido y, a continuación, realiza estas acciones en cada correo:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Este código extrae la fecha del mensaje mediante datetime_email_message = message['Date'] y, a continuación, convierte esta fecha y hora en una marca de tiempo Unix mediante las funciones de Google SecOps:
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Busca URLs en el cuerpo del mensaje de correo mediante la función find_url_in_email_message_body(siemplify_email_messages_data_list). Si se encuentra una URL, usa otros productos de nuestra guía para comprobar si es maliciosa.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Extrae el ID único de cada mensaje de correo y asígnalo a la variable alert_id:
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Una vez que hayas extraído todos los detalles necesarios para ingerir la alerta en la plataforma de Google SecOps, crea la alerta y el evento:
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Valida la alerta y el evento creados. Después de validarla, añade la alerta a la lista de alertas.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. En un caso en el que la bandeja de entrada del usuario no tenga correos sin leer, añade el siguiente código:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. Devuelve la lista de alertas al sistema y, a continuación, cada alerta se presenta como un caso en la cola de casos:
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Ejecuta la función Main dentro de los plazos que hayas definido en la configuración del conector:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Obtener el mensaje de correo no leído

La función Obtener el mensaje de correo no leído se conecta al correo con los módulos Imap y Email, y obtiene los detalles del mensaje de correo. También devuelve una lista que contiene toda la información de todos los mensajes de correo no leídos.

  1. En la clase principal, usa la función: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Conéctate al correo mediante el imap module:
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Determina la carpeta del correo en la que quieres comprobar si hay mensajes sin leer. En este ejemplo, se extraen correos de la carpeta Bandeja de entrada (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"):
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Recoge todos los mensajes no leídos DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" y, a continuación, convierte estos datos en una lista:
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Crear el evento

La función Crear evento crea el evento asociando cada componente del mensaje de correo a los campos del evento, respectivamente.

  1. Desde la clase principal, crea el evento con la función create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Crea un diccionario para los campos del evento. Los campos obligatorios son los siguientes: event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Cada alerta contiene uno o varios eventos. En este ejemplo, una alerta contiene un solo evento: un mensaje de correo. Por lo tanto, después de crear el evento, crea la alerta que contenga toda la información del evento.

Crea la información de la alerta e inicializa los campos de las características de la información de la alerta.

Esta función crea la alerta. Cada alerta contiene uno o varios eventos. En este caso, cada alerta contiene un evento, que es básicamente un mensaje de correo.

  1. En la clase principal, crea la alerta:
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Crea la instancia de alert_info e inicialízala:
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Después de crear la alerta, valida el evento y añádelo a las características de aert_info:
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Buscar la URL en la función del cuerpo del correo

La función buscar la URL en el cuerpo del correo analiza el cuerpo del correo en busca de URLs. Para usar esta función, sigue estos pasos:

  1. En cada mensaje de correo, busca la parte del cuerpo del mensaje que contenga texto sin formato:
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Si el cuerpo contiene el contenido necesario, carga esta información con email_message_body = part.get_payload() y busca URLs con la expresión regular:
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. En este ejemplo se extraen URLs del cuerpo del correo:
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Después de revisar el código del conector, puedes configurarlo para que ingiera casos en la plataforma desde una bandeja de entrada de Gmail seleccionada.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.