Sviluppa il connettore

Supportato in:

Questo documento spiega come creare un nuovo connettore nell'ambiente di sviluppo integrato (IDE) creando un'integrazione e poi collegandovi un connettore.

Creare un'integrazione del connettore email

Per creare un'integrazione del connettore email:

  1. Nella finestra Risposta > IDE, fai clic su Aggiungi per aggiungere un nuovo elemento IDE.
  2. Seleziona Integrazione e assegna all'integrazione il nome Email Connector.
  3. Fai clic su Crea. L'integrazione viene visualizzata nella barra laterale con un'icona predefinita.
  4. Fai clic su more_vert
    • Descrizione
    • Icona
    • Dipendenze Python
    • Parametri di integrazione
  5. Fai clic su Salva.

Crea un connettore email

Per creare un connettore email:

  1. Nella finestra Risposta > IDE, fai clic su Aggiungi per aggiungere un nuovo elemento IDE.
  2. Seleziona il pulsante di opzione Connettore e assegna un nome al connettore My Email Connector.
  3. Nell'elenco, seleziona l'integrazione Email Connector per associare il connettore all'integrazione.
  4. Fai clic su Crea.

Imposta i parametri del connettore

Dopo aver creato il connettore, imposta i parametri del connettore:

  1. Imposta i parametri del connettore in base alla tabella:

  2. Parametro Descrizione Tipo
    Username Obbligatorio. Nome utente IMAP. L'indirizzo email da cui il connettore estrae le email nella piattaforma Google SecOps. Il valore predefinito è email@gmail.com. string
    Password Obbligatorio. Password del protocollo IMAP (Internet Message Access Protocol) per l'indirizzo email utilizzato dal connettore per estrarre le email nella piattaforma Google SecOps. password
    IMAP Port Obbligatorio. La porta di rete specifica utilizzata da IMAP per accedere alle email da un server remoto. Ad esempio: 993 (valore predefinito). int
    IMAP Server Address Obbligatorio. Il server di posta in arrivo per un account IMAP. Ad esempio, imap.gmail.com (valore predefinito). string
    Folder to check for emails (Facoltativo) Recupera le email solo dalla cartella specificata. Ad esempio: Inbox (valore predefinito). inbox
  3. Inserisci i seguenti dettagli del campo:
    1. Nome campo prodotto = device_product: determina il valore del campo non elaborato da assegnare al nome del prodotto dell'avviso. Trova il campo correlato nel codice, definito come Mail (prodotto).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nome campo evento = event_name: determina il valore del campo non elaborato da assegnare al campo del tipo di evento. Trova il campo correlato nel codice, definito come Suspicious email.
      event["event_name"] = Suspicious email

Modificare il connettore email

Per modificare i parametri del connettore email:

  1. Copia il seguente codice creato per My Email Connector, incollalo nell'IDE e segui le istruzioni:
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Dopo aver copiato il codice del connettore, rivedi i moduli necessari da importare e poi procedi alla funzione principale. Ogni metodo chiamato dalla funzione principale verrà discusso in dettaglio più avanti.

Importazioni pertinenti

Un modulo Python ha un insieme di funzioni, classi o variabili definiti e implementati. Per implementare tutte le seguenti funzioni, importa questi moduli nello script:

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Funzione principale

La funzione Main è l'entry point dello script. L'interprete Python esegue il codice in sequenza e chiama ogni metodo definito nello script.

  1. Estrai i parametri del connettore. Utilizza la funzione siemplify.extract_connector_param per estrarre ogni parametro configurato per il connettore (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Utilizza la funzione get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) per raccogliere tutte le informazioni dalle email non lette.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Dopo aver ricevuto tutte le informazioni dall'email, conferma che le informazioni sono state raccolte, quindi esegui queste azioni su ogni email:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Questo codice estrae la data del messaggio in base a datetime_email_message = message['Date'] e poi converte questa data e ora in ora Unix epoch utilizzando le funzioni di Google SecOps:
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Cerca gli URL nel corpo del messaggio email utilizzando la funzione find_url_in_email_message_body(siemplify_email_messages_data_list). Se viene trovato un URL, utilizza altri prodotti del nostro playbook per verificare se è dannoso.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Estrai l'ID univoco di ogni messaggio email e assegnalo alla variabile alert_id:
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Dopo aver estratto tutti i dettagli necessari per importare l'avviso nella piattaforma Google SecOps, crea l'avviso e l'evento:
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Convalida l'avviso e l'evento creati. Dopo la convalida, aggiungi l'avviso all'elenco degli avvisi.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. In uno scenario in cui la casella di posta in arrivo dell'utente specificato non contiene email non lette, aggiungi il seguente codice:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. Restituisci l'elenco degli avvisi al sistema e ogni avviso viene quindi presentato come una richiesta nella coda delle richieste:
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Esegui la funzione Main negli orari che hai impostato nella configurazione del connettore:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Recuperare il messaggio email da leggere

La funzione Recupera il messaggio email non letto si connette all'email con i moduli Imap e Email e recupera i dettagli del messaggio email. Restituisce anche un elenco contenente tutte le informazioni di tutti i messaggi email non letti.

  1. Dalla classe principale, utilizza la funzione: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Connettiti all'email utilizzando imap module:
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Determina la cartella nell'email in cui controllare i messaggi da leggere. In questo esempio, estrai le email dalla cartella Posta in arrivo (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"):
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Raccogli tutti i messaggi non letti DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" e poi converti questi dati in un elenco:
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Creare l'evento

La funzione Crea l'evento crea l'evento associando ogni componente del messaggio email ai campi dell'evento, rispettivamente.

  1. Dalla classe principale crea l'evento utilizzando la funzione: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Crea un dizionario per i campi evento. I campi obbligatori sono event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] :
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Ogni avviso contiene uno o più eventi. In questo esempio, un avviso contiene un singolo evento: un messaggio email. Pertanto, dopo aver creato l'evento, crea l'avviso che contiene tutte le informazioni sull'evento.

Crea le informazioni sull'avviso e inizializza i campi delle caratteristiche delle informazioni sull'avviso

Questa funzione crea l'avviso. Ogni avviso contiene uno o più eventi. In questo caso, ogni avviso contiene un evento, che è fondamentalmente un messaggio email.

  1. Dal corso principale, crea l'avviso:
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Crea l'istanza alert_info e inizializzala:
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Dopo aver creato l'avviso, convalida l'evento e aggiungilo alle caratteristiche di aert_info:
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Trovare l'URL nella funzione del corpo dell'email

La funzione Trova l'URL nel corpo dell'email analizza il corpo dell'email alla ricerca di URL. Per utilizzare questa funzione, segui questi passaggi:

  1. Per ogni messaggio email, individua la parte del corpo del messaggio con contenuti in testo normale:
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Se il corpo contiene i contenuti richiesti, carica queste informazioni con email_message_body = part.get_payload() e cerca gli URL utilizzando l'espressione regolare:
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. Questo esempio estrae gli URL dal corpo dell'email:
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Dopo aver esaminato il codice del connettore, puoi configurarne uno per importare le richieste nella piattaforma da una Posta in arrivo di Gmail selezionata.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.