收集 Cisco AMP for Endpoints 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Cisco AMP for Endpoints 日志注入到 Google Security Operations。解析器将原始 JSON 格式的日志转换为符合 Chronicle UDM 的结构化格式。它从嵌套的 JSON 对象中提取字段,将其映射到 UDM 架构,识别事件类别,分配严重程度,并最终生成统一的事件输出,在满足特定条件时标记安全提醒。

准备工作

  • Google SecOps 实例
  • Cisco AMP for Endpoints 控制台的特权访问权限
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集 Cisco AMP for Endpoints 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 登录 Cisco AMP for Endpoints 控制台
  2. 依次前往账号 > API 凭据
  3. 点击 New API Credential(新的 API 凭据)以创建新的 API 密钥和客户端 ID。
  4. 提供以下配置详细信息:
    • 应用名称:输入一个名称(例如 Chronicle SecOps Integration)。
    • 范围:选择只读以进行基本事件轮询,如果您计划创建 Event Streams,请选择读取和写入
  5. 点击创建
  6. 复制以下详细信息并将其保存在安全的位置:
    • 第三方 API 客户端 ID
    • API 密钥
    • API 基准网址:取决于您所在的区域:
      • 美国:https://api.amp.cisco.com
      • 欧盟:https://api.eu.amp.cisco.com
      • APJC:https://api.apjc.amp.cisco.com

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 cisco-amp-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-amp-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-amp-logs/cisco-amp-events/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 cisco-amp-logs
  4. 依次点击下一步 > 创建政策

  5. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  6. 附加新创建的政策。

  7. 将角色命名为 cisco-amp-lambda-role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 cisco-amp-events-collector
    运行时 Python 3.13
    架构 x86_64
    执行角色 cisco-amp-lambda-role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (cisco-amp-events-collector.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta
    import os
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client and HTTP pool manager
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        AWS Lambda handler to fetch Cisco AMP events and store them in S3
        """
    
        try:
            # Get environment variables
            s3_bucket = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX']
            state_key = os.environ['STATE_KEY']
            api_client_id = os.environ['AMP_CLIENT_ID']
            api_key = os.environ['AMP_API_KEY']
            api_base = os.environ['API_BASE']
    
            # Optional parameters
            page_size = int(os.environ.get('PAGE_SIZE', '500'))
            max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
            logger.info(f"Starting Cisco AMP events collection for bucket: {s3_bucket}")
    
            # Get last run timestamp from state file
            last_timestamp = get_last_timestamp(s3_bucket, state_key)
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(days=1)).isoformat() + 'Z'
    
            # Create Basic Auth header
            auth_header = base64.b64encode(f"{api_client_id}:{api_key}".encode()).decode()
            headers = {
                'Authorization': f'Basic {auth_header}',
                'Accept': 'application/json'
            }
    
            # Build initial API URL
            base_url = f"{api_base}/v1/events"
            next_url = f"{base_url}?limit={page_size}&start_date={last_timestamp}"
    
            all_events = []
            page_count = 0
    
            while next_url and page_count < max_pages:
                logger.info(f"Fetching page {page_count + 1} from: {next_url}")
    
                # Make API request using urllib3
                response = http.request('GET', next_url, headers=headers, timeout=60)
    
                if response.status != 200:
                    raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract events from response
                events = data.get('data', [])
                if events:
                    all_events.extend(events)
                    logger.info(f"Collected {len(events)} events from page {page_count + 1}")
    
                    # Check for next page
                    next_url = data.get('metadata', {}).get('links', {}).get('next')
                    page_count += 1
                else:
                    logger.info("No events found on current page")
                    break
    
            logger.info(f"Total events collected: {len(all_events)}")
    
            # Store events in S3 if any were collected
            if all_events:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}cisco_amp_events_{timestamp_str}.ndjson"
    
                # Convert events to NDJSON format (one JSON object per line)
                ndjson_content = 'n'.join(json.dumps(event) for event in all_events)
    
                # Upload to S3
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body=ndjson_content.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_events)} events to s3://{s3_bucket}/{s3_key}")
    
            # Update state file with current timestamp
            current_timestamp = datetime.utcnow().isoformat() + 'Z'
            update_state(s3_bucket, state_key, current_timestamp)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'events_collected': len(all_events),
                    'pages_processed': page_count
                })
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
    def get_last_timestamp(bucket, state_key):
        """
        Get the last run timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            logger.info("No state file found, starting from 24 hours ago")
            return None
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
            return None
    
    def update_state(bucket, state_key, timestamp):
        """
        Update the state file with the current timestamp
        """
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
    
            s3_client.put_object(
                Bucket=bucket,
                Key=state_key,
                Body=json.dumps(state_data).encode('utf-8'),
                ContentType='application/json'
            )
    
            logger.info(f"Updated state file with timestamp: {timestamp}")
    
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入以下提供的环境变量,并将其替换为您的值。

    示例值
    S3_BUCKET cisco-amp-logs
    S3_PREFIX cisco-amp-events/
    STATE_KEY cisco-amp-events/state.json
    AMP_CLIENT_ID <your-client-id>
    AMP_API_KEY <your-api-key>
    API_BASE https://api.amp.cisco.com(或您所在区域的网址)
    PAGE_SIZE 500
    MAX_PAGES 10
  8. 创建函数后,停留在其页面上(或依次打开 Lambda > 函数 > cisco-amp-events-collector)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 cisco-amp-events-collector
    • 名称cisco-amp-events-collector-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-amp-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-amp-logs"
        }
      ]
    }
    
  7. 将名称设置为 secops-reader-policy

  8. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  10. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Cisco AMP for Endpoints 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Cisco AMP for Endpoints logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Cisco AMP 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://cisco-amp-logs/cisco-amp-events/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
