커넥터 개발

다음에서 지원:

이 문서에서는 통합을 만든 다음 커넥터를 연결하여 통합 개발 환경 (IDE)에서 새 커넥터를 빌드하는 방법을 설명합니다.

이메일 커넥터 통합 만들기

이메일 커넥터 통합을 만들려면 다음 단계를 따르세요.

  1. 응답 > IDE 창에서 추가를 클릭하여 새 IDE 항목을 추가합니다.
  2. 통합을 선택하고 통합 이름을 Email Connector로 지정합니다.
  3. 만들기를 클릭합니다. 통합이 기본 아이콘과 함께 사이드바에 표시됩니다.
  4. more_vert 맞춤 통합 구성을 클릭하고 다음 설정을 정의합니다.
    • 설명
    • 아이콘
    • Python 종속 항목
    • 통합 매개변수
  5. 저장을 클릭합니다.

이메일 커넥터 만들기

이메일 커넥터를 만들려면 다음 단계를 따르세요.

  1. 응답 > IDE 창에서 추가를 클릭하여 새 IDE 항목을 추가합니다.
  2. 커넥터 라디오 버튼을 선택하고 커넥터 이름을 My Email Connector로 지정합니다.
  3. 목록에서 통합 이메일 커넥터를 선택하여 커넥터를 통합과 연결합니다.
  4. 만들기를 클릭합니다.

커넥터 매개변수 설정

커넥터를 만든 후 커넥터 매개변수를 설정합니다.

  1. 표에 따라 커넥터 매개변수를 설정합니다.

  2. 매개변수 설명 유형
    Username 필수. IMAP 사용자 이름입니다. 커넥터가 이메일을 Google SecOps 플랫폼으로 가져오는 이메일 주소입니다. 기본값은 email@gmail.com입니다. 문자열
    Password 필수. 커넥터가 이메일을 Google SecOps 플랫폼으로 가져오는 데 사용하는 이메일 주소의 인터넷 메시지 액세스 프로토콜 (IMAP) 비밀번호입니다. 비밀번호
    IMAP Port 필수. IMAP에서 원격 서버의 이메일에 액세스하는 데 사용되는 특정 네트워크 포트입니다. 예를 들어 993 (기본값)입니다. int
    IMAP Server Address 필수. IMAP 계정의 수신 메일 서버입니다. 예를 들어 imap.gmail.com (기본값)입니다. 문자열
    Folder to check for emails 선택사항. 지정된 폴더의 이메일만 가져옵니다. 예를 들어 Inbox (기본값)입니다. inbox
  3. 다음 필드 세부정보를 입력합니다.
    1. 제품 필드 이름 = device_product: 알림의 제품 이름에 할당할 원시 필드 값을 결정합니다. 코드에서 Mail (제품)으로 정의된 관련 필드를 찾습니다.
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. 이벤트 필드 이름 = event_name: 이벤트 유형 필드에 할당할 원시 필드 값을 결정합니다. 코드에서 Suspicious email로 정의된 관련 필드를 찾습니다.
      event["event_name"] = Suspicious email

이메일 커넥터 수정

이메일 커넥터의 매개변수를 수정하려면 다음 단계를 따르세요.

  1. 내 이메일 커넥터를 위해 생성된 다음 코드를 복사하여 IDE에 붙여넣고 안내를 따릅니다.
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. 커넥터의 코드를 복사한 후 가져와야 하는 필수 모듈을 검토하고 기본 함수로 진행합니다. 기본 함수에서 호출되는 각 메서드는 나중에 더 자세히 설명합니다.

관련 가져오기

Python 모듈에는 정의되고 구현된 함수, 클래스 또는 변수 집합이 있습니다. 다음 함수를 모두 구현하려면 해당 모듈을 스크립트로 가져오세요.

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

기본 함수

