Sviluppare il connettore

Supportato in:

Questo documento spiega come creare un nuovo connettore nell'ambiente di sviluppo integrato (IDE) creando un'integrazione e poi collegandovi un connettore.

Creare un'integrazione del connettore email

Per creare un'integrazione del connettore email:

  1. Nella finestra Risposta > IDE, fai clic su Aggiungi per aggiungere un nuovo elemento IDE.
  2. Seleziona Integrazione e assegna all'integrazione il nome Email Connector.
  3. Fai clic su Crea. L'integrazione viene visualizzata nella barra laterale con un'icona predefinita.
  4. Fai clic su more_vert Configura integrazione personalizzata e definisci queste impostazioni:
    • Descrizione
    • Icona
    • Dipendenze Python
    • Parametri di integrazione
  5. Fai clic su Salva.

Creare un connettore email

Per creare un connettore email:

  1. Nella finestra Risposta > IDE, fai clic su Aggiungi per aggiungere un nuovo elemento IDE.
  2. Seleziona il pulsante di opzione Connettore e assegna al connettore il nome My Email Connector.
  3. Nell'elenco, seleziona l'integrazione Email Connector per associare il connettore all'integrazione.
  4. Fai clic su Crea.

Impostare i parametri del connettore

Dopo aver creato il connettore, imposta i parametri del connettore:

  1. Imposta i parametri del connettore in base alla tabella:

  2. Parametro Descrizione Tipo
    Username Obbligatorio. Nome utente IMAP. L'indirizzo email da cui il connettore recupera le email nella piattaforma Google SecOps. Il valore predefinito è email@gmail.com. string
    Password Obbligatorio. Password IMAP (Internet Message Access Protocol) per l'indirizzo email utilizzato dal connettore per recuperare le email nella piattaforma Google SecOps. password
    IMAP Port Obbligatorio. La porta di rete specifica utilizzata da IMAP per accedere alle email da un server remoto. Ad esempio: 993 (valore predefinito). int
    IMAP Server Address Obbligatorio. Il server di posta in arrivo per un account IMAP. Ad esempio, imap.gmail.com (valore predefinito). string
    Folder to check for emails Facoltativo. Recupera le email solo dalla cartella specificata. Ad esempio: Inbox (valore predefinito). inbox
  3. Inserisci i seguenti dettagli del campo:
    1. Nome campo prodotto = device_product: determina il valore del campo non elaborato da assegnare al nome del prodotto dell'avviso. Trova il campo correlato nel codice, definito come Mail (prodotto).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nome campo evento = event_name: determina il valore del campo non elaborato da assegnare al campo del tipo di evento. Trova il campo correlato nel codice, definito come Suspicious email.
      event["event_name"] = Suspicious email

Modificare il connettore email

Per modificare i parametri del connettore email:

  1. Copia il seguente codice creato per My Email Connector, incollalo nell'IDE e segui le istruzioni:
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Dopo aver copiato il codice del connettore, esamina i moduli necessari da importare e poi passa alla funzione principale. Ogni metodo chiamato dalla funzione principale verrà discusso in dettaglio in un secondo momento.

Importazioni pertinenti

Un modulo Python ha un insieme di funzioni, classi o variabili definiti e implementati. Per implementare tutte le seguenti funzioni, importa questi moduli nello script:

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Funzione principale

La funzione Main è l'entry point dello script. L'interprete Python esegue il codice in sequenza e chiama ogni metodo definito nello script.

  1. Estrai i parametri del connettore. Utilizza la funzione siemplify.extract_connector_param per estrarre ogni parametro configurato per il connettore (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Utilizza la funzione get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) per raccogliere tutte le informazioni dalle email da leggere.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Dopo aver ricevuto tutte le informazioni dall'email, verifica che siano state raccolte e poi esegui queste azioni su ogni email:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Questo codice estrae la data del messaggio tramite datetime_email_message = message['Date'] e poi converte questa data/ora in ora Unix epoch utilizzando le funzioni di Google SecOps:
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Cerca gli URL nel corpo del messaggio email utilizzando la funzione find_url_in_email_message_body(siemplify_email_messages_data_list). Se viene trovato un URL, utilizza altri prodotti nel nostro playbook per verificare se è dannoso.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Estrai l'ID univoco di ogni messaggio email e assegnalo alla variabile alert_id:
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Dopo aver estratto tutti i dettagli necessari per inserire l'avviso nella piattaforma Google SecOps, crea l'avviso e l'evento:
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Convalida l'avviso e l'evento creati. Dopo la convalida, aggiungi l'avviso all'elenco degli avvisi.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. In uno scenario in cui la Posta in arrivo dell'utente specificato non contiene email da leggere, aggiungi il seguente codice:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. Restituisci l'elenco degli avvisi al sistema e ogni avviso viene presentato come un caso nella coda dei casi:
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Esegui la funzione Main negli orari impostati nella configurazione del connettore:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Recuperare il messaggio email da leggere

La funzione Recuperare il messaggio email da leggere si connette all'email con i moduli Imap ed Email e recupera i dettagli del messaggio email. Restituisce anche un elenco contenente tutte le informazioni di tutti i messaggi email da leggere.

  1. Dalla classe principale, utilizza la funzione: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Connettiti all'email utilizzando il imap module:
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Determina la cartella dell'email in cui cercare i messaggi da leggere. In questo esempio, estrai le email dalla cartella Posta in arrivo (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"):
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Raccogli tutti i messaggi da leggere DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" e poi converti questi dati in un elenco:
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Creare l'evento

La funzione Creare l'evento crea l'evento associando rispettivamente ogni componente del messaggio email ai campi dell'evento.

  1. Dalla classe principale, crea l'evento utilizzando la funzione: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Crea un dizionario per i campi dell'evento. I campi obbligatori sono event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] :
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Ogni avviso contiene uno o più eventi. In questo esempio, un avviso contiene un singolo evento: un messaggio email. Pertanto, dopo aver creato l'evento, crea l'avviso che contiene tutte le informazioni dell'evento.

Creare le informazioni sull'avviso e inizializzare i campi delle caratteristiche delle informazioni sull'avviso

Questa funzione crea l'avviso. Ogni avviso contiene uno o più eventi. In questo caso, ogni avviso contiene un evento, ovvero un messaggio email.

  1. Dalla classe principale, crea l'avviso:
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Crea l'istanza alert_info e inizializzala:
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Dopo aver creato l'avviso, convalida l'evento e aggiungilo alle caratteristiche di alert_info:
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Trovare la funzione dell'URL nel corpo dell'email

La funzione Trovare l'URL nel corpo dell'email esamina il corpo dell'email alla ricerca di URL. Per utilizzare questa funzione:

  1. Per ogni messaggio email, individua la parte del corpo del messaggio con contenuti di testo normale:
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Se il corpo contiene i contenuti richiesti, carica queste informazioni con email_message_body = part.get_payload() e cerca gli URL utilizzando l'espressione regolare:
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. Questo esempio estrae gli URL dal corpo dell'email:
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found: {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Dopo aver esaminato il codice del connettore, puoi configurare un connettore per inserire le richieste nella piattaforma da una Posta in arrivo di Gmail selezionata.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.