Mengembangkan konektor

Didukung di:

Dokumen ini menjelaskan cara membuat konektor baru di Lingkungan Pengembangan Terintegrasi (IDE) dengan membuat integrasi, lalu menautkan konektor ke integrasi tersebut.

Membuat integrasi konektor email

Untuk membuat integrasi konektor email, ikuti langkah-langkah berikut:

  1. Di jendela Response > IDE, klik Add untuk menambahkan item IDE baru.
  2. Pilih Integrasi dan beri nama integrasi Email Connector.
  3. Klik Buat. Integrasi muncul di sidebar dengan ikon default.
  4. Klik more_vert Konfigurasi Integrasi Kustom dan tentukan setelan berikut:
    • Deskripsi
    • Ikon
    • Dependensi Python
    • Parameter Integrasi
  5. Klik Simpan.

Membuat konektor email

Untuk membuat konektor email, ikuti langkah-langkah berikut:

  1. Di jendela Response > IDE, klik Add untuk menambahkan item IDE baru.
  2. Pilih tombol pilihan Connector dan beri nama konektor My Email Connector.
  3. Dalam daftar, pilih integrasi Email Connector untuk mengaitkan konektor dengan integrasi.
  4. Klik Buat.

Menetapkan parameter konektor

Setelah membuat konektor, tetapkan parameter konektor:

  1. Tetapkan parameter konektor sesuai dengan tabel:

  2. Parameter Deskripsi Jenis
    Username Wajib diisi. Nama pengguna IMAP. Alamat email dari mana konektor menarik email ke platform Google SecOps. Nilai defaultnya adalah email@gmail.com. string
    Password Wajib diisi. Sandi Internet Message Access Protocol (IMAP) untuk alamat email yang digunakan konektor untuk menarik email ke platform Google SecOps. sandi
    IMAP Port Wajib diisi. Port jaringan tertentu yang digunakan oleh IMAP untuk mengakses email dari server jarak jauh. Misalnya: 993 (nilai default). int
    IMAP Server Address Wajib diisi. Server email masuk untuk akun IMAP. Misalnya, imap.gmail.com (nilai default). string
    Folder to check for emails Opsional. Mengambil email hanya dari folder yang ditentukan. Misalnya: Inbox (nilai default). inbox
  3. Masukkan detail kolom berikut:
    1. Nama Kolom Produk = device_product: Menentukan nilai kolom mentah yang akan ditetapkan ke nama produk pemberitahuan. Temukan kolom terkait dalam kode, yang ditentukan sebagai Mail (produk).
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Nama Kolom Peristiwa = event_name: Menentukan nilai kolom mentah yang akan ditetapkan ke kolom jenis peristiwa. Temukan kolom terkait dalam kode, yang ditentukan sebagai Suspicious email.
      event["event_name"] = Suspicious email

Mengedit konektor email

Untuk mengedit parameter konektor email, ikuti langkah-langkah berikut:

  1. Salin kode berikut yang dibuat untuk My Email Connector, tempelkan di IDE, lalu ikuti petunjuknya:
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. Setelah menyalin kode konektor, tinjau modul yang perlu diimpor, lalu lanjutkan ke fungsi utama. Setiap metode yang dipanggil dari fungsi utama akan dibahas lebih lanjut secara mendetail.

Impor yang relevan

Modul Python memiliki sekumpulan fungsi, class, atau variabel yang ditentukan dan diimplementasikan. Untuk menerapkan semua fungsi berikut, impor modul tersebut ke dalam skrip:

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Fungsi utama