有效 read_only_udm.principal.asset.active 直接从 computer.active 映射
connector_guid read_only_udm.principal.asset.uuid 直接从 computer.connector_guid 映射
日期 read_only_udm.metadata.event_timestamp.seconds date 转换成时间戳后直接映射
检测 read_only_udm.security_result.threat_name 直接从 detection 映射
detection_id read_only_udm.security_result.detection_fields.value 直接从 detection_id 映射
disposition read_only_udm.security_result.description 直接从 file.disposition 映射
error.error_code read_only_udm.security_result.detection_fields.value 直接从 error.error_code 映射
error.description read_only_udm.security_result.detection_fields.value 直接从 error.description 映射
event_type read_only_udm.metadata.product_event_type 直接从 event_type 映射
event_type_id read_only_udm.metadata.product_log_id 直接从 event_type_id 映射
external_ip read_only_udm.principal.asset.external_ip 直接从 computer.external_ip 映射
file.file_name read_only_udm.target.file.names 直接从 file.file_name 映射
file.file_path read_only_udm.target.file.full_path 直接从 file.file_path 映射
file.identity.md5 read_only_udm.security_result.about.file.md5 直接从 file.identity.md5 映射
file.identity.md5 read_only_udm.target.file.md5 直接从 file.identity.md5 映射
file.identity.sha1 read_only_udm.security_result.about.file.sha1 直接从 file.identity.sha1 映射
file.identity.sha1 read_only_udm.target.file.sha1 直接从 file.identity.sha1 映射
file.identity.sha256 read_only_udm.security_result.about.file.sha256 直接从 file.identity.sha256 映射
file.identity.sha256 read_only_udm.target.file.sha256 直接从 file.identity.sha256 映射
file.parent.disposition read_only_udm.target.resource.attribute.labels.value 直接从 file.parent.disposition 映射
file.parent.file_name read_only_udm.target.resource.attribute.labels.value 直接从 file.parent.file_name 映射
file.parent.identity.md5 read_only_udm.target.resource.attribute.labels.value 直接从 file.parent.identity.md5 映射
file.parent.identity.sha1 read_only_udm.target.resource.attribute.labels.value 直接从 file.parent.identity.sha1 映射
file.parent.identity.sha256 read_only_udm.target.resource.attribute.labels.value 直接从 file.parent.identity.sha256 映射
file.parent.process_id read_only_udm.security_result.about.process.parent_process.pid 直接从 file.parent.process_id 映射
file.parent.process_id read_only_udm.target.process.parent_process.pid 直接从 file.parent.process_id 映射
主机名 read_only_udm.principal.asset.hostname 直接从 computer.hostname 映射
主机名 read_only_udm.target.hostname 直接从 computer.hostname 映射
主机名 read_only_udm.target.asset.hostname 直接从 computer.hostname 映射
ip read_only_udm.principal.asset.ip 直接从 computer.network_addresses.ip 映射
ip read_only_udm.principal.ip 直接从 computer.network_addresses.ip 映射
ip read_only_udm.security_result.about.ip 直接从 computer.network_addresses.ip 映射
mac read_only_udm.principal.mac 直接从 computer.network_addresses.mac 映射
mac read_only_udm.security_result.about.mac 直接从 computer.network_addresses.mac 映射
和程度上减少 read_only_udm.security_result.severity 根据以下逻辑从 severity 映射:
-“中”->“中”
-“高”或“严重”->“高”
-“低”->“低”
- 其他 - >“UNKNOWN_SEVERITY”
时间戳 read_only_udm.metadata.event_timestamp.seconds 直接从 timestamp 映射
用户 read_only_udm.security_result.about.user.user_display_name 直接从 computer.user 映射
用户 read_only_udm.target.user.user_display_name 直接从 computer.user 映射
vulnerabilities.cve read_only_udm.extensions.vulns.vulnerabilities.cve_id 直接从 vulnerabilities.cve 映射
vulnerabilities.name read_only_udm.extensions.vulns.vulnerabilities.name 直接从 vulnerabilities.name 映射
vulnerabilities.score read_only_udm.extensions.vulns.vulnerabilities.cvss_base_score vulnerabilities.score 直接映射,转换为浮点数
vulnerabilities.url read_only_udm.extensions.vulns.vulnerabilities.vendor_knowledge_base_article_id 直接从 vulnerabilities.url 映射
vulnerabilities.version read_only_udm.extensions.vulns.vulnerabilities.cvss_version 直接从 vulnerabilities.version 映射
is_alert 如果 event_type 是以下值之一,则设置为 true:“检测到威胁”“漏洞利用防范”“已执行的恶意软件”“潜在的投放程序感染”“多个受感染的文件”“检测到存在漏洞的应用”;如果 security_result.severity 为“高”,则设置为 true
is_significant 如果 event_type 是以下值之一,则设置为 true:“检测到威胁”“漏洞利用防范”“已执行的恶意软件”“潜在的投放程序感染”“多个受感染的文件”“检测到存在漏洞的应用”;如果 security_result.severity 为“高”,则设置为 true
read_only_udm.metadata.event_type 根据 event_typesecurity_result.severity 值确定。
- 如果 event_type 是以下值之一:“Executed malware”“Threat Detected”“Potential Dropper Infection”“Cloud Recall Detection”“Malicious Activity Detection”“Exploit Prevention”“Multiple Infected Files”“Cloud IOC”“System Process Protection”“Vulnerable Application Detected”“Threat Quarantined”“Execution Blocked”“Cloud Recall Quarantine Successful”“Cloud Recall Restore from Quarantine Failed”“Cloud Recall Quarantine Attempt Failed”“Quarantine Failure”,则将事件类型设置为“SCAN_FILE”。
- 如果 security_result.severity 为“HIGH”,则将事件类型设置为“SCAN_FILE”。
- 如果 has_principalhas_target 均为 true,则将事件类型设置为“SCAN_UNCATEGORIZED”。
- 否则,事件类型将设置为“GENERIC_EVENT”。
read_only_udm.metadata.log_type 设置为“CISCO_AMP”
read_only_udm.metadata.vendor_name 设置为“CISCO_AMP”
read_only_udm.security_result.about.file.full_path 直接从 file.file_path 映射
read_only_udm.security_result.about.hostname 直接从 computer.hostname 映射
read_only_udm.security_result.about.user.user_display_name 直接从 computer.user 映射
read_only_udm.security_result.detection_fields.key 对于 detection_id,设置为“检测 ID”;对于 error.error_code,设置为“错误代码”;对于 error.description,设置为“错误说明”;对于 file.parent.disposition,设置为“父处置”;对于 file.parent.file_name,设置为“父文件名”;对于 file.parent.identity.md5,设置为“父 MD5”;对于 file.parent.identity.sha1,设置为“父 SHA1”;对于 file.parent.identity.sha256,设置为“父 SHA256”
read_only_udm.security_result.summary 如果 event_type 是以下值之一,则设置为 event_type:“检测到威胁”“漏洞利用防范”“已执行的恶意软件”“潜在的投放程序感染”“多个受感染的文件”“检测到存在漏洞的应用”;如果 security_result.severity 为“高”,则设置为 event_type
read_only_udm.target.asset.ip 直接从 computer.network_addresses.ip 映射
read_only_udm.target.resource.attribute.labels.key 对于 file.parent.disposition,设置为“父处置”;对于 file.parent.file_name,设置为“父文件名”;对于 file.parent.identity.md5,设置为“父 MD5”;对于 file.parent.identity.sha1,设置为“父 SHA1”;对于 file.parent.identity.sha256,设置为“父 SHA256”
timestamp.seconds date 转换成时间戳后直接映射

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。