Desarrolla el conector
En este documento, se explica cómo compilar un conector nuevo en el entorno de desarrollo integrado (IDE) creando una integración y, luego, vinculando un conector a ella.
Crea una integración de conector de correo electrónico
Para crear una integración del conector de correo electrónico, sigue estos pasos:
- En la ventana Response > IDE, haz clic en Agregar para agregar un nuevo elemento de IDE.
- Selecciona Integración y asígnale el nombre
Email Connector
. - Haz clic en Crear. La integración aparece en la barra lateral con un ícono predeterminado.
- Haz clic en more_vert Configurar integración personalizada y define estos parámetros de configuración:
- Descripción
- Ícono
- Dependencias de Python
- Parámetros de integración
- Haz clic en Guardar.
Crea un conector de correo electrónico
Para crear un conector de correo electrónico, sigue estos pasos:
- En la ventana Response > IDE, haz clic en Agregar para agregar un nuevo elemento de IDE.
- Selecciona el botón de selección Connector y asigna el nombre
My Email Connector
al conector. - En la lista, selecciona la integración Email Connector para asociar el conector con la integración.
- Haz clic en Crear.
Establece los parámetros del conector
Después de crear el conector, establece sus parámetros:
- Establece los parámetros del conector según la siguiente tabla:
- Ingresa los siguientes detalles del campo:
- Nombre del campo del producto = device_product: Determina el valor del campo sin procesar que se asignará al nombre del producto de la alerta. Busca el campo relacionado en el código, que se definió como
Mail
(producto).event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
- Nombre del campo del evento = event_name: Determina el valor del campo sin procesar que se asignará al campo del tipo de evento. Busca el campo relacionado en el código, definido como
Suspicious email
.event["event_name"] = Suspicious email
.
Parámetro | Descripción | Tipo |
---|---|---|
Username |
Obligatorio. Nombre de usuario de IMAP. Es la dirección de correo electrónico desde la que el conector extrae los correos electrónicos a la plataforma de Google SecOps. El valor predeterminado es email@gmail.com . |
cadena |
Password |
Obligatorio. Contraseña del protocolo de acceso a mensajes de Internet (IMAP) para la dirección de correo electrónico que usa el conector para extraer correos electrónicos a la plataforma de Google SecOps. | contraseña |
IMAP Port |
Obligatorio. Es el puerto de red específico que usa IMAP para acceder a los correos electrónicos desde un servidor remoto. Por ejemplo, 993 (valor predeterminado). |
int |
IMAP Server Address |
Obligatorio. Es el servidor de correo entrante de una cuenta de IMAP. Por ejemplo, imap.gmail.com (valor predeterminado). |
cadena |
Folder to check for emails |
Opcional. Extrae correos electrónicos solo de la carpeta especificada. Por ejemplo, Inbox (valor predeterminado). |
Recibidos |
Edita el conector de correo electrónico
Para editar los parámetros del conector de correo electrónico, sigue estos pasos:
- Copia el siguiente código creado para My Email Connector, pégalo en el IDE y sigue las instrucciones:
from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
- Después de copiar el código del conector, revisa los módulos necesarios que se deben importar y, luego, continúa con la función principal. Cada método llamado desde la función principal se analizará más adelante con mayor detalle.
Importaciones pertinentes
Un módulo de Python tiene un conjunto de funciones, clases o variables definidas e implementadas. Para implementar todas las siguientes funciones, importa esos módulos en la secuencia de comandos:
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
Función principal
La función Main es el punto de entrada de la secuencia de comandos. El intérprete de Python ejecuta el código de forma secuencial y llama a cada método definido en la secuencia de comandos.
- Extrae los parámetros del conector. Usa la función
siemplify.extract_connector_param
para extraer cada parámetro configurado para el conector (username
,password
,imap_host
,imap_port
,folder_to_check
).# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
- Usa la función
get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check)
para recopilar toda la información de los correos electrónicos no leídos.# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- Después de recibir toda la información del correo electrónico, confirma que se recopiló y, luego, realiza las siguientes acciones en cada correo electrónico:
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
Este código extrae la fecha del mensaje condatetime_email_message = message['Date']
y, luego, convierte esta fecha y hora en hora de época de Unix con las funciones de Google SecOps:
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
- Busca URLs en el cuerpo del mensaje de correo electrónico con la función
find_url_in_email_message_body(siemplify_email_messages_data_list)
. Si se encuentra una URL, usa otros productos de nuestro manual para verificar si es maliciosa. - Extrae el ID único de cada mensaje de correo electrónico y asígnalo a la variable
alert_id
:# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- Después de extraer todos los detalles necesarios para transferir la alerta a la plataforma de Google SecOps, crea la alerta y el evento:
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
- Valida la alerta y el evento creados. Después de la validación, agrega la alerta a la lista de alertas.
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
- En una situación en la que la carpeta Recibidos del usuario determinado no tenga correos electrónicos no leídos, agrega el siguiente código:
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- Devuelve la lista de alertas al sistema, y cada alerta se presenta como un caso en la cola de casos:
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- Ejecuta la función
Main
dentro de los horarios que estableciste en la configuración del conector:if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
Obtén el mensaje de correo electrónico no leído
La función Get the unread email message se conecta al correo electrónico con los módulos Imap
y Email
, y recupera los detalles del mensaje de correo electrónico. También devuelve una lista que contiene toda la información de todos los mensajes de correo electrónico no leídos.
- Desde la clase principal, usa la función
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
.def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
- Conéctate al correo electrónico con
imap module
:# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- Determina la carpeta del correo electrónico en la que se deben buscar mensajes no leídos.
En este ejemplo, extraerás correos electrónicos de la carpeta Inbox
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
: - Recopila todos los mensajes no leídos
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
y, luego, convierte estos datos en una lista:# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
Crea el evento
La función Create the event crea el evento asociando cada componente del mensaje de correo electrónico a los campos del evento, respectivamente.
- Desde la clase principal, crea el evento con la función:
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
-
Crea un diccionario para los campos del evento. Los campos obligatorios son los siguientes:
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
- Cada alerta contiene uno o más eventos. En este ejemplo, una alerta contiene un solo evento: un mensaje de correo electrónico. Por lo tanto, después de crear el evento, crea la alerta que contenga toda la información del evento.
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
Crea la información de la alerta y, luego, inicializa los campos de características de la información de la alerta.
Esta función crea la alerta. Cada alerta contiene uno o más eventos. En este caso, cada alerta contiene un evento, que es básicamente un mensaje de correo electrónico.
- Desde la clase principal, crea la alerta:
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
- Crea la instancia de
alert_info
y la inicializa:# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
- Después de crear la alerta, valida el evento y agrégalo a las características de
aert_info
:
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
Busca la URL en la función del cuerpo del correo electrónico
La función find the URL in the email body analiza el cuerpo del correo electrónico en busca de URLs. Para usar esta función, sigue estos pasos:
- Para cada mensaje de correo electrónico, ubica la parte del cuerpo del mensaje con contenido de texto sin formato:
- Si el cuerpo contiene el contenido requerido, carga esta información con
email_message_body = part.get_payload()
y busca URLs con la siguiente expresión regular: - En este ejemplo, se extraen URLs del cuerpo del correo electrónico:
- Después de revisar el código del conector, puedes configurarlo para que ingiera casos en la plataforma desde una carpeta Recibidos de Gmail seleccionada.
def find_url_in_email_message_body(siemplify, email_messages_data_list):
"""
Search for a url in the email body,
"""
all_found_url_in_emails_body_list = []
for message in email_messages_data_list:
for part in message.walk():
if part.get_content_maintype() == 'text\plain':
continue
URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.