收集 Proofpoint Emerging Threats Pro IOC 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Proofpoint Emerging Threats Pro IOC 日志注入到 Google Security Operations。新兴威胁情报会以 CSV 格式发布 IP 和网域的每小时声誉列表,其中包含威胁情报数据,包括类别、得分和时间信息。解析器代码用于处理 CSV 格式的 ET_PRO 威胁情报数据。它会提取 IP 地址、网域、类别、得分和其他相关信息,并将这些信息映射到标准化的 IOC 格式和 Chronicle UDM 架构,以便在 Google SecOps 中进行进一步分析和使用。

准备工作

请确保满足以下前提条件:

  • 具有创建 Feed 权限的 Google SecOps 实例
  • Proofpoint ET 情报订阅(可访问信誉列表)
  • 来自 https://etadmin.proofpoint.com/api-access 的 ET Intelligence API 密钥
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集“新兴威胁专家”认证的前提条件

  1. 登录 ET Intelligence 管理员门户 (https://etadmin.proofpoint.com)
  2. 前往 API 访问权限
  3. 复制并保存 API 密钥
  4. 请与您的 Proofpoint 代表联系,以获取:
    • 详细的 IP 声誉列表网址
    • 详细的网域声誉列表网址

ET Intelligence 提供单独的 IP 和网域声誉列表 CSV 文件,每小时更新一次。使用“详细”格式,其中包含以下列: * 网域列表Domain Name, Category, Score, First Seen, Last Seen, Ports * IP 列表IP Address, Category, Score, First Seen, Last Seen, Ports

配置 AWS S3 存储桶和 IAM

创建 S3 存储桶

  1. 打开 Amazon S3 控制台
  2. 点击创建存储桶
  3. 存储分区名称:输入 et-pro-ioc-bucket(或您偏好的名称)
  4. 区域:选择您的首选区域
  5. 点击创建存储桶

为 Google SecOps 创建 IAM 用户

  1. 打开 IAM 控制台
  2. 依次点击“用户”图标 >“创建用户”
  3. 用户名:输入 secops-reader
  4. 点击下一步
  5. 选择直接附加政策
  6. 点击创建政策
  7. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. 将政策命名为 SecOpsReaderPolicy

  9. 点击创建政策

  10. 返回到用户创建页面,选择新创建的政策。

  11. 依次点击下一步 > 创建用户

  12. 前往安全凭据标签页。

  13. 点击创建访问密钥

  14. 选择第三方服务

  15. 点击创建访问密钥

  16. 下载并保存凭据。

为 Lambda 配置 IAM 角色

  1. 在 AWS 控制台中,依次前往 IAM > 角色 > 创建角色
  2. 依次选择 AWS 服务 > Lambda
  3. 点击下一步
  4. 点击创建政策
  5. 选择 JSON 标签页,然后输入以下内容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. 将政策命名为 EtProIocLambdaPolicy

  7. 点击创建政策

  8. 返回到角色创建界面,附加政策。

  9. 将角色命名为 EtProIocLambdaRole

  10. 点击 Create role

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    • 函数名称et-pro-ioc-fetcher
    • 运行时:Python 3.13
    • 架构:x86_64
    • 执行角色:使用现有角色 EtProIocLambdaRole
  4. 创建后,前往 Code(代码)标签页,然后替换为:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. 依次前往配置 > 常规配置

  6. 点击修改

  7. 超时设置为 5 分钟

  8. 点击保存

配置环境变量

  1. 依次前往配置 > 环境变量
  2. 依次点击修改 > 添加环境变量
  3. 添加以下变量:

    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. 点击保存

如需了解订阅的准确网址,请与您的 Proofpoint 代表联系。详细格式网址通常遵循以下模式: * IP 列表:https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * 网域列表:https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 计划 > 创建计划
  2. 时间安排名称et-pro-ioc-hourly
  3. 安排模式:基于费率的安排
  4. 速率表达式:1 小时
  5. 点击下一步
  6. 目标:Lambda 函数
  7. 函数et-pro-ioc-fetcher
  8. 点击下一步,完成剩余步骤
  9. 点击创建时间表

在 Google SecOps 中配置 Feed

您需要创建两个单独的 Feed,一个用于 IP 地址声誉,另一个用于网域声誉。

创建 IP 信誉 Feed

  1. 依次前往 SIEM 设置> Feed
  2. 点击新增
  3. Feed 名称字段中,输入 ET Pro IOC - IP Reputation
  4. 来源类型列表中,选择 Amazon S3
  5. 选择 Emerging Threats Pro 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • 来源删除选项:根据您的偏好选择
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:SecOps 读取者访问密钥
    • 私有访问密钥:SecOps 读取者密钥
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 检查并点击提交

创建网域声誉 Feed

  1. 重复 Feed 创建流程。
  2. Feed 名称字段中,输入 ET Pro IOC - Domain Reputation
  3. 来源类型列表中,选择 Amazon S3
  4. 选择 Emerging Threats Pro 作为日志类型
  5. 点击下一步
  6. 为以下输入参数指定值:
    • S3 URIs3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • 来源删除选项:根据您的偏好选择
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:SecOps 读取者访问密钥
    • 私有访问密钥:SecOps 读取者密钥
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  7. 点击下一步
  8. 检查并点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
类别 此字段用于解析器逻辑中,但未直接映射到 UDM。它通过查找表确定 event.ioc.categorization 的值。
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos 直接从原始日志映射。
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds 直接从原始日志映射。
数据 系统会根据此字段的内容将其解析为多个 UDM 字段。
first_seen event.idm.entity.metadata.interval.start_time 解析为日期并映射到 UDM。
first_seen event.ioc.active_timerange.start 解析为日期并映射到 UDM。
ip_or_domain event.idm.entity.entity.hostname 如果 Grok 模式从字段中提取了主机,则映射到 UDM。
ip_or_domain event.idm.entity.entity.ip 如果 Grok 模式未从字段中提取主机,则映射到 UDM。
ip_or_domain event.ioc.domain_and_ports.domain 如果 Grok 模式从字段中提取了主机,则映射到 UDM。
ip_or_domain event.ioc.ip_and_ports.ip_address 如果 Grok 模式未从字段中提取主机,则映射到 UDM。
last_seen event.idm.entity.metadata.interval.end_time 解析为日期并映射到 UDM。
last_seen event.ioc.active_timerange.end 解析为日期并映射到 UDM。
端口 event.idm.entity.entity.labels.value 如果存在多个端口,则进行解析、使用英文逗号分隔符进行联接,并映射到 UDM。
端口 event.idm.entity.entity.port 如果只有一个端口,则解析并映射到 UDM。
端口 event.ioc.domain_and_ports.ports 如果 Grok 模式从字段中提取了主机,则会解析并映射到 UDM。
端口 event.ioc.ip_and_ports.ports 如果 Grok 模式未从字段中提取主机,则解析并映射到 UDM。
得分 event.ioc.confidence_score 直接从原始日志映射。
event.idm.entity.entity.labels.key 如果有多个端口,请设置为“ports”。
event.idm.entity.metadata.entity_type 如果 Grok 模式从 ip_or_domain 字段中提取主机,则设置为“DOMAIN_NAME”,否则设置为“IP_ADDRESS”。
event.idm.entity.metadata.threat.category 设置为“SOFTWARE_MALICIOUS”。
event.idm.entity.metadata.threat.category_details 使用查找表从 category 字段派生。
event.idm.entity.metadata.threat.threat_name 设置为“ET 情报代表名单”。
event.idm.entity.metadata.vendor_name 设置为“ET_PRO_IOC”。
event.ioc.feed_name 设置为“ET 情报代表名单”。
event.ioc.raw_severity 设置为“恶意”。
timestamp.nanos collection_time.nanos 复制的。
timestamp.seconds collection_time.seconds 复制的。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。