コネクタを開発する

以下でサポートされています。

このドキュメントでは、統合開発環境(IDE)でインテグレーションを作成し、コネクタをリンクして新しいコネクタを構築する方法について説明します。

メール コネクタ統合を作成する

メール コネクタ統合を作成する手順は次のとおりです。

  1. [Response > IDE] ウィンドウで、 [Add] をクリックして新しい IDE アイテムを追加します。
  2. [統合] を選択し、統合に Email Connector という名前を付けます。
  3. [作成] をクリックします。統合がデフォルトのアイコンとともにサイドバーに表示されます。
  4. more_vert アイコン [Configure Custom Integration] をクリックして、次の設定を定義します。
    • 説明
    • アイコン
    • Python の依存関係
    • 統合パラメータ
  5. [保存] をクリックします。

メール コネクタを作成する

メール コネクタを作成する手順は次のとおりです。

  1. [Response > IDE] ウィンドウで、 [Add] をクリックして新しい IDE アイテムを追加します。
  2. [Connector] ラジオボタンを選択し、コネクタに My Email Connector という名前を付けます。
  3. リストで、統合 [Email Connector] を選択して、コネクタを統合に関連付けます。
  4. [作成] をクリックします。

コネクタ パラメータを設定する

コネクタを作成したら、コネクタ パラメータを設定します。

  1. 次の表に従ってコネクタ パラメータを設定します。

  2. パラメータ 説明 タイプ
    Username 必須。IMAP ユーザー名。コネクタが Google SecOps プラットフォームにメールを pull するメールアドレス。デフォルト値は email@gmail.com です。 文字列
    Password 必須。コネクタがメールを Google SecOps プラットフォームに取得するために使用するメールアドレスの Internet Message Access Protocol(IMAP)パスワード。 パスワード
    IMAP Port 必須。IMAP がリモート サーバーからメールにアクセスするために使用する特定のネットワーク ポート。例: 993(デフォルト値)。 整数
    IMAP Server Address 必須。IMAP アカウントの受信メールサーバー。例: imap.gmail.com(デフォルト値)。 文字列
    Folder to check for emails (省略可)指定したフォルダからのみメールを pull します。例: Inbox(デフォルト値)。 受信トレイ
  3. 次のフィールドの詳細を入力します。
    1. Product Field Name = device_product: アラートのプロダクト名に割り当てる未加工フィールドの値を決定します。コードの関連するフィールド(Mail(プロダクト)として定義されている)を見つけます。
      event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
    2. Event Field Name = event_name: イベントタイプ フィールドに割り当てる未加工フィールドの値を決定します。コード内の関連フィールド(Suspicious email として定義)を見つけます。
      event["event_name"] = Suspicious email

メール コネクタを編集する

メール コネクタのパラメータを編集する手順は次のとおりです。

  1. My Email Connector 用に作成された次のコードをコピーして IDE に貼り付け、次の手順に沿って操作します。
    from SiemplifyConnectors import SiemplifyConnectorExecution
    from SiemplifyConnectorsDataModel import AlertInfo
    from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
    import email, imaplib, sys, re
    
    # CONSTANTS
    CONNECTOR_NAME = "Mail"
    VENDOR = "Mail"
    PRODUCT = "Mail"
    DEFAULT_PRIORITY = 60 # Default is Medium
    RULE_GENERATOR_EXAMPLE = "Mail"
    DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
    DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
    URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
    
    
    def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"Started processing Alert {alert_id}")
    
        create_event = None
        alert_info = AlertInfo()
        # Initializes alert_info
        alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
        alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
        alert_info.name = email_message_data['Subject']
        alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
        alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
        alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
        alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
        alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)
    
        siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
        try:
            if created_event is not None:
                alert_info.events.append(created_event)
                siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
        # Raise an exception if failed to process the event
        except Exception as e:
            siemplify.LOGGER.error(f"Failed to process event {alert_id}")
            siemplify.LOGGER.exception(e)
        return alert_info
    
    
    def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
        event = {} 
        event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
        event["event_name"] = "Suspicious email" 
        event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
        event["Subject"] = email_message_data["Subject"] 
        event["SourceUserName"] = email_message_data["From"] 
        event["DestinationUserName"] = email_message_data["To"] 
        event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
        siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
        return event
    
    
    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """Search for a url in the email body"""
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                email_message_body = part.get_payload()
                all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
                for url in all_found_urls:
                    if url not in all_found_url_in_emails_body_list:
                        all_found_url_in_emails_body_list.append(url)
    
    
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
    
        # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
            folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    
        # Selecting the email folder to pull the data from
        mail.select(folder_to_check)
    
        # Storing the email message data
        result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    
        # If there are several emails collected in the cycle it will split each
        # email message into a separate item in the list chosen_mailbox_items_list
        if len(data) > 0:
            chosen_mailbox_items_list = data[0].split()
            # Iterating each email message and appending to emails_messages_data_list
            for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)')
                # Decoding from binary string to string
                raw_email = email_data[0][1].decode("utf-8")
                # Turning the email data into an email object
                email_message = email.message_from_string(raw_email)
                # Appending the email message data to email_messages_data_list
                email_messages_data_list.append(email_message)
        return email_messages_data_list
    
    
    @output_handler
    def main(is_test_run):
        alerts = [] # The main output of each connector run that contains the alerts data 
        siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
        siemplify.script_name = CONNECTOR_NAME
    
        # In case of running a test
        if (is_test_run):
            siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    
        # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
        # Getting the digested email message data
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
        # If the email_messages_data_list is not empty
        if len(email_messages_data_list) > 0:
            for message in email_messages_data_list:
                # Converting the email message datetime from string to unix time by SiemplifyUtils functions
                datetime_email_message = message['Date']
                string_to_datetime = convert_string_to_datetime(datetime_email_message)
                datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
                found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
                # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
                alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
                # Creating the event by calling create_event() function 
                created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
                # Creating the alert by calling create_alert() function 
                created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
                # Checking that the created_alert is not None 
                if created_alert is not None: 
                    alerts.append(created_alert) 
                    siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
        # If the inbox for the user has no unread emails. 
        else: 
            siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts) 
    
    
    if __name__ == '__main__':
    # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
        main(is_test_run)
    
  2. コネクタのコードをコピーしたら、インポートする必要なモジュールを確認し、メイン関数に進みます。メイン関数から呼び出される各メソッドについては、後で詳しく説明します。

