收集 Duo 实体上下文日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 Duo 实体上下文数据提取到 Google Security Operations。解析器通过以下方式将 JSON 日志转换为统一数据模型 (UDM):首先从原始 JSON 中提取字段,然后将这些字段映射到 UDM 属性。它可处理各种数据场景,包括用户和资产信息、软件详细信息和安全标签,确保在 UDM 架构中全面呈现。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 对 Duo 租户的特权访问权限(具有足够管理权限来管理应用的 Admin API 应用)
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限

配置 Duo Admin API 应用

  1. 登录 Duo 管理面板
  2. 前往应用 > 保护应用
  3. 搜索 Admin API,然后点击保护
  4. 记录以下值:
    • 集成密钥 (ikey)
    • 密钥 (skey)
    • API 主机名(例如 api-XXXXXXXX.duosecurity.com
  5. 权限中,启用授予资源 - 读取(用于读取用户、群组、电话、端点、令牌和 WebAuthn 凭据)。
  6. 点击保存

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 duo-context
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

  7. 保存相应存储分区名称和区域,以供日后参考。

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 duo-entity-context-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect Duo entity context data
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 duo-entity-context-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 duo-entity-context-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集实体上下文数据

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 Duo Admin API 中提取实体上下文数据并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 duo-entity-context-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (duo-entity-context-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 (duo-entity-context-sa)。
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值
    GCS_BUCKET duo-context
    GCS_PREFIX duo/context/
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 100
    RESOURCES users,groups,phones,endpoints,tokens,webauthncredentials
  10. 变量和 Secret 部分中,滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往容器中的设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
    • 点击完成
  12. 滚动到执行环境

    • 选择默认(推荐)。
  13. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  14. 点击创建

  15. 等待服务创建完成(1-2 分钟)。

  16. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import time
    import hmac
    import hashlib
    import base64
    import email.utils
    import urllib.parse
    from urllib.request import Request, urlopen
    
    # Environment variables
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "duo/context/")
    
    # Default resources can be adjusted via ENV
    RESOURCES = [r.strip() for r in os.environ.get("RESOURCES", "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators").split(",") if r.strip()]
    
    # Duo paging: default 100; max varies by endpoint
    LIMIT = int(os.environ.get("LIMIT", "100"))
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    def _canon_params(params: dict) -> str:
        """RFC3986 encoding with '~' unescaped, keys sorted lexicographically."""
        if not params:
            return ""
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            ks = urllib.parse.quote(str(k), safe="~")
            vs = urllib.parse.quote(str(v), safe="~")
            parts.append(f"{ks}={vs}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1)."""
        now = email.utils.formatdate()
        canon = "\n".join([
            now,
            method.upper(),
            host.lower(),
            path,
            _canon_params(params)
        ])
        sig = hmac.new(
            DUO_SKEY.encode("utf-8"),
            canon.encode("utf-8"),
            hashlib.sha1
        ).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8")
        return {
            "Date": now,
            "Authorization": f"Basic {auth}"
        }
    
    def _call(method: str, path: str, params: dict) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "")
    
        req = Request(url, method=method.upper())
        for k, v in _sign(method, host, path, params).items():
            req.add_header(k, v)
    
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _write_json(obj: dict, when: float, resource: str, page: int) -> str:
        bucket = storage_client.bucket(GCS_BUCKET)
        prefix = GCS_PREFIX.strip("/") + "/" if GCS_PREFIX else ""
        key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json"
    
        blob = bucket.blob(key)
        blob.upload_from_string(
            json.dumps(obj, separators=(",", ":")),
            content_type="application/json"
        )
        return key
    
    def _fetch_resource(resource: str) -> dict:
        """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset."""
        path = f"/admin/v1/{resource}"
        offset = 0
        page = 0
        now = time.time()
        total_items = 0
    
        while True:
            params = {"limit": LIMIT, "offset": offset}
            data = _call("GET", path, params)
            _write_json(data, now, resource, page)
            page += 1
    
            resp = data.get("response")
            # most endpoints return a list; if not a list, count as 1 object page
            if isinstance(resp, list):
                total_items += len(resp)
            elif resp is not None:
                total_items += 1
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if next_offset is None:
                break
    
            # Duo returns next_offset as int
            try:
                offset = int(next_offset)
            except Exception:
                break
    
        return {
            "resource": resource,
            "pages": page,
            "objects": total_items
        }
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Duo entity context data and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
        results = []
        for res in RESOURCES:
            print(f"Fetching resource: {res}")
            result = _fetch_resource(res)
            results.append(result)
            print(f"Completed {res}: {result['pages']} pages, {result['objects']} objects")
    
        print(f"All resources fetched successfully: {results}")
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 duo-entity-context-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (duo-entity-context-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据数据新鲜度要求选择频次:

    频率 Cron 表达式 使用场景
    每小时 0 * * * * 标准(推荐)
    每 2 小时 0 */2 * * * 新鲜度一般
    每 6 小时 0 */6 * * * 低频更新
    每天 0 0 * * * 最少的更新

测试调度器作业

  1. Cloud Scheduler 控制台中,找到您的作业 (duo-entity-context-hourly)。
  2. 点击强制运行以手动触发。
  3. 等待几秒钟,然后前往 Cloud Run > 服务 > duo-entity-context-collector > 日志
  4. 验证函数是否已成功执行。
  5. 检查 GCS 存储分区,确认实体情境数据已写入。

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Duo Entity Context)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Duo Entity context data 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以注入 Duo Entity Context 数据

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Duo Entity Context)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Duo Entity context data 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://duo-context/duo/context/
      
        • duo-context:您的 GCS 存储分区名称。
        • duo/context/:存储日志的前缀/文件夹路径(必须与 GCS_PREFIX 环境变量一致)。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
