收集 Duo Telephony 日志
本文档介绍了如何使用 Amazon S3 将 Duo Telephony 日志注入到 Google Security Operations。解析器从日志中提取字段,并将其转换和映射到统一数据模型 (UDM)。它可处理各种 Duo 日志格式,转换时间戳,提取用户信息、网络详细信息和安全结果,最后将输出内容整理为标准化的 UDM 格式。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 拥有所有者角色的用户对 Duo 管理面板的专属访问权限。
- 对 AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。
收集 Duo 前提条件(API 凭据)
- 以拥有所有者角色的管理员身份登录 Duo 管理控制台。
- 前往应用 > 应用目录。
- 在目录中找到 Admin API 的条目。
- 点击 + 添加以创建应用。
- 复制以下详细信息并将其保存在安全的位置:
- 集成密钥
- 密钥
- API 主机名(例如
api-yyyyyyyy.duosecurity.com
)
- 在权限部分,取消选择除授予读取日志权限之外的所有权限选项。
- 点击保存更改。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
duo-telephony-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 .CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
- 复制并粘贴以下政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
duo-telephony-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
duo-telephony-lambda-role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 duo-telephony-logs-collector
运行时 Python 3.13 架构 x86_64 执行角色 duo-telephony-lambda-role
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
duo-telephony-logs-collector.py
)。import json import boto3 import os import hmac import hashlib import base64 import urllib.parse import urllib.request import email.utils from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional from botocore.exceptions import ClientError s3 = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Duo telephony logs and store them in S3. """ try: # Get configuration from environment variables bucket_name = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'].rstrip('/') state_key = os.environ['STATE_KEY'] integration_key = os.environ['DUO_IKEY'] secret_key = os.environ['DUO_SKEY'] api_hostname = os.environ['DUO_API_HOST'] # Load state state = load_state(bucket_name, state_key) # Calculate time range now = datetime.now(timezone.utc) if state.get('last_offset'): # Continue from last offset next_offset = state['last_offset'] logs = [] has_more = True else: # Start from last timestamp or 24 hours ago mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000)) # Apply 2-minute delay as recommended by Duo maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000) next_offset = None logs = [] has_more = True # Fetch logs with pagination total_fetched = 0 max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) while has_more and total_fetched < max_iterations: if next_offset: # Use offset for pagination params = { 'limit': '1000', 'next_offset': next_offset } else: # Initial request with time range params = { 'mintime': str(mintime), 'maxtime': str(maxtime), 'limit': '1000', 'sort': 'ts:asc' } # Make API request with retry logic response = duo_api_call_with_retry( 'GET', api_hostname, '/admin/v2/logs/telephony', params, integration_key, secret_key ) if 'items' in response: logs.extend(response['items']) total_fetched += 1 # Check for more data if 'metadata' in response and 'next_offset' in response['metadata']: next_offset = response['metadata']['next_offset'] state['last_offset'] = next_offset else: has_more = False state['last_offset'] = None # Update timestamp for next run if logs: # Get the latest timestamp from logs latest_ts = max([log.get('ts', '') for log in logs]) if latest_ts: # Convert ISO timestamp to milliseconds dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00')) state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1 else: has_more = False # Save logs to S3 if any were fetched if logs: timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') key = f"{s3_prefix}/telephony_{timestamp}.json" # Format logs as newline-delimited JSON log_data = '\n'.join(json.dumps(log) for log in logs) s3.put_object( Bucket=bucket_name, Key=key, Body=log_data.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}") else: print("No new telephony logs found") # Save state save_state(bucket_name, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {len(logs)} telephony logs', 'logs_count': len(logs) }) } except Exception as e: print(f"Error: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API with retry logic. """ for attempt in range(max_retries): try: return duo_api_call(method, host, path, params, ikey, skey) except Exception as e: if '429' in str(e) or '5' in str(e)[:1]: # Rate limit or server error if attempt < max_retries - 1: wait_time = (2 ** attempt) * 2 # Exponential backoff print(f"Retrying after {wait_time} seconds...") import time time.sleep(wait_time) continue raise def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API. """ # Create canonical string for signing using RFC 2822 date format now = email.utils.formatdate() canon = [now, method.upper(), host.lower(), path] # Add parameters args = [] for key in sorted(params.keys()): val = params[key] args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}") canon.append('&'.join(args)) canon_str = '\n'.join(canon) # Sign the request sig = hmac.new( skey.encode('utf-8'), canon_str.encode('utf-8'), hashlib.sha1 ).hexdigest() # Create authorization header auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8') # Build URL url = f"https://{host}{path}" if params: url += '?' + '&'.join(args) # Make request req = urllib.request.Request(url) req.add_header('Authorization', f'Basic {auth}') req.add_header('Date', now) req.add_header('Host', host) req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0') try: with urllib.request.urlopen(req, timeout=30) as response: data = json.loads(response.read().decode('utf-8')) if data.get('stat') == 'OK': return data.get('response', {}) else: raise Exception(f"API error: {data.get('message', 'Unknown error')}") except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') raise Exception(f"HTTP error {e.code}: {error_body}") def load_state(bucket: str, key: str) -> Dict[str, Any]: """Load state from S3.""" try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read().decode('utf-8')) except ClientError as e: if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'): return {} print(f"Error loading state: {e}") return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket: str, key: str, state: Dict[str, Any]): """Save state to S3.""" try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入下表中提供的环境变量,并将示例值替换为您的值。
环境变量
键 示例值 S3_BUCKET
duo-telephony-logs
S3_PREFIX
duo-telephony/
STATE_KEY
duo-telephony/state.json
DUO_IKEY
<your-integration-key>
DUO_SKEY
<your-secret-key>
DUO_API_HOST
api-yyyyyyyy.duosecurity.com
MAX_ITERATIONS
10
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > duo-telephony-logs-collector)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
duo-telephony-logs-collector
。 - 名称:
duo-telephony-logs-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-telephony-logs" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥。点击创建访问密钥。
下载
.CSV
。(您需要将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Duo 电话日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Duo Telephony logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Duo Telephony Logs 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://duo-telephony-logs/duo-telephony/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
context |
metadata.product_event_type |
直接从原始日志中的 context 字段映射。 |
credits |
security_result.detection_fields.value |
直接从原始日志中的 credits 字段映射,嵌套在具有相应键 credits 的 detection_fields 对象下。 |
eventtype |
security_result.detection_fields.value |
直接从原始日志中的 eventtype 字段映射,嵌套在具有相应键 eventtype 的 detection_fields 对象下。 |
host |
principal.hostname |
如果原始日志中的 host 字段不是 IP 地址,则直接从该字段映射。在解析器中设置为静态值“ALLOW”。在解析器中设置为静态值“MECHANISM_UNSPECIFIED”。从原始日志的 timestamp 字段解析而来,表示自纪元开始算起的秒数。如果原始日志中同时存在 context 和 host 字段,则设置为“USER_UNCATEGORIZED”。如果仅存在 host ,则设置为“STATUS_UPDATE”。否则,设置为“GENERIC_EVENT”。直接取自原始日志的 log_type 字段。在解析器中设置为静态值“Telephony”。在解析器中设置为静态值“Duo”。 |
phone |
principal.user.phone_numbers |
直接从原始日志中的 phone 字段映射。 |
phone |
principal.user.userid |
直接从原始日志中的 phone 字段映射。在解析器中设置为静态值“INFORMATIONAL”。在解析器中设置为静态值“Duo Telephony”。 |
timestamp |
metadata.event_timestamp |
从原始日志的 timestamp 字段解析而来,表示自纪元开始算起的秒数。 |
type |
security_result.summary |
直接从原始日志中的 type 字段映射。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。