收集 Proofpoint Emerging Threats Pro 入侵指標記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Proofpoint Emerging Threats Pro IOC 記錄擷取至 Google Security Operations。Emerging Threats Intelligence 會以 CSV 格式發布 IP 和網域的每小時信譽清單,其中包含類別、分數和時間資訊等威脅情報資料。剖析器程式碼會處理 CSV 格式的 ET_PRO 威脅情報資料。這項功能會擷取 IP 位址、網域、類別、分數和其他相關資訊,並將這些資訊對應至標準化 IOC 格式和 Chronicle UDM 架構,以便在 Google SecOps 中進一步分析及使用。

事前準備

請確認您已完成下列事前準備事項:

  • 具備建立動態消息權限的 Google SecOps 執行個體
  • 訂閱 Proofpoint ET Intelligence,並具備信譽清單的存取權
  • 從 https://etadmin.proofpoint.com/api-access 取得 ET Intelligence API 金鑰
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

收集新興威脅專業人員先決條件

  1. 前往 https://etadmin.proofpoint.com 登入 ET Intelligence Admin Portal
  2. 前往「API 存取權」
  3. 複製並儲存 API 金鑰
  4. 請與 Proofpoint 代表聯絡,取得下列資訊:
    • 詳細 IP 信譽清單網址
    • 詳細網域信譽清單網址

ET Intelligence 會提供 IP 和網域信譽清單的個別 CSV 檔案,並每小時更新。使用「詳細」格式,其中包含下列資料欄: * 網域清單Domain Name, Category, Score, First Seen, Last Seen, Ports * IP 清單IP Address, Category, Score, First Seen, Last Seen, Ports

設定 AWS S3 值區和 IAM

建立 S3 bucket

  1. 開啟 Amazon S3 控制台
  2. 點選 [建立 Bucket]
  3. Bucket name:輸入 et-pro-ioc-bucket (或您偏好的名稱)
  4. 「Region」(區域):選取偏好的區域
  5. 點選 [建立 Bucket]

為 Google SecOps 建立 IAM 使用者

  1. 開啟 IAM 主控台
  2. 依序點選「使用者」>「建立使用者」
  3. 使用者名稱:輸入 secops-reader
  4. 點選 [下一步]。
  5. 選取「直接附加政策」
  6. 點選「建立政策」
  7. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. 將政策命名為 SecOpsReaderPolicy

  9. 按一下「建立政策」

  10. 返回使用者建立程序,選取新建立的政策。

  11. 依序點選「下一步」>「建立使用者」

  12. 前往「安全憑證」分頁。

  13. 按一下「建立存取金鑰」

  14. 選取「第三方服務」

  15. 按一下「建立存取金鑰」

  16. 下載並儲存憑證。

設定 Lambda 的 IAM 角色

  1. 在 AWS 控制台中,依序前往「IAM」>「Roles」>「Create role」
  2. 依序選取「AWS 服務」>「Lambda」
  3. 點選「下一步」
  4. 按一下「建立政策」
  5. 選取「JSON」分頁標籤,然後輸入下列內容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. 將政策命名為 EtProIocLambdaPolicy

  7. 按一下「建立政策」

  8. 返回角色建立程序,附加政策。

  9. 將角色命名為 EtProIocLambdaRole

  10. 按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    • 函式名稱et-pro-ioc-fetcher
    • 執行階段:Python 3.13
    • 架構:x86_64
    • 執行角色:使用現有角色 EtProIocLambdaRole
  4. 建立後,請前往「程式碼」分頁,並替換為:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. 依序前往「設定」>「一般設定」

  6. 按一下 [編輯]

  7. 將「Timeout」(逾時) 設定為「5 minutes」(5 分鐘)

  8. 按一下 [儲存]

設定環境變數

  1. 依序前往「設定」>「環境變數」
  2. 依序點選「編輯」>「新增環境變數」
  3. 新增下列變數:

    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. 按一下「儲存」

如要取得訂閱方案的確切網址,請洽詢 Proofpoint 代表。詳細格式網址通常會遵循下列模式: * IP 清單:https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * 網域清單:https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Schedules」>「Create schedule」
  2. 時間表名稱et-pro-ioc-hourly
  3. 排程模式:以費率為準的排程
  4. 費率表達方式:1 小時
  5. 點選 [下一步]。
  6. 目標:Lambda 函式
  7. 功能et-pro-ioc-fetcher
  8. 按「下一步」完成其餘步驟
  9. 按一下「建立時間表」

