커넥터 개발
이 문서에서는 통합을 만든 다음 커넥터를 연결하여 통합 개발 환경 (IDE)에서 새 커넥터를 빌드하는 방법을 설명합니다.
이메일 커넥터 통합 만들기
이메일 커넥터 통합을 만들려면 다음 단계를 따르세요.
- 응답 > IDE 창에서 추가를 클릭하여 새 IDE 항목을 추가합니다.
- 통합을 선택하고 통합 이름을
Email Connector
로 지정합니다. - 만들기를 클릭합니다. 통합이 기본 아이콘과 함께 사이드바에 표시됩니다.
- more_vert 맞춤 통합 구성을 클릭하고 다음 설정을 정의합니다.
- 설명
- 아이콘
- Python 종속 항목
- 통합 매개변수
- 저장을 클릭합니다.
이메일 커넥터 만들기
이메일 커넥터를 만들려면 다음 단계를 따르세요.
- 응답 > IDE 창에서 추가를 클릭하여 새 IDE 항목을 추가합니다.
- 커넥터 라디오 버튼을 선택하고 커넥터 이름을
My Email Connector
로 지정합니다. - 목록에서 통합 이메일 커넥터를 선택하여 커넥터를 통합과 연결합니다.
- 만들기를 클릭합니다.
커넥터 매개변수 설정
커넥터를 만든 후 커넥터 매개변수를 설정합니다.
- 표에 따라 커넥터 매개변수를 설정합니다.
- 다음 필드 세부정보를 입력합니다.
- 제품 필드 이름 = device_product: 알림의 제품 이름에 할당할 원시 필드 값을 결정합니다. 코드에서
Mail
(제품)으로 정의된 관련 필드를 찾습니다.event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
- 이벤트 필드 이름 = event_name: 이벤트 유형 필드에 할당할 원시 필드 값을 결정합니다. 코드에서
Suspicious email
로 정의된 관련 필드를 찾습니다.event["event_name"] = Suspicious email
매개변수 | 설명 | 유형 |
---|---|---|
Username |
필수. IMAP 사용자 이름입니다. 커넥터가 이메일을 Google SecOps 플랫폼으로 가져오는 이메일 주소입니다. 기본값은 email@gmail.com 입니다. |
문자열 |
Password |
필수. 커넥터가 이메일을 Google SecOps 플랫폼으로 가져오는 데 사용하는 이메일 주소의 인터넷 메시지 액세스 프로토콜 (IMAP) 비밀번호입니다. | 비밀번호 |
IMAP Port |
필수. IMAP에서 원격 서버의 이메일에 액세스하는 데 사용되는 특정 네트워크 포트입니다. 예를 들어 993 (기본값)입니다. |
int |
IMAP Server Address |
필수. IMAP 계정의 수신 메일 서버입니다. 예를 들어 imap.gmail.com (기본값)입니다. |
문자열 |
Folder to check for emails |
선택사항. 지정된 폴더의 이메일만 가져옵니다. 예를 들어 Inbox (기본값)입니다. |
inbox |
이메일 커넥터 수정
이메일 커넥터의 매개변수를 수정하려면 다음 단계를 따르세요.
- 내 이메일 커넥터를 위해 생성된 다음 코드를 복사하여 IDE에 붙여넣고 안내를 따릅니다.
from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
- 커넥터의 코드를 복사한 후 가져와야 하는 필수 모듈을 검토하고 기본 함수로 진행합니다. 기본 함수에서 호출되는 각 메서드는 나중에 더 자세히 설명합니다.
관련 가져오기
Python 모듈에는 정의되고 구현된 함수, 클래스 또는 변수 집합이 있습니다. 다음 함수를 모두 구현하려면 해당 모듈을 스크립트로 가져오세요.
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
기본 함수
Main 함수는 스크립트의 진입점입니다. Python 인터프리터는 코드를 순차적으로 실행하고 스크립트에 정의된 각 메서드를 호출합니다.
- 커넥터 매개변수를 추출합니다.
siemplify.extract_connector_param
함수를 사용하여 커넥터의 각 구성된 매개변수 (username
,password
,imap_host
,imap_port
,folder_to_check
)를 추출합니다.# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check)
함수를 사용하여 읽지 않은 이메일의 모든 정보를 수집해 줘.# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- 이메일에서 모든 정보를 받은 후 정보가 수집되었는지 확인하고 각 이메일에서 다음 작업을 실행합니다.
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
이 코드는datetime_email_message = message['Date']
로 메시지 날짜를 추출한 다음 Google SecOps 함수를 사용하여 이 날짜 시간을 Unix 에포크 시간으로 변환합니다.
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
find_url_in_email_message_body(siemplify_email_messages_data_list)
함수를 사용하여 이메일 메시지 본문에서 URL을 검색합니다. URL이 발견되면 플레이북의 다른 제품을 사용하여 악성 여부를 확인합니다.- 각 이메일 메시지의 고유 ID를 추출하여
alert_id
변수에 할당합니다.# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- Google SecOps 플랫폼에 알림을 수집하는 데 필요한 모든 세부정보를 추출한 후 알림과 이벤트를 만듭니다.
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
- 생성된 알림과 생성된 이벤트를 검증합니다. 유효성을 검사한 후 알림을 알림 목록에 추가합니다.
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
- 지정된 사용자의 받은편지함에 읽지 않은 이메일이 없는 시나리오에서는 다음 코드를 추가합니다.
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- 알림 목록을 시스템에 반환하면 각 알림이 케이스 대기열에 케이스로 표시됩니다.
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- 커넥터 구성에서 설정한 시간 내에
Main
함수를 실행합니다.if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
읽지 않은 이메일 메시지 가져오기
읽지 않은 이메일 메시지 가져오기 함수는 Imap
및 Email
모듈을 사용하여 이메일에 연결하고 이메일 메시지 세부정보를 가져옵니다. 또한 읽지 않은 모든 이메일 메시지의 모든 정보가 포함된 목록을 반환합니다.
- 기본 클래스에서
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
함수를 사용합니다.def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
imap module
를 사용하여 이메일에 연결합니다.# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- 읽지 않은 메시지를 확인할 이메일의 폴더를 확인합니다.
이 예에서는 Inbox 폴더
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
에서 이메일을 추출합니다. - 읽지 않은 메일을 모두 수집하고
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
이 데이터를 목록으로 변환합니다.# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
이벤트 만들기
이벤트 만들기 함수는 각 이메일 메시지 구성요소를 이벤트 필드에 각각 연결하여 이벤트를 만듭니다.
- 기본 클래스에서
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
함수를 사용하여 이벤트를 만듭니다. -
이벤트 필드의 사전을 만듭니다. 필수 입력란은
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
입니다. - 각 알림에는 하나 이상의 이벤트가 포함됩니다. 이 예에서 알림에는 하나의 이벤트(이메일 메시지)가 포함되어 있습니다. 따라서 이벤트를 만든 후 모든 이벤트 정보가 포함된 알림을 만듭니다.
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
알림 정보를 만들고 알림 정보 특성 필드를 초기화합니다.
이 함수는 알림을 만듭니다. 각 알림에는 하나 이상의 이벤트가 포함되어 있습니다. 이 경우 각 알림에는 기본적으로 하나의 이메일 메시지인 하나의 이벤트가 포함됩니다.
- 기본 클래스에서 알림을 만듭니다.
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
alert_info
인스턴스를 만들고 초기화합니다.# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
- 알림을 만든 후 이벤트를 검증하고
aert_info
특성에 추가합니다.
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
이메일 본문 함수에서 URL 찾기
이메일 본문에서 URL 찾기 기능은 이메일 본문에서 URL을 검색합니다. 이 기능을 사용하려면 다음 단계를 따르세요.
- 각 이메일 메시지에서 일반 텍스트 콘텐츠가 포함된 메시지 본문 부분을 찾습니다.
- 본문에 필요한 콘텐츠가 포함되어 있으면
email_message_body = part.get_payload()
로 이 정보를 로드하고 정규식을 사용하여 URL을 검색합니다. - 이 예에서는 이메일 본문에서 URL을 추출합니다.
- 커넥터 코드를 검토한 후 선택한 Gmail 받은편지함에서 케이스를 플랫폼으로 수집하도록 커넥터를 구성할 수 있습니다.
def find_url_in_email_message_body(siemplify, email_messages_data_list):
"""
Search for a url in the email body,
"""
all_found_url_in_emails_body_list = []
for message in email_messages_data_list:
for part in message.walk():
if part.get_content_maintype() == 'text\plain':
continue
URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.