收集 Duo 身份验证日志
本文档介绍了如何将 Duo 身份验证日志注入到 Google Security Operations。解析器从 JSON 格式的消息中提取日志。它将原始日志数据转换为统一数据模型 (UDM),映射用户、设备、应用、位置和身份验证详细信息等字段,同时处理各种身份验证因素和结果,以对安全事件进行分类。解析器还会执行数据清理、类型转换和错误处理,以确保数据质量和一致性。
您可以选择以下两种收集方法之一:
- 选项 1:使用第三方 API 直接提取
- 选项 2:使用 AWS Lambda 和 Amazon S3 收集日志
准备工作
- Google SecOps 实例
- 对 Duo 管理面板的特权访问权限(需要所有者角色才能创建 Admin API 应用)
- 如果使用选项 2,则需要 AWS 特权访问权限
方法 1:使用第三方 API 提取 Duo 身份验证日志
收集 Duo 前提条件(API 凭据)
- 以拥有所有者、管理员或应用管理器角色的管理员身份登录 Duo 管理控制台。
- 前往应用 > 应用目录。
- 在目录中找到 Admin API 的条目。
- 点击 + 添加以创建应用。
- 复制以下详细信息并将其保存在安全的位置:
- 集成密钥
- 密钥
- API 主机名(例如
api-XXXXXXXX.duosecurity.com)
- 前往权限部分。
- 取消选择除授予读取日志权限以外的所有权限选项。
- 点击保存更改。
在 Google SecOps 中配置 Feed 以注入 Duo 身份验证日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Authentication Logs)。 - 选择第三方 API 作为来源类型。
- 选择 Duo Auth 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- 用户名:输入 Duo 中的集成密钥。
- 密钥:输入 Duo 中的密钥。
- API 主机名:输入您的 API 主机名(例如
api-XXXXXXXX.duosecurity.com)。 - 资源命名空间:可选。资产命名空间。
- 注入标签:可选。要应用于此 Feed 中事件的标签。
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
方法 2:使用 AWS S3 注入 Duo 身份验证日志
收集 Duo Admin API 凭据
- 登录 Duo 管理面板。
- 依次前往应用 > 保护应用。
- 在应用目录中找到 Admin API。
- 点击保护以添加 Admin API 应用。
- 复制并保存以下值:
- 集成密钥 (ikey)
- 密钥 (skey)
- API 主机名(例如
api-XXXXXXXX.duosecurity.com)
- 在权限中,启用授予读取日志权限。
- 点击保存更改。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以供日后参考(例如
duo-auth-logs)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }- 如果您输入了其他存储桶名称,请替换
duo-auth-logs。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
WriteDuoAuthToS3Role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数。
- 依次点击创建函数 > 从头开始创作。
提供以下配置详细信息:
设置 值 名称 duo_auth_to_s3运行时 Python 3.13 架构 x86_64 执行角色 WriteDuoAuthToS3Role创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
duo_auth_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值。
键 示例值 S3_BUCKETduo-auth-logsS3_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour)。 - 目标:您的 Lambda 函数
duo_auth_to_s3。 - 名称:
duo-auth-1h。
- 周期性安排:费率 (
- 点击创建时间表。
为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:
secops-reader - 访问类型:访问密钥 - 以程序化方式访问
- 用户:
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-auth-logs" } ] }将名称设置为
secops-reader-policy。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Duo 身份验证日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Authentication Logs)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Duo Auth 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://duo-auth-logs/duo/auth/ - 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
如果存在 access_device.browser,则将其值映射到 UDM。 |
access_device.hostname |
principal.hostname |
如果 access_device.hostname 存在且不为空,则其值会映射到 UDM。如果该字段为空且 event_type 为 USER_CREATION,则 event_type 会更改为 USER_UNCATEGORIZED。如果 access_device.hostname 为空且存在 hostname 字段,则使用 hostname 的值。 |
access_device.ip |
principal.ip |
如果 access_device.ip 存在且是有效的 IPv4 地址,则其值会映射到 UDM。如果不是有效的 IPv4 地址,则会作为字符串值添加到键为 access_device.ip 的 additional.fields 中。 |
access_device.location.city |
principal.location.city |
如果存在,则将该值映射到 UDM。 |
access_device.location.country |
principal.location.country_or_region |
如果存在,则将该值映射到 UDM。 |
access_device.location.state |
principal.location.state |
如果存在,则将该值映射到 UDM。 |
access_device.os |
principal.platform |
如果存在,则该值会转换为相应的 UDM 值(MAC、WINDOWS、LINUX)。 |
access_device.os_version |
principal.platform_version |
如果存在,则将该值映射到 UDM。 |
application.key |
target.resource.id |
如果存在,则将该值映射到 UDM。 |
application.name |
target.application |
如果存在,则将该值映射到 UDM。 |
auth_device.ip |
target.ip |
如果存在且不为“None”,则该值会映射到 UDM。 |
auth_device.location.city |
target.location.city |
如果存在,则将该值映射到 UDM。 |
auth_device.location.country |
target.location.country_or_region |
如果存在,则将该值映射到 UDM。 |
auth_device.location.state |
target.location.state |
如果存在,则将该值映射到 UDM。 |
auth_device.name |
target.hostname 或 target.user.phone_numbers |
如果存在 auth_device.name 且 auth_device.name 是手机号码(标准化后),则将其添加到 target.user.phone_numbers。否则,它会映射到 target.hostname。 |
client_ip |
target.ip |
如果存在且不为“None”,则该值会映射到 UDM。 |
client_section |
target.resource.attribute.labels.value |
如果存在 client_section,则其值会映射到键为 client_section 的 UDM。 |
dn |
target.user.userid |
如果存在 dn,但不存在 user.name 和 username,则使用 grok 从 dn 字段中提取 userid 并将其映射到 UDM。event_type 设置为 USER_LOGIN。 |
event_type |
metadata.product_event_type 和 metadata.event_type |
该值映射到 metadata.product_event_type。它还用于确定 metadata.event_type:“authentication”变为 USER_LOGIN,“enrollment”变为 USER_CREATION,如果为空或不是上述任一值,则变为 GENERIC_EVENT。 |
factor |
extensions.auth.mechanism 和 extensions.auth.auth_details |
该值会转换为相应的 UDM auth.mechanism 值(HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)。原始值也会映射到 extensions.auth.auth_details。 |
hostname |
principal.hostname |
如果存在且 access_device.hostname 为空,则该值会映射到 UDM。 |
log_format |
target.resource.attribute.labels.value |
如果存在 log_format,则其值会映射到键为 log_format 的 UDM。 |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
如果存在 log_level.__class_uuid__,则其值会映射到键为 __class_uuid__ 的 UDM。 |
log_level.name |
target.resource.attribute.labels.value 和 security_result.severity |
如果存在 log_level.name,则其值会映射到键为 name 的 UDM。如果值为“info”,则将 security_result.severity 设置为 INFORMATIONAL。 |
log_logger.unpersistable |
target.resource.attribute.labels.value |
如果存在 log_logger.unpersistable,则其值会映射到键为 unpersistable 的 UDM。 |
log_namespace |
target.resource.attribute.labels.value |
如果存在 log_namespace,则其值会映射到键为 log_namespace 的 UDM。 |
log_source |
target.resource.attribute.labels.value |
如果存在 log_source,则其值会映射到键为 log_source 的 UDM。 |
msg |
security_result.summary |
如果存在且 reason 为空,则该值会映射到 UDM。 |
reason |
security_result.summary |
如果存在,则将该值映射到 UDM。 |
result |
security_result.action_details 和 security_result.action |
如果存在,则将该值映射到 security_result.action_details。“success”或“SUCCESS”表示 security_result.action ALLOW,否则表示 BLOCK。 |
server_section |
target.resource.attribute.labels.value |
如果存在 server_section,则其值会映射到键为 server_section 的 UDM。 |
server_section_ikey |
target.resource.attribute.labels.value |
如果存在 server_section_ikey,则其值会映射到键为 server_section_ikey 的 UDM。 |
status |
security_result.action_details 和 security_result.action |
如果存在,则将该值映射到 security_result.action_details。“允许”转换为 security_result.action ALLOW,“拒绝”转换为 BLOCK。 |
timestamp |
metadata.event_timestamp 和 event.timestamp |
该值会转换为时间戳,并同时映射到 metadata.event_timestamp 和 event.timestamp。 |
txid |
metadata.product_log_id 和 network.session_id |
该值同时映射到 metadata.product_log_id 和 network.session_id。 |
user.groups |
target.user.group_identifiers |
数组中的所有值都会添加到 target.user.group_identifiers 中。 |
user.key |
target.user.product_object_id |
如果存在,则将该值映射到 UDM。 |
user.name |
target.user.userid |
如果存在,则将该值映射到 UDM。 |
username |
target.user.userid |
如果存在且 user.name 不存在,则该值会映射到 UDM。event_type 设置为 USER_LOGIN。 |
| (解析器逻辑) | metadata.vendor_name |
始终设置为“DUO_SECURITY”。 |
| (解析器逻辑) | metadata.product_name |
始终设置为“MULTI-FACTOR_AUTHENTICATION”。 |
| (解析器逻辑) | metadata.log_type |
取自原始日志的顶层 log_type 字段。 |
| (解析器逻辑) | extensions.auth.type |
始终设置为“SSO”。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。