开发连接器
支持的平台:
Google SecOps
SOAR
本文档介绍了如何在集成开发环境 (IDE) 中通过创建集成并将其与连接器相关联来构建新连接器。
创建电子邮件连接器集成
如需创建电子邮件连接器集成,请按以下步骤操作:
- 在响应 > IDE 窗口中,点击 添加以添加新的 IDE 项。
- 选择集成,并将集成命名为
Email Connector
。 - 点击创建。集成会显示在侧边栏中,并带有默认图标。
- 依次点击 more_vert 配置自定义集成,然后定义以下设置:
- 说明
- 图标
- Python 依赖项
- 集成参数
- 点击保存。
创建电子邮件连接器
如需创建电子邮件连接器,请按以下步骤操作:
- 在响应 > IDE 窗口中,点击 添加以添加新的 IDE 项。
- 选择连接器单选按钮,并将连接器命名为
My Email Connector
。 - 在列表中,选择集成 Email Connector 以将连接器与集成相关联。
- 点击创建。
设置连接器参数
创建连接器后,设置连接器参数:
- 根据下表设置连接器参数:
- 输入以下字段详细信息:
- Product Field Name = device_product:确定要分配给提醒产品名称的原始字段值。在代码中找到相关字段,该字段定义为
Mail
(商品)。event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
- 事件字段名称 = event_name:确定要分配给事件类型字段的原始字段值。在代码中找到定义为
Suspicious email
的相关字段。event["event_name"] = Suspicious email
参数 | 说明 | 类型 |
---|---|---|
Username |
必需。IMAP 用户名。连接器从中提取电子邮件并将其导入 Google SecOps 平台的电子邮件地址。默认值为 email@gmail.com 。 |
字符串 |
Password |
必需。连接器用于将电子邮件提取到 Google SecOps 平台中的电子邮件地址的互联网邮件访问协议 (IMAP) 密码。 | 密码 |
IMAP Port |
必需。IMAP 用于访问远程服务器上的电子邮件的特定网络端口。例如:993 (默认值)。 |
整数 |
IMAP Server Address |
必需。IMAP 账号的接收邮件服务器。例如,imap.gmail.com (默认值)。 |
字符串 |
Folder to check for emails |
可选。仅从指定文件夹提取电子邮件。例如:Inbox (默认值)。 |
inbox |
修改电子邮件连接器
如需修改电子邮件连接器的参数,请按以下步骤操作:
- 复制为 My Email Connector 创建的以下代码,将其粘贴到 IDE 中,然后按照说明操作:
from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
- 复制连接器的代码后,查看需要导入的模块,然后继续执行主函数。稍后将更详细地讨论从主函数调用的每种方法。
相关导入
Python 模块包含一组已定义和实现的功能、类或变量。如需实现以下所有函数,请将这些模块导入到脚本中:
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
主函数
Main 函数是脚本的入口点。Python 解释器会按顺序运行代码,并调用脚本中定义的每个方法。
- 提取连接器参数。使用
siemplify.extract_connector_param
函数提取连接器的每个配置参数(username
、password
、imap_host
、imap_port
、folder_to_check
)。# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
- 使用函数
get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check)
收集未读电子邮件中的所有信息。# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- 收到电子邮件中的所有信息后,请确认这些信息已收集完毕,然后对每封电子邮件执行以下操作:
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
此代码通过datetime_email_message = message['Date']
提取消息日期,然后使用 Google SecOps 函数将此日期时间转换为 Unix 纪元时间:
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
- 使用
find_url_in_email_message_body(siemplify_email_messages_data_list)
函数搜索电子邮件正文中的网址。如果找到网址,请使用我们剧本中的其他产品来检查该网址是否恶意。 - 提取每封电子邮件的唯一 ID,并将其分配给
alert_id
变量:# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- 注入所有必要详细信息以将提醒注入到 Google SecOps 平台后,创建提醒和事件:
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
- 验证创建的提醒和创建的事件。验证后,将相应提醒添加到提醒列表中。
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
- 如果指定用户的收件箱中没有未读电子邮件,请添加以下代码:
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- 将提醒列表返回给系统,然后每个提醒都会在支持请求队列中显示为一个支持请求:
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- 在连接器配置中设置的时间范围内运行
Main
函数:if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
获取未读电子邮件
获取未读电子邮件函数使用 Imap
和 Email
模块连接到电子邮件,并检索电子邮件的详细信息。它还会返回一个列表,其中包含所有未读电子邮件的所有信息。
- 在主类中,使用以下函数:
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
。def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
- 使用
imap module
连接到电子邮件:# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- 确定电子邮件中要检查未读邮件的文件夹。
在此示例中,您将从收件箱文件夹
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
中提取电子邮件: - 收集所有未读消息 �3,然后将这些数据转换为列表:
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
创建活动
创建活动函数通过将每个电子邮件消息组件分别与活动字段相关联来创建活动。
- 从主类中使用以下函数创建事件:
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
-
为活动字段创建字典。必填字段为
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
: - 每条提醒都包含一个或多个事件。在此示例中,提醒包含单个事件:一封电子邮件。因此,在创建活动后,请创建包含所有活动信息的提醒。
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
创建提醒信息并初始化提醒信息特征字段
此函数用于创建提醒。每个提醒都包含一个或多个事件。在这种情况下,每个提醒都包含一个事件,该事件基本上是一封电子邮件。
- 从主类创建提醒:
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
- 创建
alert_info
实例并对其进行初始化:# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
- 创建提醒后,验证事件并将其附加到
aert_info
特征:
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
在电子邮件正文函数中查找网址
在电子邮件正文中查找网址函数会扫描电子邮件正文中的网址。如需使用此函数,请按以下步骤操作:
- 对于每封电子邮件,找到正文中包含纯文本内容的部分:
- 如果正文包含所需内容,请使用
email_message_body = part.get_payload()
加载此信息,并使用以下正则表达式搜索网址: - 此示例从电子邮件正文中提取网址:
- 查看连接器代码后,您可以配置连接器,以从所选 Gmail 收件箱中将支持请求注入到平台中。
def find_url_in_email_message_body(siemplify, email_messages_data_list):
"""
Search for a url in the email body,
"""
all_found_url_in_emails_body_list = []
for message in email_messages_data_list:
for part in message.walk():
if part.get_content_maintype() == 'text\plain':
continue
URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。