收集 Duo 实体上下文日志
本文档介绍了如何使用 Google Cloud Storage 将 Duo 实体上下文数据提取到 Google Security Operations。解析器通过以下方式将 JSON 日志转换为统一数据模型 (UDM):首先从原始 JSON 中提取字段,然后将这些字段映射到 UDM 属性。它可处理各种数据场景,包括用户和资产信息、软件详细信息和安全标签,确保在 UDM 架构中全面呈现。
准备工作
确保您满足以下前提条件:
- Google SecOps 实例
- 对 Duo 租户的特权访问权限(具有足够管理权限来管理应用的 Admin API 应用)
- 已启用 Cloud Storage API 的 GCP 项目
- 创建和管理 GCS 存储分区的权限
- 管理 GCS 存储分区的 IAM 政策的权限
- 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
配置 Duo Admin API 应用
- 登录 Duo 管理面板。
- 前往应用 > 保护应用。
- 搜索 Admin API,然后点击保护。
- 记录以下值:
- 集成密钥 (ikey)
- 密钥 (skey)
- API 主机名(例如
api-XXXXXXXX.duosecurity.com)
- 在权限中,启用授予资源 - 读取(用于读取用户、群组、电话、端点、令牌和 WebAuthn 凭据)。
点击保存。
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 duo-context)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
保存相应存储分区名称和区域,以供日后参考。
为 Cloud Run 函数创建服务账号
Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
duo-entity-context-sa。 - 服务账号说明:输入
Service account for Cloud Run function to collect Duo entity context data。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
必须拥有这些角色,才能:
- Storage Object Admin:将日志写入 GCS 存储分区
- Cloud Run Invoker:允许 Pub/Sub 调用函数
- Cloud Functions Invoker:允许调用函数
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
duo-entity-context-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建发布/订阅主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
duo-entity-context-trigger。 - 将其他设置保留为默认值。
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集实体上下文数据
Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 Duo Admin API 中提取实体上下文数据并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 duo-entity-context-collector区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (
duo-entity-context-trigger)。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全标签页:
- 服务账号:选择服务账号 (
duo-entity-context-sa)。
- 服务账号:选择服务账号 (
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击 + 添加变量:
变量名称 示例值 GCS_BUCKETduo-contextGCS_PREFIXduo/context/DUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT100RESOURCESusers,groups,phones,endpoints,tokens,webauthncredentials在变量和 Secret 部分中,滚动到请求:
- 请求超时:输入
600秒(10 分钟)。
- 请求超时:输入
前往容器中的设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值。
- CPU:选择 1。
- 点击完成。
- 在资源部分中:
滚动到执行环境:
- 选择默认(推荐)。
在修订版本扩缩部分中:
- 实例数下限:输入
0。 - 实例数上限:输入
100(或根据预期负载进行调整)。
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
import functions_framework from google.cloud import storage import json import os import time import hmac import hashlib import base64 import email.utils import urllib.parse from urllib.request import Request, urlopen # Environment variables DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "duo/context/") # Default resources can be adjusted via ENV RESOURCES = [r.strip() for r in os.environ.get("RESOURCES", "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators").split(",") if r.strip()] # Duo paging: default 100; max varies by endpoint LIMIT = int(os.environ.get("LIMIT", "100")) # Initialize Storage client storage_client = storage.Client() def _canon_params(params: dict) -> str: """RFC3986 encoding with '~' unescaped, keys sorted lexicographically.""" if not params: return "" parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue ks = urllib.parse.quote(str(k), safe="~") vs = urllib.parse.quote(str(v), safe="~") parts.append(f"{ks}={vs}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1).""" now = email.utils.formatdate() canon = "\n".join([ now, method.upper(), host.lower(), path, _canon_params(params) ]) sig = hmac.new( DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1 ).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8") return { "Date": now, "Authorization": f"Basic {auth}" } def _call(method: str, path: str, params: dict) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "") req = Request(url, method=method.upper()) for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _write_json(obj: dict, when: float, resource: str, page: int) -> str: bucket = storage_client.bucket(GCS_BUCKET) prefix = GCS_PREFIX.strip("/") + "/" if GCS_PREFIX else "" key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(obj, separators=(",", ":")), content_type="application/json" ) return key def _fetch_resource(resource: str) -> dict: """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset.""" path = f"/admin/v1/{resource}" offset = 0 page = 0 now = time.time() total_items = 0 while True: params = {"limit": LIMIT, "offset": offset} data = _call("GET", path, params) _write_json(data, now, resource, page) page += 1 resp = data.get("response") # most endpoints return a list; if not a list, count as 1 object page if isinstance(resp, list): total_items += len(resp) elif resp is not None: total_items += 1 meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if next_offset is None: break # Duo returns next_offset as int try: offset = int(next_offset) except Exception: break return { "resource": resource, "pages": page, "objects": total_items } @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo entity context data and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ results = [] for res in RESOURCES: print(f"Fetching resource: {res}") result = _fetch_resource(res) results.append(result) print(f"Completed {res}: {result['pages']} pages, {result['objects']} objects") print(f"All resources fetched successfully: {results}")- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.*点击部署以保存并部署该函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 duo-entity-context-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择 Pub/Sub 主题 ( duo-entity-context-trigger)消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据数据新鲜度要求选择频次:
频率 Cron 表达式 使用场景 每小时 0 * * * *标准(推荐) 每 2 小时 0 */2 * * *新鲜度一般 每 6 小时 0 */6 * * *低频更新 每天 0 0 * * *最少的更新
测试调度器作业
- 在 Cloud Scheduler 控制台中,找到您的作业 (
duo-entity-context-hourly)。 - 点击强制运行以手动触发。
- 等待几秒钟,然后前往 Cloud Run > 服务 > duo-entity-context-collector > 日志。
- 验证函数是否已成功执行。
- 检查 GCS 存储分区,确认实体情境数据已写入。
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Entity Context)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Duo Entity context data 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以注入 Duo Entity Context 数据
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Entity Context)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Duo Entity context data 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://duo-context/duo/context/将
duo-context:您的 GCS 存储分区名称。duo/context/:存储日志的前缀/文件夹路径(必须与GCS_PREFIX环境变量一致)。
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资产命名空间:资产命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| 已启用 | entity.asset.deployment_status | 如果“activated”为 false,则设置为“DECOMISSIONED”,否则设置为“ACTIVE”。 |
| browsers.browser_family | entity.asset.software.name | 从原始日志的“browsers”数组中提取。 |
| browsers.browser_version | entity.asset.software.version | 从原始日志的“browsers”数组中提取。 |
| device_name | entity.asset.hostname | 直接从原始日志映射。 |
| disk_encryption_status | entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| 电子邮件 | entity.user.email_addresses | 如果原始日志包含“@”,则直接从原始日志映射;否则,如果“username”或“username1”包含“@”,则使用“username”或“username1”。 |
| 已加密 | entity.asset.attribute.labels.key: "Encrypted", entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| epkey | entity.asset.product_object_id | 如果存在,则用作“product_object_id”,否则使用“phone_id”或“token_id”。 |
| 指纹 | entity.asset.attribute.labels.key:“Finger Print”,entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| firewall_status | entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| hardware_uuid | entity.asset.asset_id | 如果存在,则用作“asset_id”,否则使用“user_id”。 |
| last_seen | entity.asset.last_discover_time | 解析为 ISO8601 时间戳并进行映射。 |
| 模型 | entity.asset.hardware.model | 直接从原始日志映射。 |
| 数值 | entity.user.phone_numbers | 直接从原始日志映射。 |
| os_family | entity.asset.platform_software.platform | 根据值(不区分大小写)映射到“WINDOWS”“LINUX”或“MAC”。 |
| os_version | entity.asset.platform_software.platform_version | 直接从原始日志映射。 |
| password_status | entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| phone_id | entity.asset.product_object_id | 如果不存在“epkey”,则用作“product_object_id”,否则使用“token_id”。 |
| security_agents.security_agent | entity.asset.software.name | 从原始日志的“security_agents”数组中提取。 |
| security_agents.version | entity.asset.software.version | 从原始日志的“security_agents”数组中提取。 |
| 时间戳 | entity.metadata.collected_timestamp | 填充“metadata”对象中的“collected_timestamp”字段。 |
| token_id | entity.asset.product_object_id | 如果不存在“epkey”和“phone_id”,则用作“product_object_id”。 |
| trusted_endpoint | entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value | 直接从原始日志映射,转换为小写。 |
| 类型 | entity.asset.type | 如果原始日志的“type”包含“mobile”(不区分大小写),则设置为“MOBILE”,否则设置为“LAPTOP”。 |
| user_id | entity.asset.asset_id | 如果不存在“hardware_uuid”,则用作“asset_id”。 |
| users.email | entity.user.email_addresses | 如果它是“users”数组中的第一个用户,并且包含“@”,则用作“email_addresses”。 |
| users.username | entity.user.userid | 提取“@”之前的用户名,如果这是“users”数组中的第一个用户,则将其用作“userid”。 |
| entity.metadata.vendor_name | “Duo” | |
| entity.metadata.product_name | “Duo 实体上下文数据” | |
| entity.metadata.entity_type | ASSET | |
| entity.relations.entity_type | 用户 | |
| entity.relations.relationship | OWNS |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。