收集 Cisco AMP for Endpoints 記錄
本文說明如何使用 Amazon S3,將 Cisco AMP for Endpoints 記錄擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為符合 Chronicle UDM 的結構化格式。這項服務會從巢狀 JSON 物件中擷取欄位,將這些欄位對應至 UDM 結構定義,識別事件類別,指派嚴重性等級,並最終產生統一的事件輸出內容,在符合特定條件時標示安全警示。
事前準備
- Google SecOps 執行個體
- Cisco AMP for Endpoints 控制台的特殊權限
- AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)
收集 Cisco AMP for Endpoints 的必要條件 (ID、API 金鑰、機構 ID、權杖)
- 登入 Cisco AMP for Endpoints 主控台。
- 依序前往「帳戶」>「API 憑證」。
- 按一下「New API Credential」(新的 API 憑證),建立新的 API 金鑰和用戶端 ID。
- 提供下列設定詳細資料:
- 應用程式名稱:輸入名稱 (例如
Chronicle SecOps Integration
)。 - 範圍:如要輪詢基本事件,請選取「唯讀」;如要建立事件串流,請選取「讀取及寫入」。
- 應用程式名稱:輸入名稱 (例如
- 點選「建立」。
- 複製下列詳細資料並存放在安全位置:
- 第三方 API 用戶端 ID
- API 金鑰
- API 基準網址:視您所在的區域而定:
- 美國:
https://api.amp.cisco.com
- 歐盟:
https://api.eu.amp.cisco.com
- 亞太和日本地區:
https://api.apjc.amp.cisco.com
- 美國:
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
cisco-amp-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-amp-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-amp-logs/cisco-amp-events/state.json" } ] }
- 如果您輸入其他 bucket 名稱,請替換
cisco-amp-logs
。
- 如果您輸入其他 bucket 名稱,請替換
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
cisco-amp-lambda-role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 cisco-amp-events-collector
執行階段 Python 3.13 架構 x86_64 執行角色 cisco-amp-lambda-role
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
cisco-amp-events-collector.py
):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta import os import logging # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client and HTTP pool manager s3_client = boto3.client('s3') http = urllib3.PoolManager() def lambda_handler(event, context): """ AWS Lambda handler to fetch Cisco AMP events and store them in S3 """ try: # Get environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_client_id = os.environ['AMP_CLIENT_ID'] api_key = os.environ['AMP_API_KEY'] api_base = os.environ['API_BASE'] # Optional parameters page_size = int(os.environ.get('PAGE_SIZE', '500')) max_pages = int(os.environ.get('MAX_PAGES', '10')) logger.info(f"Starting Cisco AMP events collection for bucket: {s3_bucket}") # Get last run timestamp from state file last_timestamp = get_last_timestamp(s3_bucket, state_key) if not last_timestamp: last_timestamp = (datetime.utcnow() - timedelta(days=1)).isoformat() + 'Z' # Create Basic Auth header auth_header = base64.b64encode(f"{api_client_id}:{api_key}".encode()).decode() headers = { 'Authorization': f'Basic {auth_header}', 'Accept': 'application/json' } # Build initial API URL base_url = f"{api_base}/v1/events" next_url = f"{base_url}?limit={page_size}&start_date={last_timestamp}" all_events = [] page_count = 0 while next_url and page_count < max_pages: logger.info(f"Fetching page {page_count + 1} from: {next_url}") # Make API request using urllib3 response = http.request('GET', next_url, headers=headers, timeout=60) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") data = json.loads(response.data.decode('utf-8')) # Extract events from response events = data.get('data', []) if events: all_events.extend(events) logger.info(f"Collected {len(events)} events from page {page_count + 1}") # Check for next page next_url = data.get('metadata', {}).get('links', {}).get('next') page_count += 1 else: logger.info("No events found on current page") break logger.info(f"Total events collected: {len(all_events)}") # Store events in S3 if any were collected if all_events: timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f"{s3_prefix}cisco_amp_events_{timestamp_str}.ndjson" # Convert events to NDJSON format (one JSON object per line) ndjson_content = 'n'.join(json.dumps(event) for event in all_events) # Upload to S3 s3_client.put_object( Bucket=s3_bucket, Key=s3_key, Body=ndjson_content.encode('utf-8'), ContentType='application/x-ndjson' ) logger.info(f"Uploaded {len(all_events)} events to s3://{s3_bucket}/{s3_key}") # Update state file with current timestamp current_timestamp = datetime.utcnow().isoformat() + 'Z' update_state(s3_bucket, state_key, current_timestamp) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Success', 'events_collected': len(all_events), 'pages_processed': page_count }) } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) } def get_last_timestamp(bucket, state_key): """ Get the last run timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: logger.info("No state file found, starting from 24 hours ago") return None except Exception as e: logger.warning(f"Error reading state file: {str(e)}") return None def update_state(bucket, state_key, timestamp): """ Update the state file with the current timestamp """ try: state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data).encode('utf-8'), ContentType='application/json' ) logger.info(f"Updated state file with timestamp: {timestamp}") except Exception as e: logger.error(f"Error updating state file: {str(e)}")
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值。
鍵 範例值 S3_BUCKET
cisco-amp-logs
S3_PREFIX
cisco-amp-events/
STATE_KEY
cisco-amp-events/state.json
AMP_CLIENT_ID
<your-client-id>
AMP_API_KEY
<your-api-key>
API_BASE
https://api.amp.cisco.com
(或您所在地區的網址)PAGE_SIZE
500
MAX_PAGES
10
建立函式後,請留在該函式的頁面 (或依序開啟「Lambda」>「Functions」>「cisco-amp-events-collector」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
cisco-amp-events-collector
。 - 名稱:
cisco-amp-events-collector-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-amp-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-amp-logs" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,以便擷取 Cisco AMP for Endpoints 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Cisco AMP for Endpoints logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Cisco AMP」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://cisco-amp-logs/cisco-amp-events/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
有效 | read_only_udm.principal.asset.active | 直接對應自 computer.active |
connector_guid | read_only_udm.principal.asset.uuid | 直接對應自 computer.connector_guid |
日期 | read_only_udm.metadata.event_timestamp.seconds | 轉換為時間戳記後,直接從 date 對應 |
偵測 | read_only_udm.security_result.threat_name | 直接對應自 detection |
detection_id | read_only_udm.security_result.detection_fields.value | 直接對應自 detection_id |
disposition | read_only_udm.security_result.description | 直接對應自 file.disposition |
error.error_code | read_only_udm.security_result.detection_fields.value | 直接對應自 error.error_code |
error.description | read_only_udm.security_result.detection_fields.value | 直接對應自 error.description |
event_type | read_only_udm.metadata.product_event_type | 直接對應自 event_type |
event_type_id | read_only_udm.metadata.product_log_id | 直接對應自 event_type_id |
external_ip | read_only_udm.principal.asset.external_ip | 直接對應自 computer.external_ip |
file.file_name | read_only_udm.target.file.names | 直接對應自 file.file_name |
file.file_path | read_only_udm.target.file.full_path | 直接對應自 file.file_path |
file.identity.md5 | read_only_udm.security_result.about.file.md5 | 直接對應自 file.identity.md5 |
file.identity.md5 | read_only_udm.target.file.md5 | 直接對應自 file.identity.md5 |
file.identity.sha1 | read_only_udm.security_result.about.file.sha1 | 直接對應自 file.identity.sha1 |
file.identity.sha1 | read_only_udm.target.file.sha1 | 直接對應自 file.identity.sha1 |
file.identity.sha256 | read_only_udm.security_result.about.file.sha256 | 直接對應自 file.identity.sha256 |
file.identity.sha256 | read_only_udm.target.file.sha256 | 直接對應自 file.identity.sha256 |
file.parent.disposition | read_only_udm.target.resource.attribute.labels.value | 直接對應自 file.parent.disposition |
file.parent.file_name | read_only_udm.target.resource.attribute.labels.value | 直接對應自 file.parent.file_name |
file.parent.identity.md5 | read_only_udm.target.resource.attribute.labels.value | 直接對應自 file.parent.identity.md5 |
file.parent.identity.sha1 | read_only_udm.target.resource.attribute.labels.value | 直接對應自 file.parent.identity.sha1 |
file.parent.identity.sha256 | read_only_udm.target.resource.attribute.labels.value | 直接對應自 file.parent.identity.sha256 |
file.parent.process_id | read_only_udm.security_result.about.process.parent_process.pid | 直接對應自 file.parent.process_id |
file.parent.process_id | read_only_udm.target.process.parent_process.pid | 直接對應自 file.parent.process_id |
主機名稱 | read_only_udm.principal.asset.hostname | 直接對應自 computer.hostname |
主機名稱 | read_only_udm.target.hostname | 直接對應自 computer.hostname |
主機名稱 | read_only_udm.target.asset.hostname | 直接對應自 computer.hostname |
ip | read_only_udm.principal.asset.ip | 直接對應自 computer.network_addresses.ip |
ip | read_only_udm.principal.ip | 直接對應自 computer.network_addresses.ip |
ip | read_only_udm.security_result.about.ip | 直接對應自 computer.network_addresses.ip |
mac | read_only_udm.principal.mac | 直接對應自 computer.network_addresses.mac |
mac | read_only_udm.security_result.about.mac | 直接對應自 computer.network_addresses.mac |
嚴重性 | read_only_udm.security_result.severity | 根據下列邏輯從 severity 對應:-「中」->「MEDIUM」 -「高」或「重大」->「HIGH」 -「低」->「LOW」 - 其他 ->「UNKNOWN_SEVERITY」 |
時間戳記 | read_only_udm.metadata.event_timestamp.seconds | 直接對應自 timestamp |
使用者 | read_only_udm.security_result.about.user.user_display_name | 直接對應自 computer.user |
使用者 | read_only_udm.target.user.user_display_name | 直接對應自 computer.user |
vulnerabilities.cve | read_only_udm.extensions.vulns.vulnerabilities.cve_id | 直接對應自 vulnerabilities.cve |
vulnerabilities.name | read_only_udm.extensions.vulns.vulnerabilities.name | 直接對應自 vulnerabilities.name |
vulnerabilities.score | read_only_udm.extensions.vulns.vulnerabilities.cvss_base_score | 從 vulnerabilities.score 轉換為浮點數後直接對應 |
vulnerabilities.url | read_only_udm.extensions.vulns.vulnerabilities.vendor_knowledge_base_article_id | 直接對應自 vulnerabilities.url |
vulnerabilities.version | read_only_udm.extensions.vulns.vulnerabilities.cvss_version | 直接對應自 vulnerabilities.version |
is_alert | 如果 event_type 是下列其中一項,請設為 true:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或如果 security_result.severity 是「HIGH」 |
|
is_significant | 如果 event_type 是下列其中一項,請設為 true:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或如果 security_result.severity 是「HIGH」 |
|
read_only_udm.metadata.event_type | 根據 event_type 和 security_result.severity 值決定。- 如果 event_type 是下列其中一項:「已執行的惡意軟體」、「偵測到威脅」、「可能感染 Dropper」、「雲端召回偵測」、「偵測到惡意活動」、「防止遭到攻擊」、「多個受感染的檔案」、「雲端 IOC」、「系統程序保護」、「偵測到有安全漏洞的應用程式」、「威脅已隔離」、「執行遭封鎖」、「雲端召回隔離成功」、「雲端召回還原隔離失敗」、「雲端召回隔離嘗試失敗」、「隔離失敗」,則事件類型會設為「SCAN_FILE」。- If security_result.severity is "HIGH", then the event type is set to "SCAN_FILE". - 如果 has_principal 和 has_target 皆為 true,則事件類型會設為「SCAN_UNCATEGORIZED」。- 否則,事件類型會設為「GENERIC_EVENT」。 |
|
read_only_udm.metadata.log_type | 設為「CISCO_AMP」 | |
read_only_udm.metadata.vendor_name | 設為「CISCO_AMP」 | |
read_only_udm.security_result.about.file.full_path | 直接對應自 file.file_path |
|
read_only_udm.security_result.about.hostname | 直接對應自 computer.hostname |
|
read_only_udm.security_result.about.user.user_display_name | 直接對應自 computer.user |
|
read_only_udm.security_result.detection_fields.key | 將 detection_id 設為「偵測 ID」、error.error_code 設為「錯誤代碼」、error.description 設為「錯誤說明」、file.parent.disposition 設為「父項處置」、file.parent.file_name 設為「父項檔案名稱」、file.parent.identity.md5 設為「父項 MD5」、file.parent.identity.sha1 設為「父項 SHA1」,以及 file.parent.identity.sha256 設為「父項 SHA256」 |
|
read_only_udm.security_result.summary | 如果 event_type 是下列其中一項:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或 security_result.severity 為「HIGH」,請將 event_type 設為 event_type |
|
read_only_udm.target.asset.ip | 直接對應自 computer.network_addresses.ip |
|
read_only_udm.target.resource.attribute.labels.key | 將 file.parent.disposition 設為「Parent Disposition」、將 file.parent.file_name 設為「Parent File Name」、將 file.parent.identity.md5 設為「Parent MD5」、將 file.parent.identity.sha1 設為「Parent SHA1」,以及將 file.parent.identity.sha256 設為「Parent SHA256」 |
|
timestamp.seconds | 轉換為時間戳記後,直接從 date 對應 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。