收集 Duo 電話記錄
本文說明如何使用 Google Cloud Storage,將 Duo Telephony 記錄擷取至 Google Security Operations。剖析器會從記錄檔中擷取欄位,並轉換及對應至統合資料模型 (UDM)。這項工具可處理各種 Duo 記錄格式、轉換時間戳記、擷取使用者資訊、網路詳細資料和安全性結果,最後將輸出內容轉換為標準化 UDM 格式。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用 Cloud Storage API 的 GCP 專案
- 建立及管理 GCS 值區的權限
- 管理 Google Cloud Storage 值區 IAM 政策的權限
- 建立 Cloud Run 服務、Pub/Sub 主題和 Cloud Scheduler 工作的權限
- 透過擁有者角色取得 Duo 管理面板的特殊存取權
收集 Duo 必要條件 (API 憑證)
- 以擁有者角色的管理員身分登入 Duo 管理面板。
- 依序前往「應用程式」>「應用程式目錄」。
- 在目錄中找到「Admin API」項目。
- 按一下「+ 新增」建立應用程式。
- 複製並儲存以下詳細資料,存放在安全的位置:
- 整合金鑰
- 密鑰
- API 主機名稱 (例如
api-yyyyyyyy.duosecurity.com)
- 在「權限」部分,只勾選「讀取電話」權限核取方塊,取消選取所有其他權限。
- 按一下 [儲存變更]。
驗證權限
如要確認 Admin API 應用程式是否具備必要權限,請按照下列步驟操作:
- 登入 Duo 管理面板。
- 依序前往「應用程式」>「保護應用程式」。
- 找出 Admin API 應用程式。
- 按一下應用程式名稱即可查看詳細資料。
- 在「權限」部分,確認只有「讀取電話」已勾選。
- 如果勾選了其他權限,或未勾選「讀取電話」,請更新權限並點選「儲存變更」。
測試 API 存取權
請先測試憑證,再繼續進行整合:
# Replace with your actual credentials DUO_IKEY="your-integration-key" DUO_SKEY="your-secret-key" DUO_HOST="api-yyyyyyyy.duosecurity.com" # Test API access (requires signing - use Duo's API test tool or Python script) # Visit https://duo.com/docs/adminapi#testing to test your credentials
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 duo-telephony-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
為 Cloud Run 函式建立服務帳戶
Cloud Run 函式需要具備 GCS bucket 寫入權限的服務帳戶,並由 Pub/Sub 叫用。
建立服務帳戶
- 在 GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)。
- 按一下 [Create Service Account] (建立服務帳戶)。
- 請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
duo-telephony-collector-sa。 - 服務帳戶說明:輸入
Service account for Cloud Run function to collect Duo Telephony logs。
- 服務帳戶名稱:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
- 按一下「選擇角色」。
- 搜尋並選取「Storage 物件管理員」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Run Invoker」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)。
- 按一下「繼續」。
- 按一下 [完成]。
這些角色適用於:
- Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
- Cloud Run 叫用者:允許 Pub/Sub 叫用函式
- Cloud Functions 叫用者:允許函式叫用
授予 GCS 值區的 IAM 權限
授予服務帳戶 GCS bucket 的寫入權限:
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
duo-telephony-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:輸入服務帳戶電子郵件地址 (例如
duo-telephony-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 指派角色:選取「Storage 物件管理員」。
- 新增主體:輸入服務帳戶電子郵件地址 (例如
- 按一下 [儲存]。
建立 Pub/Sub 主題
建立 Pub/Sub 主題,Cloud Scheduler 會將訊息發布至該主題,而 Cloud Run 函式會訂閱該主題。
- 在 GCP Console 中,前往「Pub/Sub」>「Topics」(主題)。
- 按一下「建立主題」。
- 請提供下列設定詳細資料:
- 主題 ID:輸入
duo-telephony-trigger。 - 其他設定保留預設值。
- 主題 ID:輸入
- 點選「建立」。
建立 Cloud Run 函式來收集記錄
Cloud Run 函式會由 Cloud Scheduler 傳送的 Pub/Sub 訊息觸發,從 Duo Telephony API 擷取記錄,並寫入 GCS。
- 前往 GCP Console 的「Cloud Run」。
- 按一下「Create service」(建立服務)。
- 選取「函式」 (使用內嵌編輯器建立函式)。
在「設定」部分,提供下列設定詳細資料:
設定 值 服務名稱 duo-telephony-logs-collector區域 選取與 GCS bucket 相符的區域 (例如 us-central1)執行階段 選取「Python 3.12」以上版本 在「Trigger (optional)」(觸發條件 (選用)) 專區:
- 按一下「+ 新增觸發條件」。
- 選取「Cloud Pub/Sub」。
- 在「選取 Cloud Pub/Sub 主題」中,選擇 Pub/Sub 主題 (
duo-telephony-trigger)。 - 按一下 [儲存]。
在「Authentication」(驗證) 部分:
- 選取「需要驗證」。
- 檢查 Identity and Access Management (IAM)。
向下捲動並展開「Containers, Networking, Security」。
前往「安全性」分頁:
- 服務帳戶:選取服務帳戶 (
duo-telephony-collector-sa)。
- 服務帳戶:選取服務帳戶 (
前往「容器」分頁:
- 按一下「變數與密鑰」。
- 針對每個環境變數,按一下「+ 新增變數」:
變數名稱 範例值 GCS_BUCKETduo-telephony-logsGCS_PREFIXduo-telephonySTATE_KEYduo-telephony/state.jsonDUO_IKEY<your-integration-key>DUO_SKEY<your-secret-key>DUO_API_HOSTapi-yyyyyyyy.duosecurity.comMAX_ITERATIONS10在「變數與密鑰」分頁中向下捲動至「要求」:
- 要求逾時:輸入
600秒 (10 分鐘)。
- 要求逾時:輸入
前往「容器」中的「設定」分頁:
- 在「資源」部分:
- 記憶體:選取 512 MiB 以上。
- CPU:選取 1。
- 按一下 [完成]。
- 在「資源」部分:
捲動至「執行環境」:
- 選取「預設」 (建議選項)。
在「修訂版本資源調度」部分:
- 執行個體數量下限:輸入
0。 - 「Maximum number of instances」(執行個體數量上限):輸入
100(或根據預期負載調整)。
- 執行個體數量下限:輸入
點選「建立」。
等待服務建立完成 (1 到 2 分鐘)。
服務建立完成後,系統會自動開啟內嵌程式碼編輯器。
新增函式程式碼
- 在「進入點」欄位中輸入「main」。
在內嵌程式碼編輯器中建立兩個檔案:
- 第一個檔案:main.py:
import functions_framework from google.cloud import storage import json import os import hmac import hashlib import base64 import urllib.parse import urllib3 import email.utils from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo telephony logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'duo-telephony').rstrip('/') state_key = os.environ.get('STATE_KEY', 'duo-telephony/state.json') integration_key = os.environ.get('DUO_IKEY') secret_key = os.environ.get('DUO_SKEY') api_hostname = os.environ.get('DUO_API_HOST') if not all([bucket_name, integration_key, secret_key, api_hostname]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state state = load_state(bucket, state_key) # Calculate time range now = datetime.now(timezone.utc) if state.get('last_offset'): # Continue from last offset next_offset = state['last_offset'] logs = [] has_more = True else: # Start from last timestamp or 24 hours ago mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000)) # Apply 2-minute delay as recommended by Duo maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000) next_offset = None logs = [] has_more = True # Fetch logs with pagination total_fetched = 0 max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) while has_more and total_fetched < max_iterations: page_num = total_fetched + 1 if next_offset: # Use offset for pagination params = { 'limit': '100', 'next_offset': next_offset } else: # Initial request with time range params = { 'mintime': str(mintime), 'maxtime': str(maxtime), 'limit': '100', 'sort': 'ts:asc' } # Make API request with retry logic response = duo_api_call_with_retry( 'GET', api_hostname, '/admin/v2/logs/telephony', params, integration_key, secret_key ) if 'items' in response: logs.extend(response['items']) total_fetched += 1 # Check for more data if 'metadata' in response and 'next_offset' in response['metadata']: next_offset = response['metadata']['next_offset'] state['last_offset'] = next_offset else: has_more = False state['last_offset'] = None # Update timestamp for next run if logs: # Get the latest timestamp from logs (ISO 8601 format) latest_ts = max([log.get('ts', '') for log in logs]) if latest_ts: # Convert ISO timestamp to milliseconds dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00')) state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1 # Save logs to GCS if any were fetched if logs: timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') blob_name = f"{prefix}/telephony_{timestamp}.json" # Format logs as newline-delimited JSON log_data = '\n'.join(json.dumps(log) for log in logs) blob = bucket.blob(blob_name) blob.upload_from_string( log_data, content_type='application/x-ndjson' ) print(f"Saved {len(logs)} telephony logs to gs://{bucket_name}/{blob_name}") else: print("No new telephony logs found") # Save state save_state(bucket, state_key, state) except Exception as e: print(f'Error processing logs: {str(e)}') raise def duo_api_call_with_retry( method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str, max_retries: int = 3 ) -> Dict[str, Any]: """Make an authenticated API call to Duo Admin API with retry logic.""" for attempt in range(max_retries): try: return duo_api_call(method, host, path, params, ikey, skey) except Exception as e: if '429' in str(e) or '5' in str(e)[:1]: if attempt < max_retries - 1: wait_time = (2 ** attempt) * 2 print(f"Retrying after {wait_time} seconds...") import time time.sleep(wait_time) continue raise def duo_api_call( method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str ) -> Dict[str, Any]: """Make an authenticated API call to Duo Admin API.""" # Create canonical string for signing using RFC 2822 date format now = email.utils.formatdate() canon = [now, method.upper(), host.lower(), path] # Add parameters args = [] for key in sorted(params.keys()): val = params[key] args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}") canon.append('&'.join(args)) canon_str = '\n'.join(canon) # Sign the request sig = hmac.new( skey.encode('utf-8'), canon_str.encode('utf-8'), hashlib.sha1 ).hexdigest() # Create authorization header auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8') # Build URL url = f"https://{host}{path}" if params: url += '?' + '&'.join(args) # Make request headers = { 'Authorization': f'Basic {auth}', 'Date': now, 'Host': host, 'User-Agent': 'duo-telephony-gcs-ingestor/1.0' } try: response = http.request('GET', url, headers=headers) data = json.loads(response.data.decode('utf-8')) if data.get('stat') == 'OK': return data.get('response', {}) else: raise Exception(f"API error: {data.get('message', 'Unknown error')}") except urllib3.exceptions.HTTPError as e: raise Exception(f"HTTP error: {str(e)}") def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}')- 第二個檔案:requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0點選「部署」來儲存並部署函式。
等待部署作業完成 (2 到 3 分鐘)。
建立 Cloud Scheduler 工作
Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式。
- 前往 GCP 主控台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
設定 值 名稱 duo-telephony-logs-1h區域 選取與 Cloud Run 函式相同的區域 頻率 0 * * * *(每小時整點)時區 選取時區 (建議使用世界標準時間) 目標類型 Pub/Sub 主題 選取 Pub/Sub 主題 ( duo-telephony-trigger)郵件內文 {}(空白 JSON 物件)點選「建立」。
排程頻率選項
根據記錄檔量和延遲時間要求選擇頻率:
頻率 Cron 運算式 用途 每 5 分鐘 */5 * * * *高容量、低延遲 每 15 分鐘檢查一次 */15 * * * *普通量 每小時 0 * * * *標準 (建議採用) 每 6 小時 0 */6 * * *少量、批次處理 每日 0 0 * * *歷來資料集合
測試排程器工作
- 在 Cloud Scheduler 控制台中,找出您的作業 (
duo-telephony-logs-1h)。 - 按一下「強制執行」即可手動觸發。
- 稍候片刻,然後前往「Cloud Run」>「服務」。
- 按一下函式名稱 (
duo-telephony-logs-collector)。 - 按一下 [Logs] (記錄) 分頁標籤。
- 確認函式是否已順利執行。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (
duo-telephony-logs)。 - 前往前置字元資料夾 (
duo-telephony/)。 - 確認是否已建立含有記錄的新
.json檔案。
如果在記錄中發現錯誤:
- HTTP 401:檢查環境變數中的 API 憑證 (DUO_IKEY、DUO_SKEY、DUO_API_HOST)
- HTTP 403:確認 Admin API 應用程式已啟用「讀取電話」權限
- HTTP 429:頻率限制 - 函式會自動重試,並採用指數輪詢間隔
- 缺少環境變數:檢查 Cloud Run 函式設定中是否已設定所有必要變數
擷取 Google SecOps 服務帳戶
Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Telephony logs)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Duo 電話記錄」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示專屬的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
duo-telephony-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
- 指派角色:選取「Storage 物件檢視者」。
按一下 [儲存]。
在 Google SecOps 中設定動態饋給,以便擷取 Duo 電話記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Duo Telephony logs)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Duo 電話記錄」做為「記錄類型」。
- 點選 [下一步]。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://duo-telephony-logs/duo-telephony/取代:
duo-telephony-logs:您的 GCS bucket 名稱。duo-telephony:儲存記錄的選用前置字元/資料夾路徑 (如為根目錄,請留空)。
範例:
- 根層級 bucket:
gs://duo-telephony-logs/ - 前置字串:
gs://duo-telephony-logs/duo-telephony/
- 根層級 bucket:
來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
- 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。
檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
資產命名空間:資產命名空間。
擷取標籤:要套用至這個動態饋給事件的標籤。
點選 [下一步]。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| 背景資訊 | metadata.product_event_type | 直接從原始記錄的內容欄位對應。 |
| 抵免額 | security_result.detection_fields.value | 直接從原始記錄的 credits 欄位對應,並巢狀內嵌在 detection_fields 物件中,且具有相應的鍵 credits。 |
| eventtype | security_result.detection_fields.value | 直接從原始記錄中的 eventtype 欄位對應,巢狀結構位於 detection_fields 物件下方,並具有對應的鍵 eventtype。 |
| 主機 | principal.hostname | 如果不是 IP 位址,則直接從原始記錄的「主機」欄位對應。 |
| security_result.action | 在剖析器中設為「ALLOW」的靜態值。 | |
| security_result.detection_fields.value | 在剖析器中設為「MECHANISM_UNSPECIFIED」的靜態值。 | |
| metadata.event_timestamp | 從原始記錄的 ts 欄位剖析而來,為 ISO 8601 時間戳記字串。 | |
| metadata.event_type | 如果原始記錄中同時存在內容和主機欄位,請設為「USER_UNCATEGORIZED」。如果只有主機存在,請設為「STATUS_UPDATE」。否則請設為「GENERIC_EVENT」。 | |
| metadata.log_type | 直接取自原始記錄的 log_type 欄位。 | |
| metadata.product_name | 在剖析器中設為「Telephony」的靜態值。 | |
| metadata.vendor_name | 在剖析器中設為「Duo」的靜態值。 | |
| 手機 | principal.user.phone_numbers | 直接從原始記錄中的電話欄位對應。 |
| 手機 | principal.user.userid | 直接從原始記錄中的電話欄位對應。 |
| security_result.severity | 在剖析器中設為「INFORMATIONAL」的靜態值。 | |
| security_result.summary | 在剖析器中設為「Duo Telephony」的靜態值。 | |
| ts | metadata.event_timestamp | 從原始記錄的 ts 欄位剖析而來,為 ISO 8601 時間戳記字串。 |
| 類型 | security_result.summary | 直接從原始記錄的類型欄位對應。 |
需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。