收集 Duo 驗證記錄
本文說明如何將 Duo 驗證記錄匯入 Google Security Operations。剖析器會從 JSON 格式的訊息中擷取記錄。這項服務會將原始記錄資料轉換為統一資料模型 (UDM),對應使用者、裝置、應用程式、位置和驗證詳細資料等欄位,同時處理各種驗證因素和結果,將安全事件分類。剖析器也會執行資料清理、型別轉換和錯誤處理,確保資料品質和一致性。
你可以選擇下列兩種收集方法:
- 選項 1:使用第三方 API 直接擷取
- 方法 2:使用 AWS Lambda 和 Amazon S3 收集記錄
事前準備
- Google SecOps 執行個體
- Duo 管理員面板的特殊權限 (必須具備擁有者角色,才能建立 Admin API 應用程式)
- 如果使用選項 2,則需具備 AWS 特殊存取權
選項 1:使用第三方 API 擷取 Duo 驗證記錄
收集 Duo 必要條件 (API 憑證)
- 以管理員身分登入 Duo 管理面板,並具備「擁有者」、「管理員」或「應用程式管理員」角色。
- 依序前往「應用程式」>「應用程式目錄」。
- 在目錄中找到「Admin API」項目。
- 按一下「+ 新增」即可建立應用程式。
- 複製下列詳細資料並存放在安全位置:
- 整合金鑰
- 密鑰
- API 主機名稱 (例如
api-XXXXXXXX.duosecurity.com)
- 前往「權限」部分。
- 取消選取「授予讀取記錄」以外的所有權限選項。
- 按一下 [儲存變更]。
在 Google SecOps 中設定動態饋給,擷取 Duo 驗證記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Authentication Logs)。 - 選取「第三方 API」做為「來源類型」。
- 選取「Duo Auth」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- 使用者名稱:輸入 Duo 的整合金鑰。
- 密鑰:輸入 Duo 的密鑰。
- API 主機名稱:輸入 API 主機名稱 (例如
api-XXXXXXXX.duosecurity.com)。 - 資產命名空間:選用。資產命名空間。
- 擷取標籤:選用。要套用至這個動態饋給事件的標籤。
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
方法 2:使用 AWS S3 擷取 Duo 驗證記錄
收集 Duo Admin API 憑證
- 登入 Duo 管理面板。
- 依序前往「Applications」>「Protect an Application」。
- 在應用程式目錄中找到「Admin API」。
- 按一下「保護」,新增 Admin API 應用程式。
- 複製並儲存下列值:
- 整合金鑰 (ikey)
- 密鑰 (skey)
- API 主機名稱 (例如
api-XXXXXXXX.duosecurity.com)
- 在「權限」中,啟用「授予讀取記錄」。
- 按一下 [儲存變更]。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket。
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
duo-auth-logs)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }- 如果您輸入其他 bucket 名稱,請替換
duo-auth-logs。
- 如果您輸入其他 bucket 名稱,請替換
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
WriteDuoAuthToS3Role,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」。
- 依序點選「建立函式」>「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 duo_auth_to_s3執行階段 Python 3.13 架構 x86_64 執行角色 WriteDuoAuthToS3Role建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
duo_auth_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值。
鍵 範例值 S3_BUCKETduo-auth-logsS3_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour)。 - 目標:您的 Lambda 函式
duo_auth_to_s3。 - 名稱:
duo-auth-1h。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:
secops-reader - 存取類型:存取金鑰 - 程式輔助存取
- 使用者:
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-auth-logs" } ] }將名稱設為
secops-reader-policy。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定動態饋給,擷取 Duo 驗證記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Authentication Logs)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Duo Auth」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://duo-auth-logs/duo/auth/ - 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
如果存在 access_device.browser,系統會將其值對應至 UDM。 |
access_device.hostname |
principal.hostname |
如果 access_device.hostname 存在且不為空白,系統會將其值對應至 UDM。如果為空值且 event_type 為 USER_CREATION,則 event_type 會變更為 USER_UNCATEGORIZED。如果 access_device.hostname 為空值,且 hostname 欄位存在,系統會使用 hostname 的值。 |
access_device.ip |
principal.ip |
如果 access_device.ip 存在且為有效的 IPv4 位址,系統會將其值對應至 UDM。如果不是有效的 IPv4 位址,系統會將其做為字串值新增至 additional.fields,並使用鍵 access_device.ip。 |
access_device.location.city |
principal.location.city |
如有,則該值會對應至 UDM。 |
access_device.location.country |
principal.location.country_or_region |
如有,則該值會對應至 UDM。 |
access_device.location.state |
principal.location.state |
如有,則該值會對應至 UDM。 |
access_device.os |
principal.platform |
如果存在,該值會轉換為對應的 UDM 值 (MAC、WINDOWS、LINUX)。 |
access_device.os_version |
principal.platform_version |
如有,則該值會對應至 UDM。 |
application.key |
target.resource.id |
如有,則該值會對應至 UDM。 |
application.name |
target.application |
如有,則該值會對應至 UDM。 |
auth_device.ip |
target.ip |
如果存在且不是「無」,則值會對應至 UDM。 |
auth_device.location.city |
target.location.city |
如有,則該值會對應至 UDM。 |
auth_device.location.country |
target.location.country_or_region |
如有,則該值會對應至 UDM。 |
auth_device.location.state |
target.location.state |
如有,則該值會對應至 UDM。 |
auth_device.name |
target.hostname 或 target.user.phone_numbers |
如果 auth_device.name 存在且是電話號碼 (正規化後),系統會將其新增至 target.user.phone_numbers。否則會對應至 target.hostname。 |
client_ip |
target.ip |
如果存在且不是「無」,則值會對應至 UDM。 |
client_section |
target.resource.attribute.labels.value |
如果存在 client_section,系統會將其值對應至 UDM,並使用 client_section 做為鍵。 |
dn |
target.user.userid |
如果存在 dn,但不存在 user.name 和 username,系統會使用 grok 從 dn 欄位擷取 userid,並對應至 UDM。event_type 設為 USER_LOGIN。 |
event_type |
「metadata.product_event_type」和「metadata.event_type」 |
該值會對應至 metadata.product_event_type。這項屬性也用於判斷 metadata.event_type:「驗證」會變成 USER_LOGIN,「註冊」會變成 USER_CREATION,如果屬性空白或不是上述任一值,則會變成 GENERIC_EVENT。 |
factor |
「extensions.auth.mechanism」和「extensions.auth.auth_details」 |
這個值會轉換為對應的 UDM auth.mechanism 值 (HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)。原始值也會對應至 extensions.auth.auth_details。 |
hostname |
principal.hostname |
如果存在且 access_device.hostname 為空,則值會對應至 UDM。 |
log_format |
target.resource.attribute.labels.value |
如果存在 log_format,系統會將其值對應至 UDM,並使用 log_format 做為鍵。 |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
如果存在 log_level.__class_uuid__,其值會對應至 UDM,並使用鍵 __class_uuid__。 |
log_level.name |
「target.resource.attribute.labels.value」和「security_result.severity」 |
如果存在 log_level.name,其值會對應至 UDM,並以 name 做為鍵。如果值為「info」,security_result.severity 會設為 INFORMATIONAL。 |
log_logger.unpersistable |
target.resource.attribute.labels.value |
如果存在 log_logger.unpersistable,其值會對應至 UDM,並使用鍵 unpersistable。 |
log_namespace |
target.resource.attribute.labels.value |
如果存在 log_namespace,系統會將其值對應至 UDM,並使用 log_namespace 做為鍵。 |
log_source |
target.resource.attribute.labels.value |
如果存在 log_source,系統會將其值對應至 UDM,並使用 log_source 做為鍵。 |
msg |
security_result.summary |
如果存在且 reason 為空,則值會對應至 UDM。 |
reason |
security_result.summary |
如有,則該值會對應至 UDM。 |
result |
「security_result.action_details」和「security_result.action」 |
如有,則值會對應至 security_result.action_details。「success」或「SUCCESS」會轉譯為 security_result.action ALLOW,否則為 BLOCK。 |
server_section |
target.resource.attribute.labels.value |
如果存在 server_section,系統會將其值對應至 UDM,並使用 server_section 做為鍵。 |
server_section_ikey |
target.resource.attribute.labels.value |
如果存在 server_section_ikey,系統會將其值對應至 UDM,並使用 server_section_ikey 做為鍵。 |
status |
「security_result.action_details」和「security_result.action」 |
如有的話,該值會對應至 security_result.action_details。「允許」會轉譯為 security_result.action ALLOW,「拒絕」則會轉譯為 BLOCK。 |
timestamp |
「metadata.event_timestamp」和「event.timestamp」 |
這個值會轉換為時間戳記,並對應至 metadata.event_timestamp 和 event.timestamp。 |
txid |
「metadata.product_log_id」和「network.session_id」 |
這個值會同時對應至 metadata.product_log_id 和 network.session_id。 |
user.groups |
target.user.group_identifiers |
陣列中的所有值都會加到 target.user.group_identifiers。 |
user.key |
target.user.product_object_id |
如有,則該值會對應至 UDM。 |
user.name |
target.user.userid |
如有,則該值會對應至 UDM。 |
username |
target.user.userid |
如果存在且不是 user.name,則值會對應至 UDM。event_type 設為 USER_LOGIN。 |
| (剖析器邏輯) | metadata.vendor_name |
一律設為「DUO_SECURITY」。 |
| (剖析器邏輯) | metadata.product_name |
一律設為「MULTI-FACTOR_AUTHENTICATION」。 |
| (剖析器邏輯) | metadata.log_type |
取自原始記錄檔的頂層 log_type 欄位。 |
| (剖析器邏輯) | extensions.auth.type |
一律設為「SSO」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。