在 Google SecOps 中設定動態饋給

您需要建立兩個不同的動態饋給,分別用於 IP 信譽和網域信譽。

建立 IP 信譽動態饋給

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增」
  3. 在「動態饋給名稱」欄位中,輸入 ET Pro IOC - IP Reputation
  4. 在「來源類型」清單中,選取「Amazon S3」
  5. 選取「新興威脅 Pro」做為「記錄類型」
  6. 點選 [下一步]。
  7. 指定下列輸入參數的值:
    • S3 URIs3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • 來源刪除選項:根據偏好設定選取
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:SecOps 讀取者存取金鑰
    • 存取密鑰:SecOps 讀取者存取密鑰
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選 [下一步]。
  9. 檢查並按一下「提交」

建立網域信譽動態饋給

  1. 重複動態饋給建立程序。
  2. 在「動態饋給名稱」欄位中輸入 ET Pro IOC - Domain Reputation
  3. 在「Source type」(來源類型) 清單中,選取「Amazon S3」
  4. 選取「新興威脅專業版」做為「記錄類型」。
  5. 點選「下一步」
  6. 指定下列輸入參數的值:
    • S3 URIs3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • 來源刪除選項:根據偏好設定選取
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:SecOps 讀取者存取金鑰
    • 存取密鑰:SecOps 讀取者存取密鑰
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  7. 點選 [下一步]。
  8. 檢查並按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
category 這個欄位用於剖析器邏輯,但不會直接對應至 UDM。並透過對照表決定 event.ioc.categorization 的值。
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos 直接從原始記錄對應。
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds 直接從原始記錄對應。
資料 系統會根據這個欄位的內容,剖析成多個 UDM 欄位。
first_seen event.idm.entity.metadata.interval.start_time 剖析為日期並對應至 UDM。
first_seen event.ioc.active_timerange.start 剖析為日期並對應至 UDM。
ip_or_domain event.idm.entity.entity.hostname 如果 grok 模式從欄位中擷取主機,則會對應至 UDM。
ip_or_domain event.idm.entity.entity.ip 如果 grok 模式未從欄位擷取主機,則會對應至 UDM。
ip_or_domain event.ioc.domain_and_ports.domain 如果 grok 模式從欄位中擷取主機,則會對應至 UDM。
ip_or_domain event.ioc.ip_and_ports.ip_address 如果 grok 模式未從欄位擷取主機,則會對應至 UDM。
last_seen event.idm.entity.metadata.interval.end_time 剖析為日期並對應至 UDM。
last_seen event.ioc.active_timerange.end 剖析為日期並對應至 UDM。
連接埠 event.idm.entity.entity.labels.value 如果有多個通訊埠,系統會剖析這些通訊埠,以半形逗號分隔符號加入,並對應至 UDM。
連接埠 event.idm.entity.entity.port 如果只有一個通訊埠,系統會剖析並對應至 UDM。
連接埠 event.ioc.domain_and_ports.ports 如果 grok 模式從欄位擷取主機,系統會剖析並對應至 UDM。
連接埠 event.ioc.ip_and_ports.ports 如果 grok 模式未從欄位擷取主機,則會剖析並對應至 UDM。
分數 event.ioc.confidence_score 直接從原始記錄對應。
event.idm.entity.entity.labels.key 如果有多個通訊埠,請設為「ports」。
event.idm.entity.metadata.entity_type 如果 grok 模式從 ip_or_domain 欄位擷取主機,請設為「DOMAIN_NAME」,否則請設為「IP_ADDRESS」。
event.idm.entity.metadata.threat.category 設為「SOFTWARE_MALICIOUS」。
event.idm.entity.metadata.threat.category_details 使用查閱資料表從 category 欄位衍生而來。
event.idm.entity.metadata.threat.threat_name 設為「ET Intelligence Rep List」。
event.idm.entity.metadata.vendor_name 設為「ET_PRO_IOC」。
event.ioc.feed_name 設為「ET Intelligence Rep List」。
event.ioc.raw_severity 設為「惡意」。
timestamp.nanos 從「collection_time.nanos」複製的項目。
timestamp.seconds 從「collection_time.seconds」複製的項目。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。