Main 함수는 스크립트의 진입점입니다. Python 인터프리터는 코드를 순차적으로 실행하고 스크립트에 정의된 각 메서드를 호출합니다.

  1. 커넥터 매개변수를 추출합니다. siemplify.extract_connector_param 함수를 사용하여 커넥터의 각 구성된 매개변수 (username, password, imap_host, imap_port, folder_to_check)를 추출합니다.
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) 함수를 사용하여 읽지 않은 이메일의 모든 정보를 수집해 줘.
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. 이메일에서 모든 정보를 받은 후 정보가 수집되었는지 확인하고 각 이메일에서 다음 작업을 실행합니다.
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    이 코드는 datetime_email_message = message['Date']로 메시지 날짜를 추출한 다음 Google SecOps 함수를 사용하여 이 날짜 시간을 Unix 에포크 시간으로 변환합니다.
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. find_url_in_email_message_body(siemplify_email_messages_data_list) 함수를 사용하여 이메일 메시지 본문에서 URL을 검색합니다. URL이 발견되면 플레이북의 다른 제품을 사용하여 악성 여부를 확인합니다.
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. 각 이메일 메시지의 고유 ID를 추출하여 alert_id 변수에 할당합니다.
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. Google SecOps 플랫폼에 알림을 수집하는 데 필요한 모든 세부정보를 추출한 후 알림과 이벤트를 만듭니다.
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. 생성된 알림과 생성된 이벤트를 검증합니다. 유효성을 검사한 후 알림을 알림 목록에 추가합니다.
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. 지정된 사용자의 받은편지함에 읽지 않은 이메일이 없는 시나리오에서는 다음 코드를 추가합니다.
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. 알림 목록을 시스템에 반환하면 각 알림이 케이스 대기열에 케이스로 표시됩니다.
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. 커넥터 구성에서 설정한 시간 내에 Main 함수를 실행합니다.
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

읽지 않은 이메일 메시지 가져오기

읽지 않은 이메일 메시지 가져오기 함수는 ImapEmail 모듈을 사용하여 이메일에 연결하고 이메일 메시지 세부정보를 가져옵니다. 또한 읽지 않은 모든 이메일 메시지의 모든 정보가 포함된 목록을 반환합니다.

  1. 기본 클래스에서 get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 함수를 사용합니다.
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. imap module를 사용하여 이메일에 연결합니다.
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. 읽지 않은 메시지를 확인할 이메일의 폴더를 확인합니다. 이 예에서는 Inbox 폴더 (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")에서 이메일을 추출합니다.
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. 읽지 않은 메일을 모두 수집하고 DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" 이 데이터를 목록으로 변환합니다.
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

이벤트 만들기

이벤트 만들기 함수는 각 이메일 메시지 구성요소를 이벤트 필드에 각각 연결하여 이벤트를 만듭니다.

  1. 기본 클래스에서 create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time) 함수를 사용하여 이벤트를 만듭니다.
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. 이벤트 필드의 사전을 만듭니다. 필수 입력란은 event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] 입니다.
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. 각 알림에는 하나 이상의 이벤트가 포함됩니다. 이 예에서 알림에는 하나의 이벤트(이메일 메시지)가 포함되어 있습니다. 따라서 이벤트를 만든 후 모든 이벤트 정보가 포함된 알림을 만듭니다.

알림 정보를 만들고 알림 정보 특성 필드를 초기화합니다.

이 함수는 알림을 만듭니다. 각 알림에는 하나 이상의 이벤트가 포함되어 있습니다. 이 경우 각 알림에는 기본적으로 하나의 이메일 메시지인 하나의 이벤트가 포함됩니다.

  1. 기본 클래스에서 알림을 만듭니다.
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. alert_info 인스턴스를 만들고 초기화합니다.
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. 알림을 만든 후 이벤트를 검증하고 aert_info 특성에 추가합니다.
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

이메일 본문 함수에서 URL 찾기

이메일 본문에서 URL 찾기 기능은 이메일 본문에서 URL을 검색합니다. 이 기능을 사용하려면 다음 단계를 따르세요.

  1. 각 이메일 메시지에서 일반 텍스트 콘텐츠가 포함된 메시지 본문 부분을 찾습니다.
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. 본문에 필요한 콘텐츠가 포함되어 있으면 email_message_body = part.get_payload()로 이 정보를 로드하고 정규식을 사용하여 URL을 검색합니다.
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. 이 예에서는 이메일 본문에서 URL을 추출합니다.
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. 커넥터 코드를 검토한 후 선택한 Gmail 받은편지함에서 케이스를 플랫폼으로 수집하도록 커넥터를 구성할 수 있습니다.

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.