Fungsi Main adalah titik entri skrip. Penafsir Python menjalankan kode secara berurutan dan memanggil setiap metode yang ditentukan dalam skrip.

  1. Ekstrak parameter konektor. Gunakan fungsi siemplify.extract_connector_param untuk mengekstrak setiap parameter yang dikonfigurasi untuk konektor (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. Gunakan fungsi get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) untuk mengumpulkan semua informasi dari email yang belum dibaca.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. Setelah Anda menerima semua informasi dari email, konfirmasi bahwa informasi tersebut dikumpulkan, lalu lakukan tindakan berikut pada setiap email:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    Kode ini mengekstrak tanggal pesan dengan datetime_email_message = message['Date'], lalu mengonversi tanggal dan waktu ini ke waktu epoch Unix menggunakan fungsi Google SecOps:
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. Telusuri URL di isi pesan email menggunakan fungsi find_url_in_email_message_body(siemplify_email_messages_data_list). Jika URL ditemukan, gunakan produk lain dalam panduan kami untuk memeriksa apakah URL tersebut berbahaya.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Ekstrak ID unik setiap pesan email, lalu tetapkan ke variabel alert_id:
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Setelah Anda mengekstrak semua detail yang diperlukan untuk menyerap pemberitahuan ke platform Google SecOps, buat pemberitahuan dan peristiwa:
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. Validasi pemberitahuan yang dibuat dan peristiwa yang dibuat. Setelah memvalidasi, tambahkan pemberitahuan ke daftar pemberitahuan.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. Dalam skenario saat kotak masuk pengguna tertentu tidak memiliki email yang belum dibaca, tambahkan kode berikut:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. Menampilkan daftar pemberitahuan ke sistem dan setiap pemberitahuan kemudian ditampilkan sebagai kasus dalam antrean kasus:
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. Jalankan fungsi Main dalam waktu yang Anda tetapkan di konfigurasi konektor:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

Mendapatkan pesan email yang belum dibaca

Fungsi Get the unread email message terhubung ke email dengan modul Imap dan Email, serta mengambil detail pesan email. Fungsi ini juga menampilkan daftar yang berisi semua informasi dari semua pesan email yang belum dibaca.

  1. Dari class utama, gunakan fungsi: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. Hubungkan ke email menggunakan imap module:
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. Tentukan folder dalam email untuk memeriksa pesan yang belum dibaca. Dalam contoh ini, Anda mengekstrak email dari folder Inbox (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"):
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. Kumpulkan semua pesan yang belum dibaca DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN", lalu konversi data ini menjadi daftar:
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Buat acara

Fungsi Buat acara membuat acara dengan mengaitkan setiap komponen pesan email ke kolom acara.

  1. Dari class utama, buat peristiwa menggunakan fungsi: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. Buat kamus untuk kolom peristiwa. Kolom wajib diisi adalah event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] :
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Setiap pemberitahuan berisi satu atau beberapa peristiwa. Dalam contoh ini, notifikasi berisi satu peristiwa: satu pesan email. Oleh karena itu, setelah membuat acara, buat pemberitahuan yang berisi semua informasi acara.

Buat info pemberitahuan dan inisialisasi kolom karakteristik informasi pemberitahuan

Fungsi ini membuat pemberitahuan. Setiap pemberitahuan berisi satu atau beberapa peristiwa di dalamnya. Dalam hal ini, setiap pemberitahuan berisi satu peristiwa yang pada dasarnya adalah satu pesan email.

  1. Dari class utama, buat pemberitahuan:
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. Buat instance alert_info dan lakukan inisialisasi:
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. Setelah membuat pemberitahuan, validasi peristiwa dan tambahkan ke karakteristik aert_info:
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

Temukan URL di fungsi isi email

Fungsi temukan URL di isi email memindai isi email untuk menemukan URL. Untuk menggunakan fungsi ini, ikuti langkah-langkah berikut:

  1. Untuk setiap pesan email, temukan bagian isi pesan dengan konten teks biasa:
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. Jika isi berisi konten yang diperlukan, muat informasi ini dengan email_message_body = part.get_payload(), dan telusuri URL menggunakan ekspresi reguler:
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. Contoh ini mengekstrak URL dari isi email:
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. Setelah meninjau kode konektor, Anda dapat mengonfigurasi konektor untuk menyerap kasus ke dalam platform dari kotak masuk Gmail yang dipilih.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.