收集 Duo 身份验证日志
本文档介绍了如何将 Duo 身份验证日志提取到 Google Security Operations。解析器从 JSON 格式的消息中提取日志。它将原始日志数据转换为统一数据模型 (UDM),映射用户、设备、应用、位置和身份验证详细信息等字段,同时处理各种身份验证因素和结果,以对安全事件进行分类。解析器还会执行数据清理、类型转换和错误处理,以确保数据质量和一致性。
您可以选择以下两种收集方法:
- 选项 1:使用第三方 API 直接提取
- 方法 2:使用 Cloud Run 函数和 Google Cloud Storage 收集日志
准备工作
确保您满足以下前提条件:
- Google SecOps 实例
- 对 Duo 管理面板的特权访问权限(需要所有者角色才能创建 Admin API 应用)
- 如果使用方法 2,则需要对 GCP 具有特权访问权限
方法 1:使用第三方 API 提取 Duo 身份验证日志
收集 Duo 前提条件(API 凭据)
- 以拥有 Owner(所有者)、Administrator(管理员)或 Application Manager(应用管理器)角色的管理员身份登录 Duo 管理控制台。
- 前往应用 > 应用目录。
- 在目录中找到 Admin API 的条目。
- 点击 + 添加以创建应用。
- 复制以下详细信息并将其保存在安全的位置:
- 集成密钥
- 密钥
- API 主机名(例如
api-XXXXXXXX.duosecurity.com)
- 前往权限部分。
- 取消选择除授予读取日志权限以外的所有权限选项。
- 点击保存更改。
在 Google SecOps 中配置 Feed 以提取 Duo 身份验证日志
- 依次前往 SIEM 设置 > Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Authentication Logs)。 - 选择第三方 API 作为来源类型。
- 选择 Duo Auth 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- 用户名:输入 Duo 中的集成密钥。
- 密钥:输入 Duo 中的密钥。
- API 主机名:输入您的 API 主机名(例如
api-XXXXXXXX.duosecurity.com)。 - 资源命名空间:可选。资产命名空间。
- 注入标签:可选。要应用于相应 Feed 中事件的标签。
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
方案 2:使用 Google Cloud Storage 提取 Duo 身份验证日志
收集 Duo Admin API 凭据
- 登录 Duo 管理面板。
- 前往应用 > 应用目录。
- 在应用目录中找到 Admin API。
- 点击 + 添加以添加 Admin API 应用。
- 复制并保存以下值:
- 集成密钥 (ikey)
- 密钥 (skey)
- API 主机名(例如
api-XXXXXXXX.duosecurity.com)
- 在权限中,启用授予读取日志权限。
- 点击保存更改。
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 duo-auth-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
为 Cloud Run 函数创建服务账号
Cloud Run 函数需要一个有权写入 GCS 存储分区的服务账号。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
duo-auth-collector-sa。 - 服务账号说明:输入
Service account for Cloud Run function to collect Duo authentication logs。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
必须拥有这些角色,才能:
- Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
- Cloud Run Invoker:允许 Pub/Sub 调用函数
- Cloud Functions Invoker:允许调用函数
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
duo-auth-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建发布/订阅主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
duo-auth-trigger。 - 将其他设置保留为默认值。
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集日志
Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 Duo Admin API 中提取日志并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 duo-auth-collector区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题部分,选择主题
duo-auth-trigger。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全标签页:
- 服务账号:选择服务账号
duo-auth-collector-sa。
- 服务账号:选择服务账号
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击 + 添加变量:
变量名称 示例值 GCS_BUCKETduo-auth-logsGCS_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500在变量和密钥标签页中,向下滚动到请求:
- 请求超时:输入
600秒(10 分钟)。
- 请求超时:输入
前往容器中的设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值。
- CPU:选择 1。
- 点击完成。
- 在资源部分中:
滚动到执行环境:
- 选择默认(推荐)。
在修订版本扩缩部分中:
- 实例数下限:输入
0。 - 实例数上限:输入
100(或根据预期负载进行调整)。
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
#!/usr/bin/env python3 # Cloud Run Function: Pull Duo Admin API v2 Authentication Logs to GCS (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import functions_framework from google.cloud import storage import os import json import time import hmac import hashlib import base64 import email.utils import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 500, max 1000 storage_client = storage.Client() def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([ now, method.upper(), host.lower(), path, _canon_params(params) ]) sig = hmac.new( DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1 ).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return { "Date": now, "Authorization": f"Basic {auth}" } def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state_ms() -> int | None: try: bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(STATE_KEY) if blob.exists(): state_data = blob.download_as_text() val = json.loads(state_data).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(STATE_KEY) body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") blob.upload_from_string(body, content_type="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: bucket = storage_client.bucket(GCS_BUCKET) key = f"{GCS_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload, separators=(",", ":")).encode("utf-8"), content_type="application/json" ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = { "mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT } if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return { "ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms } @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo authentication logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ try: result = fetch_and_store() print(f"Successfully processed {result['events']} events in {result['pages']} pages") print(f"Next mintime_ms: {result['next_mintime_ms']}") except Exception as e: print(f"Error processing logs: {str(e)}") raise- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.*点击部署以保存并部署该函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 duo-auth-collector-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择主题 duo-auth-trigger消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据日志量和延迟时间要求选择频次:
频率 Cron 表达式 使用场景 每隔 5 分钟 */5 * * * *高容量、低延迟 每隔 15 分钟 */15 * * * *搜索量中等 每小时 0 * * * *标准(推荐) 每 6 小时 0 */6 * * *量小、批处理 每天 0 0 * * *历史数据收集
测试调度器作业
- 在 Cloud Scheduler 控制台中,找到您的作业。
- 点击强制运行以手动触发。
- 等待几秒钟,然后前往 Cloud Run > 服务 > duo-auth-collector > 日志。
- 验证函数是否已成功执行。
- 检查 GCS 存储分区,确认日志已写入。
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Authentication Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Duo Auth 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以提取 Duo 身份验证日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Authentication Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Duo Auth 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://duo-auth-logs/duo/auth/将
duo-auth-logs:您的 GCS 存储分区名称。duo/auth/:存储日志的可选前缀/文件夹路径(留空表示根目录)。
示例:
- 根存储分区:
gs://company-logs/ - 带前缀:
gs://company-logs/duo-logs/ - 使用子文件夹:
gs://company-logs/duo/auth/
- 根存储分区:
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资产命名空间:资产命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| access_device.browser | target.resource.attribute.labels.value | 如果存在 access_device.browser,则将其值映射到 UDM。 |
| access_device.hostname | principal.hostname | 如果 access_device.hostname 存在且不为空,则其值会映射到 UDM。如果为空且 event_type 为 USER_CREATION,则 event_type 会更改为 USER_UNCATEGORIZED。如果 access_device.hostname 为空且存在 hostname 字段,则使用 hostname 的值。 |
| access_device.ip | principal.ip | 如果 access_device.ip 存在且是有效的 IPv4 地址,则其值会映射到 UDM。如果不是有效的 IPv4 地址,则会作为字符串值添加到 additional.fields 中,键为 access_device.ip。 |
| access_device.location.city | principal.location.city | 如果存在,则将该值映射到 UDM。 |
| access_device.location.country | principal.location.country_or_region | 如果存在,则将该值映射到 UDM。 |
| access_device.location.state | principal.location.state | 如果存在,则将该值映射到 UDM。 |
| access_device.os | principal.platform | 如果存在,则该值会转换为相应的 UDM 值(MAC、WINDOWS、LINUX)。 |
| access_device.os_version | principal.platform_version | 如果存在,则将该值映射到 UDM。 |
| application.key | target.resource.id | 如果存在,则将该值映射到 UDM。 |
| application.name | target.application | 如果存在,则将该值映射到 UDM。 |
| auth_device.ip | target.ip | 如果存在且不为“None”,则该值会映射到 UDM。 |
| auth_device.location.city | target.location.city | 如果存在,则将该值映射到 UDM。 |
| auth_device.location.country | target.location.country_or_region | 如果存在,则将该值映射到 UDM。 |
| auth_device.location.state | target.location.state | 如果存在,则将该值映射到 UDM。 |
| auth_device.name | target.hostname OR target.user.phone_numbers | 如果 auth_device.name 存在且是电话号码(标准化后),则会将其添加到 target.user.phone_numbers。否则,它会映射到 target.hostname。 |
| client_ip | target.ip | 如果存在且不为“None”,则该值会映射到 UDM。 |
| client_section | target.resource.attribute.labels.value | 如果存在 client_section,则其值会映射到键为 client_section 的 UDM。 |
| dn | target.user.userid | 如果存在 dn,但不存在 user.name 和 username,则使用 grok 从 dn 字段中提取 userid 并将其映射到 UDM。event_type 设置为 USER_LOGIN。 |
| event_type | metadata.product_event_type 和 metadata.event_type | 该值会映射到 metadata.product_event_type。它还用于确定 metadata.event_type:“authentication”变为 USER_LOGIN,“enrollment”变为 USER_CREATION,如果为空或不是上述任一值,则变为 GENERIC_EVENT。 |
| 因素 | extensions.auth.mechanism 和 extensions.auth.auth_details | 该值会转换为相应的 UDM auth.mechanism 值(HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)。原始值也会映射到 extensions.auth.auth_details。 |
| 主机名 | principal.hostname | 如果存在且 access_device.hostname 为空,则该值会映射到 UDM。 |
| log_format | target.resource.attribute.labels.value | 如果存在 log_format,则其值会映射到键为 log_format 的 UDM。 |
| 日志级别。_classuuid_ | target.resource.attribute.labels.value | 如果存在 loglevel._classuuid_,则其值会映射到键为 class_uuid 的 UDM。 |
| log_level.name | target.resource.attribute.labels.value 和 security_result.severity | 如果存在 log_level.name,则其值会映射到具有相应键名称的 UDM。如果值为“info”,则将 security_result.severity 设置为 INFORMATIONAL。 |
| log_logger.unpersistable | target.resource.attribute.labels.value | 如果存在 log_logger.unpersistable,则其值会映射到键为 unpersistable 的 UDM。 |
| log_namespace | target.resource.attribute.labels.value | 如果存在 log_namespace,则其值会映射到键为 log_namespace 的 UDM。 |
| log_source | target.resource.attribute.labels.value | 如果存在 log_source,则其值会映射到键为 log_source 的 UDM。 |
| 消息 | security_result.summary | 如果存在且原因是空值,则该值会映射到 UDM。 |
| reason | security_result.summary | 如果存在,则将该值映射到 UDM。 |
| 结果 | security_result.action_details 和 security_result.action | 如果存在,则该值会映射到 security_result.action_details。“success”或“SUCCESS”表示 security_result.action 为 ALLOW,否则为 BLOCK。 |
| server_section | target.resource.attribute.labels.value | 如果存在 server_section,则其值会映射到键为 server_section 的 UDM。 |
| server_section_ikey | target.resource.attribute.labels.value | 如果存在 server_section_ikey,则其值会映射到具有键 server_section_ikey 的 UDM。 |
| 状态 | security_result.action_details 和 security_result.action | 如果存在,则该值会映射到 security_result.action_details。“允许”会转换为 security_result.action ALLOW,“拒绝”会转换为 BLOCK。 |
| 时间戳 | metadata.event_timestamp 和 event.timestamp | 该值会转换为时间戳,并同时映射到 metadata.event_timestamp 和 event.timestamp。 |
| txid | metadata.product_log_id AND network.session_id | 该值会同时映射到 metadata.product_log_id 和 network.session_id。 |
| user.groups | target.user.group_identifiers | 数组中的所有值都会添加到 target.user.group_identifiers 中。 |
| user.key | target.user.product_object_id | 如果存在,则将该值映射到 UDM。 |
| user.name | target.user.userid | 如果存在,则将该值映射到 UDM。 |
| username | target.user.userid | 如果存在且 user.name 不存在,则该值会映射到 UDM。event_type 设置为 USER_LOGIN。 |
| (解析器逻辑) | metadata.vendor_name | 始终设置为“DUO_SECURITY”。 |
| (解析器逻辑) | metadata.product_name | 始终设置为“MULTI-FACTOR_AUTHENTICATION”。 |
| (解析器逻辑) | metadata.log_type | 取自原始日志的顶级 log_type 字段。 |
| (解析器逻辑) | extensions.auth.type | 始终设置为“SSO”。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。