関連するインポート

Python モジュールには、一連の関数、クラス、変数が定義、実装されます。次のすべての関数を実装するには、これらのモジュールをスクリプトにインポートします。

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

メイン関数

Main 関数は、スクリプトのエントリ ポイントです。Python インタープリタはコードを順番に実行し、スクリプトで定義された各メソッドを呼び出します。

  1. コネクタ パラメータを抽出します。siemplify.extract_connector_param 関数を使用して、コネクタ(usernamepasswordimap_hostimap_portfolder_to_check)用に構成された各パラメータを抽出します。
    # Extracting the connector's Params
        username = siemplify.extract_connector_param(param_name="Username") 
        password = siemplify.extract_connector_param(param_name="Password") 
        imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
        imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
        folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
        
  2. 関数 get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) を使用して、未読メールからすべての情報を収集します。
    # Getting the digested email message data 
        email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. メールからすべての情報を受け取ったら、情報が収集されたことを確認し、各メールに対して次の処理を行います。
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions

    このコードでは、datetime_email_message = message['Date'] によってメッセージの日付を抽出し、Google SecOps 関数を使用して、この日時を Unix エポック時間に変換します。
    string_to_datetime = convert_string_to_datetime(datetime_email_message) 
    datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. find_url_in_email_message_body(siemplify_email_messages_data_list) 関数を使用して、メール メッセージ本文の URL を検索します。URL が見つかった場合は、ハンドブック内の他のプロダクトを使用して、その URL が悪意のあるものかどうかを確認します。
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. 各メール メッセージの一意の ID を抽出し、alert_id 変数に割り当てます。
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. アラートを Google SecOps プラットフォームに取り込むために必要な詳細情報をすべて抽出したら、アラートとイベントを作成します。
    # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  8. 作成されたアラートと作成されたイベントを検証します。検証後、アラートをアラート リストに追加します。
        # Checking that the created_alert is not None 
        if created_alert is not None: 
            alerts.append(created_alert)     
            siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
        
  9. ユーザーの受信トレイに未読メールがない場合は、次のコードを追加します。
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  10. アラートリストをシステムに返し、各アラートがケースキューにケースとして表示されます。
        # Returning all the created alerts to the cases module in Siemplify 
        siemplify.return_package(alerts)
        
  11. コネクタ構成で設定した時間内に Main 関数を実行します。
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)
    

未読メール メッセージを取得する

未読メール メッセージの取得関数は、Imap モジュールと Email モジュールを使用してメールに接続し、メール メッセージの詳細を取得します。また、すべての未読メール メッセージのすべての情報を含むリストも返します。

  1. main クラスから、関数 get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) を使用します。
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. imap module を使用してメールに接続します。
        # Login to email using 'imap' module
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(username, password)
        
  3. メール内のフォルダを特定して、未読メッセージを確認します。 この例では、Inbox フォルダ (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox") からメールを抽出します。
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. 未読のメッセージをすべて収集し DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"、このデータをリストに変換します。
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

イベントを作成する

イベントを作成する関数は、各メール メッセージ コンポーネントをイベント フィールドにそれぞれ関連付けてイベントを作成します。

  1. main クラスから、関数 create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time) を使用してイベントを作成します。
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. イベント フィールドの辞書を作成します。必須フィールドは event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] です。
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. 各アラートには 1 つ以上のイベントが含まれます。この例では、アラートに 1 つのイベント(1 つのメール メッセージ)が含まれています。したがって、イベントを作成したら、すべてのイベント情報を含むアラートを作成します。

アラート情報を作成し、アラート情報特性フィールドを初期化する

この関数はアラートを作成します。各アラートには 1 つ以上のイベントが含まれています。この場合、各アラートには 1 つのイベント(基本的には 1 つのメール メッセージ)が含まれます。

  1. main クラスからアラートを作成します。
    def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    
  2. alert_info インスタンスを作成して初期化します。
    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. アラートを作成したら、イベントを検証して aert_info 特性に追加します。
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

メール本文の URL を検索する関数

メール本文の URL を検索する関数は、メール本文で URL をスキャンします。この関数を使用する手順は次のとおりです。

  1. 各メール メッセージで、プレーン テキスト コンテンツを含むメッセージ本文の部分を見つけます。
  2. def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    
  3. 本文に必要なコンテンツが含まれている場合は、email_message_body = part.get_payload() でこの情報を読み込み、正規表現を使用して URL を検索します。
  4. 
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    
  5. この例では、メール本文から URL を抽出します。
  6. email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 
  7. コネクタコードを確認したら、選択した Gmail の受信トレイからプラットフォームにケースを取り込むようにコネクタを構成できます。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。