收集 Cisco AMP for Endpoints 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Cisco AMP for Endpoints 記錄擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為符合 Chronicle UDM 的結構化格式。這項服務會從巢狀 JSON 物件中擷取欄位,將這些欄位對應至 UDM 結構定義,識別事件類別,指派嚴重性等級,並最終產生統一的事件輸出內容,在符合特定條件時標示安全警示。

事前準備

  • Google SecOps 執行個體
  • Cisco AMP for Endpoints 控制台的特殊權限
  • AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)

收集 Cisco AMP for Endpoints 的必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Cisco AMP for Endpoints 主控台
  2. 依序前往「帳戶」>「API 憑證」
  3. 按一下「New API Credential」(新的 API 憑證),建立新的 API 金鑰和用戶端 ID。
  4. 提供下列設定詳細資料:
    • 應用程式名稱:輸入名稱 (例如 Chronicle SecOps Integration)。
    • 範圍:如要輪詢基本事件,請選取「唯讀」;如要建立事件串流,請選取「讀取及寫入」
  5. 點選「建立」
  6. 複製下列詳細資料並存放在安全位置:
    • 第三方 API 用戶端 ID
    • API 金鑰
    • API 基準網址:視您所在的區域而定:
      • 美國:https://api.amp.cisco.com
      • 歐盟:https://api.eu.amp.cisco.com
      • 亞太和日本地區:https://api.apjc.amp.cisco.com

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 cisco-amp-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」(政策)
  2. 按一下「建立政策」>「JSON」分頁
  3. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-amp-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-amp-logs/cisco-amp-events/state.json"
        }
      ]
    }
    
    • 如果您輸入其他 bucket 名稱,請替換 cisco-amp-logs
  4. 依序點選「下一步」>「建立政策」

  5. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  6. 附加新建立的政策。

  7. 為角色命名 cisco-amp-lambda-role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 cisco-amp-events-collector
    執行階段 Python 3.13
    架構 x86_64
    執行角色 cisco-amp-lambda-role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (cisco-amp-events-collector.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta
    import os
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client and HTTP pool manager
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        AWS Lambda handler to fetch Cisco AMP events and store them in S3
        """
    
        try:
            # Get environment variables
            s3_bucket = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX']
            state_key = os.environ['STATE_KEY']
            api_client_id = os.environ['AMP_CLIENT_ID']
            api_key = os.environ['AMP_API_KEY']
            api_base = os.environ['API_BASE']
    
            # Optional parameters
            page_size = int(os.environ.get('PAGE_SIZE', '500'))
            max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
            logger.info(f"Starting Cisco AMP events collection for bucket: {s3_bucket}")
    
            # Get last run timestamp from state file
            last_timestamp = get_last_timestamp(s3_bucket, state_key)
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(days=1)).isoformat() + 'Z'
    
            # Create Basic Auth header
            auth_header = base64.b64encode(f"{api_client_id}:{api_key}".encode()).decode()
            headers = {
                'Authorization': f'Basic {auth_header}',
                'Accept': 'application/json'
            }
    
            # Build initial API URL
            base_url = f"{api_base}/v1/events"
            next_url = f"{base_url}?limit={page_size}&start_date={last_timestamp}"
    
            all_events = []
            page_count = 0
    
            while next_url and page_count < max_pages:
                logger.info(f"Fetching page {page_count + 1} from: {next_url}")
    
                # Make API request using urllib3
                response = http.request('GET', next_url, headers=headers, timeout=60)
    
                if response.status != 200:
                    raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract events from response
                events = data.get('data', [])
                if events:
                    all_events.extend(events)
                    logger.info(f"Collected {len(events)} events from page {page_count + 1}")
    
                    # Check for next page
                    next_url = data.get('metadata', {}).get('links', {}).get('next')
                    page_count += 1
                else:
                    logger.info("No events found on current page")
                    break
    
            logger.info(f"Total events collected: {len(all_events)}")
    
            # Store events in S3 if any were collected
            if all_events:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}cisco_amp_events_{timestamp_str}.ndjson"
    
                # Convert events to NDJSON format (one JSON object per line)
                ndjson_content = 'n'.join(json.dumps(event) for event in all_events)
    
                # Upload to S3
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body=ndjson_content.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_events)} events to s3://{s3_bucket}/{s3_key}")
    
            # Update state file with current timestamp
            current_timestamp = datetime.utcnow().isoformat() + 'Z'
            update_state(s3_bucket, state_key, current_timestamp)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'events_collected': len(all_events),
                    'pages_processed': page_count
                })
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
    def get_last_timestamp(bucket, state_key):
        """
        Get the last run timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            logger.info("No state file found, starting from 24 hours ago")
            return None
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
            return None
    
    def update_state(bucket, state_key, timestamp):
        """
        Update the state file with the current timestamp
        """
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
    
            s3_client.put_object(
                Bucket=bucket,
                Key=state_key,
                Body=json.dumps(state_data).encode('utf-8'),
                ContentType='application/json'
            )
    
            logger.info(f"Updated state file with timestamp: {timestamp}")
    
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下列環境變數,並將 換成您的值。

    範例值
    S3_BUCKET cisco-amp-logs
    S3_PREFIX cisco-amp-events/
    STATE_KEY cisco-amp-events/state.json
    AMP_CLIENT_ID <your-client-id>
    AMP_API_KEY <your-api-key>
    API_BASE https://api.amp.cisco.com (或您所在地區的網址)
    PAGE_SIZE 500
    MAX_PAGES 10
  8. 建立函式後,請留在該函式的頁面 (或依序開啟「Lambda」>「Functions」>「cisco-amp-events-collector」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 cisco-amp-events-collector
    • 名稱cisco-amp-events-collector-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-amp-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-amp-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,以便擷取 Cisco AMP for Endpoints 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Cisco AMP for Endpoints logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Cisco AMP」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://cisco-amp-logs/cisco-amp-events/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
有效 read_only_udm.principal.asset.active 直接對應自 computer.active
connector_guid read_only_udm.principal.asset.uuid 直接對應自 computer.connector_guid
日期 read_only_udm.metadata.event_timestamp.seconds 轉換為時間戳記後,直接從 date 對應
偵測 read_only_udm.security_result.threat_name 直接對應自 detection
detection_id read_only_udm.security_result.detection_fields.value 直接對應自 detection_id
disposition read_only_udm.security_result.description 直接對應自 file.disposition
error.error_code read_only_udm.security_result.detection_fields.value 直接對應自 error.error_code
error.description read_only_udm.security_result.detection_fields.value 直接對應自 error.description
event_type read_only_udm.metadata.product_event_type 直接對應自 event_type
event_type_id read_only_udm.metadata.product_log_id 直接對應自 event_type_id
external_ip read_only_udm.principal.asset.external_ip 直接對應自 computer.external_ip
file.file_name read_only_udm.target.file.names 直接對應自 file.file_name
file.file_path read_only_udm.target.file.full_path 直接對應自 file.file_path
file.identity.md5 read_only_udm.security_result.about.file.md5 直接對應自 file.identity.md5
file.identity.md5 read_only_udm.target.file.md5 直接對應自 file.identity.md5
file.identity.sha1 read_only_udm.security_result.about.file.sha1 直接對應自 file.identity.sha1
file.identity.sha1 read_only_udm.target.file.sha1 直接對應自 file.identity.sha1
file.identity.sha256 read_only_udm.security_result.about.file.sha256 直接對應自 file.identity.sha256
file.identity.sha256 read_only_udm.target.file.sha256 直接對應自 file.identity.sha256
file.parent.disposition read_only_udm.target.resource.attribute.labels.value 直接對應自 file.parent.disposition
file.parent.file_name read_only_udm.target.resource.attribute.labels.value 直接對應自 file.parent.file_name
file.parent.identity.md5 read_only_udm.target.resource.attribute.labels.value 直接對應自 file.parent.identity.md5
file.parent.identity.sha1 read_only_udm.target.resource.attribute.labels.value 直接對應自 file.parent.identity.sha1
file.parent.identity.sha256 read_only_udm.target.resource.attribute.labels.value 直接對應自 file.parent.identity.sha256
file.parent.process_id read_only_udm.security_result.about.process.parent_process.pid 直接對應自 file.parent.process_id
file.parent.process_id read_only_udm.target.process.parent_process.pid 直接對應自 file.parent.process_id
主機名稱 read_only_udm.principal.asset.hostname 直接對應自 computer.hostname
主機名稱 read_only_udm.target.hostname 直接對應自 computer.hostname
主機名稱 read_only_udm.target.asset.hostname 直接對應自 computer.hostname
ip read_only_udm.principal.asset.ip 直接對應自 computer.network_addresses.ip
ip read_only_udm.principal.ip 直接對應自 computer.network_addresses.ip
ip read_only_udm.security_result.about.ip 直接對應自 computer.network_addresses.ip
mac read_only_udm.principal.mac 直接對應自 computer.network_addresses.mac
mac read_only_udm.security_result.about.mac 直接對應自 computer.network_addresses.mac
嚴重性 read_only_udm.security_result.severity 根據下列邏輯從 severity 對應:
-「中」->「MEDIUM」
-「高」或「重大」->「HIGH」
-「低」->「LOW」
- 其他 ->「UNKNOWN_SEVERITY」
時間戳記 read_only_udm.metadata.event_timestamp.seconds 直接對應自 timestamp
使用者 read_only_udm.security_result.about.user.user_display_name 直接對應自 computer.user
使用者 read_only_udm.target.user.user_display_name 直接對應自 computer.user
vulnerabilities.cve read_only_udm.extensions.vulns.vulnerabilities.cve_id 直接對應自 vulnerabilities.cve
vulnerabilities.name read_only_udm.extensions.vulns.vulnerabilities.name 直接對應自 vulnerabilities.name
vulnerabilities.score read_only_udm.extensions.vulns.vulnerabilities.cvss_base_score vulnerabilities.score 轉換為浮點數後直接對應
vulnerabilities.url read_only_udm.extensions.vulns.vulnerabilities.vendor_knowledge_base_article_id 直接對應自 vulnerabilities.url
vulnerabilities.version read_only_udm.extensions.vulns.vulnerabilities.cvss_version 直接對應自 vulnerabilities.version
is_alert 如果 event_type 是下列其中一項,請設為 true:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或如果 security_result.severity 是「HIGH」
is_significant 如果 event_type 是下列其中一項,請設為 true:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或如果 security_result.severity 是「HIGH」
read_only_udm.metadata.event_type 根據 event_typesecurity_result.severity 值決定。
- 如果 event_type 是下列其中一項:「已執行的惡意軟體」、「偵測到威脅」、「可能感染 Dropper」、「雲端召回偵測」、「偵測到惡意活動」、「防止遭到攻擊」、「多個受感染的檔案」、「雲端 IOC」、「系統程序保護」、「偵測到有安全漏洞的應用程式」、「威脅已隔離」、「執行遭封鎖」、「雲端召回隔離成功」、「雲端召回還原隔離失敗」、「雲端召回隔離嘗試失敗」、「隔離失敗」,則事件類型會設為「SCAN_FILE」。
- If security_result.severity is "HIGH", then the event type is set to "SCAN_FILE".
- 如果 has_principalhas_target 皆為 true,則事件類型會設為「SCAN_UNCATEGORIZED」。
- 否則,事件類型會設為「GENERIC_EVENT」。
read_only_udm.metadata.log_type 設為「CISCO_AMP」
read_only_udm.metadata.vendor_name 設為「CISCO_AMP」
read_only_udm.security_result.about.file.full_path 直接對應自 file.file_path
read_only_udm.security_result.about.hostname 直接對應自 computer.hostname
read_only_udm.security_result.about.user.user_display_name 直接對應自 computer.user
read_only_udm.security_result.detection_fields.key detection_id 設為「偵測 ID」、error.error_code 設為「錯誤代碼」、error.description 設為「錯誤說明」、file.parent.disposition 設為「父項處置」、file.parent.file_name 設為「父項檔案名稱」、file.parent.identity.md5 設為「父項 MD5」、file.parent.identity.sha1 設為「父項 SHA1」,以及 file.parent.identity.sha256 設為「父項 SHA256」
read_only_udm.security_result.summary 如果 event_type 是下列其中一項:「偵測到威脅」、「防止遭到攻擊」、「執行惡意軟體」、「可能感染 Dropper」、「多個受感染的檔案」、「偵測到有安全漏洞的應用程式」,或 security_result.severity 為「HIGH」,請將 event_type 設為 event_type
read_only_udm.target.asset.ip 直接對應自 computer.network_addresses.ip
read_only_udm.target.resource.attribute.labels.key file.parent.disposition 設為「Parent Disposition」、將 file.parent.file_name 設為「Parent File Name」、將 file.parent.identity.md5 設為「Parent MD5」、將 file.parent.identity.sha1 設為「Parent SHA1」,以及將 file.parent.identity.sha256 設為「Parent SHA256」
timestamp.seconds 轉換為時間戳記後,直接從 date 對應

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。