收集 Digital Shadows Indicators 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Digital Shadows Indicators 記錄檔擷取至 Google Security Operations。

Digital Shadows Indicators (現為 ReliaQuest GreyMatter DRP 的一部分) 是一項數位風險防護平台,可持續監控並偵測公開網路、深網、暗網和社群媒體中的外部威脅、資料外洩和品牌冒用行為。提供威脅情報、事件快訊和入侵指標 (IOC),協助機構識別及降低數位風險。

事前準備

  • Google SecOps 執行個體
  • Digital Shadows Indicators 入口網站的特殊存取權
  • AWS (S3、IAM) 的特殊存取權
  • 已啟用 API 存取權的 Digital Shadows Indicators 訂閱方案

收集 Digital Shadows Indicators API 憑證

  1. 前往 https://portal-digitalshadows.com,登入 Digital Shadows Indicators Portal
  2. 依序前往「設定」>「API 憑證」
  3. 如果沒有現有 API 金鑰,請按一下「Create New API Client」(建立新的 API 用戶端) 或「Generate API Key」(產生 API 金鑰)。
  4. 複製下列詳細資料並存放在安全的位置:

    • API 金鑰:6 個字元的 API 金鑰
    • API 密鑰:32 個字元的 API 密鑰
    • 帳戶 ID:您的帳戶 ID (顯示在入口網站中,或由 Digital Shadows 代表提供)
    • API 基準網址https://api.searchlight.app/v1https://portal-digitalshadows.com/api/v1 (視您的租戶區域而定)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南的指示,建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如 digital-shadows-logs)。
  3. 按照這份使用者指南的指示建立「使用者」建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「Download .csv file」(下載 .csv 檔案),儲存「Access Key」(存取金鑰) 和「Secret Access Key」(私密存取金鑰),以供日後參考。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
  2. 複製並貼上下列政策。
  3. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 digital-shadows-logs):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. 依序點選「下一步」>「建立政策」

  5. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  6. 附加新建立的政策。

  7. 為角色命名 DigitalShadowsLambdaRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 DigitalShadowsCollector
    執行階段 Python 3.13
    架構 x86_64
    執行角色 DigitalShadowsLambdaRole
  4. 建立函式後,開啟「程式碼」分頁,刪除存根,然後貼上以下程式碼 (DigitalShadowsCollector.py)。

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」

  6. 輸入下列環境變數,並將 換成您的值。

    環境變數

    範例值
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (您的 6 個字元 API 金鑰)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「DigitalShadowsCollector」)。

  8. 選取「設定」分頁標籤。

  9. 在「一般設定」面板中,按一下「編輯」

  10. 將「逾時」變更為「5 分鐘 (300 秒)」,然後按一下「儲存」

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 請提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)
    • 目標:您的 Lambda 函式 DigitalShadowsCollector
    • Name (名稱):DigitalShadowsCollector-1h
  3. 按一下「建立時間表」

在 Google SecOps 中設定動態饋給,擷取 Digital Shadows Indicators 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在下一個頁面中,按一下「設定單一動態饋給」
  4. 輸入動態饋給名稱的專屬名稱。
  5. 選取「Amazon S3 V2」做為「來源類型」
  6. 選取「數位足跡指標」做為「記錄類型」
  7. 依序點按「繼續」和「提交」
  8. 為下列欄位指定值:

    • S3 URIs3://digital-shadows-logs/digital-shadows/
    • 來源刪除選項:根據偏好設定選取刪除選項
    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)
    • 存取金鑰 ID:可存取 S3 儲存貯體的使用者存取金鑰
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤
  9. 依序點按「繼續」和「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
entity.entity.file.md5 如果 type ==「MD5」,則為必填
entity.entity.file.sha1 如果 type ==「SHA1」,則為必填
entity.entity.file.sha256 如果 type ==「SHA256」,則為必填
entity.entity.hostname 如果 type ==「HOST」,請設定
entity.entity.ip 如果 type ==「IP」,則直接複製值
entity.entity.url Set if type == "URL"
entity.entity.user.email_addresses 如果 type ==「EMAIL」,則直接複製值
類型 entity.metadata.entity_type 如果 type == "HOST",則設為「DOMAIN_NAME」;如果 type == "IP",則設為「IP_ADDRESS」;如果 type == "URL",則設為「URL」;如果 type == "EMAIL",則設為「USER」;如果 type 位於 ["SHA1","SHA256","MD5"] 中,則設為「FILE」;否則設為「UNKNOWN_ENTITYTYPE」
lastUpdated entity.metadata.interval.start_time 如果不是空白,則從 ISO8601 轉換為時間戳記
id entity.metadata.product_entity_id 如果值不是空白,則直接複製該值
attributionTag.id、attributionTag.name、attributionTag.type entity.metadata.threat.about.labels 如果不是空白,則與物件 {key: tag field name, value: tag value} 合併
sourceType entity.metadata.threat.category_details 直接複製值
entity.metadata.threat.threat_feed_name 設為「指標」
id entity.metadata.threat.threat_id 如果值不是空白,則直接複製該值
sourceIdentifier entity.metadata.threat.url_back_to_product 直接複製值
entity.metadata.product_name 設為「指標」
entity.metadata.vendor_name 設為「數位陰影」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。