コネクタを開発する
このドキュメントでは、統合開発環境(IDE)でインテグレーションを作成し、コネクタをリンクして新しいコネクタを構築する方法について説明します。
メール コネクタ統合を作成する
メール コネクタ統合を作成する手順は次のとおりです。
- [Response > IDE] ウィンドウで、 [Add] をクリックして新しい IDE アイテムを追加します。
- [統合] を選択し、統合に
Email Connector
という名前を付けます。 - [作成] をクリックします。統合がデフォルトのアイコンとともにサイドバーに表示されます。
- more_vert アイコン [Configure Custom Integration] をクリックして、次の設定を定義します。
- 説明
- アイコン
- Python の依存関係
- 統合パラメータ
- [保存] をクリックします。
メール コネクタを作成する
メール コネクタを作成する手順は次のとおりです。
- [Response > IDE] ウィンドウで、 [Add] をクリックして新しい IDE アイテムを追加します。
- [Connector] ラジオボタンを選択し、コネクタに
My Email Connector
という名前を付けます。 - リストで、統合 [Email Connector] を選択して、コネクタを統合に関連付けます。
- [作成] をクリックします。
コネクタ パラメータを設定する
コネクタを作成したら、コネクタ パラメータを設定します。
- 次の表に従ってコネクタ パラメータを設定します。
- 次のフィールドの詳細を入力します。
- Product Field Name = device_product: アラートのプロダクト名に割り当てる未加工フィールドの値を決定します。コードの関連するフィールド(
Mail
(プロダクト)として定義されている)を見つけます。event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
- Event Field Name = event_name: イベントタイプ フィールドに割り当てる未加工フィールドの値を決定します。コード内の関連フィールド(
Suspicious email
として定義)を見つけます。event["event_name"] = Suspicious email
パラメータ | 説明 | タイプ |
---|---|---|
Username |
必須。IMAP ユーザー名。コネクタが Google SecOps プラットフォームにメールを pull するメールアドレス。デフォルト値は email@gmail.com です。 |
文字列 |
Password |
必須。コネクタがメールを Google SecOps プラットフォームに取得するために使用するメールアドレスの Internet Message Access Protocol(IMAP)パスワード。 | パスワード |
IMAP Port |
必須。IMAP がリモート サーバーからメールにアクセスするために使用する特定のネットワーク ポート。例: 993 (デフォルト値)。 |
整数 |
IMAP Server Address |
必須。IMAP アカウントの受信メールサーバー。例: imap.gmail.com (デフォルト値)。 |
文字列 |
Folder to check for emails |
(省略可)指定したフォルダからのみメールを pull します。例: Inbox (デフォルト値)。 |
受信トレイ |
メール コネクタを編集する
メール コネクタのパラメータを編集する手順は次のとおりです。
- My Email Connector 用に作成された次のコードをコピーして IDE に貼り付け、次の手順に沿って操作します。
from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
- コネクタのコードをコピーしたら、インポートする必要なモジュールを確認し、メイン関数に進みます。メイン関数から呼び出される各メソッドについては、後で詳しく説明します。
関連するインポート
Python モジュールには、一連の関数、クラス、変数が定義、実装されます。次のすべての関数を実装するには、これらのモジュールをスクリプトにインポートします。
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
メイン関数
Main 関数は、スクリプトのエントリ ポイントです。Python インタープリタはコードを順番に実行し、スクリプトで定義された各メソッドを呼び出します。
- コネクタ パラメータを抽出します。
siemplify.extract_connector_param
関数を使用して、コネクタ(username
、password
、imap_host
、imap_port
、folder_to_check
)用に構成された各パラメータを抽出します。# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
- 関数
get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check)
を使用して、未読メールからすべての情報を収集します。# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- メールからすべての情報を受け取ったら、情報が収集されたことを確認し、各メールに対して次の処理を行います。
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
このコードでは、datetime_email_message = message['Date']
によってメッセージの日付を抽出し、Google SecOps 関数を使用して、この日時を Unix エポック時間に変換します。
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
find_url_in_email_message_body(siemplify_email_messages_data_list)
関数を使用して、メール メッセージ本文の URL を検索します。URL が見つかった場合は、ハンドブック内の他のプロダクトを使用して、その URL が悪意のあるものかどうかを確認します。- 各メール メッセージの一意の ID を抽出し、
alert_id
変数に割り当てます。# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- アラートを Google SecOps プラットフォームに取り込むために必要な詳細情報をすべて抽出したら、アラートとイベントを作成します。
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
- 作成されたアラートと作成されたイベントを検証します。検証後、アラートをアラート リストに追加します。
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
- ユーザーの受信トレイに未読メールがない場合は、次のコードを追加します。
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- アラートリストをシステムに返し、各アラートがケースキューにケースとして表示されます。
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- コネクタ構成で設定した時間内に
Main
関数を実行します。if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
未読メール メッセージを取得する
未読メール メッセージの取得関数は、Imap
モジュールと Email
モジュールを使用してメールに接続し、メール メッセージの詳細を取得します。また、すべての未読メール メッセージのすべての情報を含むリストも返します。
- main クラスから、関数
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
を使用します。def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
imap module
を使用してメールに接続します。# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- メール内のフォルダを特定して、未読メッセージを確認します。
この例では、Inbox フォルダ
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
からメールを抽出します。 - 未読のメッセージをすべて収集し
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
、このデータをリストに変換します。# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
イベントを作成する
イベントを作成する関数は、各メール メッセージ コンポーネントをイベント フィールドにそれぞれ関連付けてイベントを作成します。
- main クラスから、関数
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
を使用してイベントを作成します。 -
イベント フィールドの辞書を作成します。必須フィールドは
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
です。 - 各アラートには 1 つ以上のイベントが含まれます。この例では、アラートに 1 つのイベント(1 つのメール メッセージ)が含まれています。したがって、イベントを作成したら、すべてのイベント情報を含むアラートを作成します。
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
アラート情報を作成し、アラート情報特性フィールドを初期化する
この関数はアラートを作成します。各アラートには 1 つ以上のイベントが含まれています。この場合、各アラートには 1 つのイベント(基本的には 1 つのメール メッセージ)が含まれます。
- main クラスからアラートを作成します。
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
alert_info
インスタンスを作成して初期化します。# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
- アラートを作成したら、イベントを検証して
aert_info
特性に追加します。
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
メール本文の URL を検索する関数
メール本文の URL を検索する関数は、メール本文で URL をスキャンします。この関数を使用する手順は次のとおりです。
- 各メール メッセージで、プレーン テキスト コンテンツを含むメッセージ本文の部分を見つけます。
- 本文に必要なコンテンツが含まれている場合は、
email_message_body = part.get_payload()
でこの情報を読み込み、正規表現を使用して URL を検索します。 - この例では、メール本文から URL を抽出します。
- コネクタコードを確認したら、選択した Gmail の受信トレイからプラットフォームにケースを取り込むようにコネクタを構成できます。
def find_url_in_email_message_body(siemplify, email_messages_data_list):
"""
Search for a url in the email body,
"""
all_found_url_in_emails_body_list = []
for message in email_messages_data_list:
for part in message.walk():
if part.get_content_maintype() == 'text\plain':
continue
URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。