收集 Digital Shadows Indicators 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Digital Shadows 指标日志注入到 Google Security Operations 中。

Digital Shadows Indicators(现已成为 ReliaQuest GreyMatter DRP 的一部分)是一个数字风险保护平台,可跨开放网络、深网、暗网和社交媒体持续监控和检测外部威胁、数据暴露和品牌冒充行为。它提供威胁情报、突发事件提醒和失陷指标 (IOC),帮助组织识别和缓解数字风险。

准备工作

  • Google SecOps 实例
  • Digital Shadows Indicators 门户网站的特权访问权限
  • AWS (S3、IAM) 的特权访问权限
  • 有效订阅了 Digital Shadows Indicators 并启用了 API 访问权限

收集 Digital Shadows 指标 API 凭据

  1. 前往 https://portal-digitalshadows.com,登录 Digital Shadows Indicators 门户
  2. 依次前往设置 > API 凭据
  3. 如果您没有现有的 API 密钥,请点击创建新的 API 客户端生成 API 密钥
  4. 复制以下详细信息并将其保存在安全的位置:

    • API 密钥:您的 6 字符 API 密钥
    • API Secret:您的 32 字符 API 密钥
    • 账号 ID:您的账号 ID(显示在门户中或由 Digital Shadows 代表提供)
    • API 基础网址https://api.searchlight.app/v1https://portal-digitalshadows.com/api/v1(取决于您的租户区域)

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 digital-shadows-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .csv 文件以保存访问密钥密钥,供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
  2. 复制并粘贴下方的政策。
  3. 政策 JSON(如果您输入了其他存储桶名称,请替换 digital-shadows-logs):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. 依次点击下一步 > 创建政策

  5. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  6. 附加新创建的政策。

  7. 将角色命名为 DigitalShadowsLambdaRole,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 DigitalShadowsCollector
    运行时 Python 3.13
    架构 x86_64
    执行角色 DigitalShadowsLambdaRole
  4. 创建函数后,打开 Code 标签页,删除桩代码,然后粘贴以下代码 (DigitalShadowsCollector.py)。

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并将其替换为您的值。

    环境变量

    示例值
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123(您的 6 字符 API 密钥)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > DigitalShadowsCollector)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)
    • 目标:您的 Lambda 函数 DigitalShadowsCollector
    • 名称DigitalShadowsCollector-1h
  3. 点击创建时间表

在 Google SecOps 中配置 Feed 以注入 Digital Shadows Indicators 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 在下一页上,点击配置单个 Feed
  4. Feed 名称输入一个唯一名称。
  5. 选择 Amazon S3 V2 作为来源类型
  6. 选择数字阴影指标作为日志类型
  7. 点击下一步,然后点击提交
  8. 为以下字段指定值:

    • S3 URIs3://digital-shadows-logs/digital-shadows/
    • 来源删除选项:根据您的偏好选择删除选项
    • 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签
  9. 点击下一步,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
entity.entity.file.md5 如果 type == "MD5",则需要设置此属性
entity.entity.file.sha1 如果类型为“SHA1”,则进行设置
entity.entity.file.sha256 如果类型为“SHA256”,则需要设置此属性
entity.entity.hostname 如果 type == "HOST",则设置此字段
entity.entity.ip 如果类型为“IP”,则直接复制值
entity.entity.url 如果类型为“网址”,则进行设置
entity.entity.user.email_addresses 如果类型为“EMAIL”,则直接复制值
类型 entity.metadata.entity_type 如果 type == "HOST",则设置为“DOMAIN_NAME”;如果 type == "IP",则设置为“IP_ADDRESS”;如果 type == "网址",则设置为“网址”;如果 type == "EMAIL",则设置为“USER”;如果 type 属于 ["SHA1","SHA256","MD5"],则设置为“FILE”;否则设置为“UNKNOWN_ENTITYTYPE”
lastUpdated entity.metadata.interval.start_time 如果非空,则从 ISO8601 转换为时间戳
id entity.metadata.product_entity_id 如果不为空,则直接复制值
attributionTag.id、attributionTag.name、attributionTag.type entity.metadata.threat.about.labels 如果非空,则与对象 {key: tag field name, value: tag value} 合并
sourceType entity.metadata.threat.category_details 直接复制值
entity.metadata.threat.threat_feed_name 设置为“指示器”
id entity.metadata.threat.threat_id 如果不为空,则直接复制值
sourceIdentifier entity.metadata.threat.url_back_to_product 直接复制值
entity.metadata.product_name 设置为“指示器”
entity.metadata.vendor_name 设置为“数字足迹”

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。