已启用 entity.asset.deployment_status 如果“activated”为 false,则设置为“DECOMISSIONED”,否则设置为“ACTIVE”。
browsers.browser_family entity.asset.software.name 从原始日志的“browsers”数组中提取。
browsers.browser_version entity.asset.software.version 从原始日志的“browsers”数组中提取。
device_name entity.asset.hostname 直接从原始日志映射。
disk_encryption_status entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
电子邮件 entity.user.email_addresses 如果原始日志包含“@”,则直接从原始日志映射;否则,如果“username”或“username1”包含“@”,则使用“username”或“username1”。
已加密 entity.asset.attribute.labels.key: "Encrypted", entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
epkey entity.asset.product_object_id 如果存在,则用作“product_object_id”,否则使用“phone_id”或“token_id”。
指纹 entity.asset.attribute.labels.key:“Finger Print”,entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
firewall_status entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
hardware_uuid entity.asset.asset_id 如果存在,则用作“asset_id”,否则使用“user_id”。
last_seen entity.asset.last_discover_time 解析为 ISO8601 时间戳并进行映射。
模型 entity.asset.hardware.model 直接从原始日志映射。
数值 entity.user.phone_numbers 直接从原始日志映射。
os_family entity.asset.platform_software.platform 根据值(不区分大小写)映射到“WINDOWS”“LINUX”或“MAC”。
os_version entity.asset.platform_software.platform_version 直接从原始日志映射。
password_status entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
phone_id entity.asset.product_object_id 如果不存在“epkey”,则用作“product_object_id”,否则使用“token_id”。
security_agents.security_agent entity.asset.software.name 从原始日志的“security_agents”数组中提取。
security_agents.version entity.asset.software.version 从原始日志的“security_agents”数组中提取。
时间戳 entity.metadata.collected_timestamp 填充“metadata”对象中的“collected_timestamp”字段。
token_id entity.asset.product_object_id 如果不存在“epkey”和“phone_id”,则用作“product_object_id”。
trusted_endpoint entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value 直接从原始日志映射,转换为小写。
类型 entity.asset.type 如果原始日志的“type”包含“mobile”(不区分大小写),则设置为“MOBILE”,否则设置为“LAPTOP”。
user_id entity.asset.asset_id 如果不存在“hardware_uuid”,则用作“asset_id”。
users.email entity.user.email_addresses 如果它是“users”数组中的第一个用户,并且包含“@”,则用作“email_addresses”。
users.username entity.user.userid 提取“@”之前的用户名,如果这是“users”数组中的第一个用户,则将其用作“userid”。
entity.metadata.vendor_name “Duo”
entity.metadata.product_name “Duo 实体上下文数据”
entity.metadata.entity_type ASSET
entity.relations.entity_type 用户
entity.relations.relationship OWNS

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。