收集 Duo Telephony 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Duo Telephony 日志注入到 Google Security Operations。解析器从日志中提取字段,并将其转换和映射到统一数据模型 (UDM)。它可处理各种 Duo 日志格式,转换时间戳,提取用户信息、网络详细信息和安全结果,最后将输出内容整理为标准化的 UDM 格式。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • 拥有所有者角色的用户对 Duo 管理面板的专属访问权限。
  • AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。

收集 Duo 前提条件(API 凭据)

  1. 以拥有所有者角色的管理员身份登录 Duo 管理控制台
  2. 前往应用 > 应用目录
  3. 在目录中找到 Admin API 的条目。
  4. 点击 + 添加以创建应用。
  5. 复制以下详细信息并将其保存在安全的位置:
    • 集成密钥
    • 密钥
    • API 主机名(例如 api-yyyyyyyy.duosecurity.com
  6. 权限部分,取消选择除授予读取日志权限之外的所有权限选项。
  7. 点击保存更改

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 duo-telephony-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶名称,请替换 duo-telephony-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json"
        }
      ]
    }
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  7. 附加新创建的政策。

  8. 将角色命名为 duo-telephony-lambda-role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 duo-telephony-logs-collector
    运行时 Python 3.13
    架构 x86_64
    执行角色 duo-telephony-lambda-role
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (duo-telephony-logs-collector.py)。

    import json
    import boto3
    import os
    import hmac
    import hashlib
    import base64
    import urllib.parse
    import urllib.request
    import email.utils
    from datetime import datetime, timedelta, timezone
    from typing import Dict, Any, List, Optional
    from botocore.exceptions import ClientError
    
    s3 = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Duo telephony logs and store them in S3.
        """
        try:
            # Get configuration from environment variables
            bucket_name = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX'].rstrip('/')
            state_key = os.environ['STATE_KEY']
            integration_key = os.environ['DUO_IKEY']
            secret_key = os.environ['DUO_SKEY']
            api_hostname = os.environ['DUO_API_HOST']
    
            # Load state
            state = load_state(bucket_name, state_key)
    
            # Calculate time range
            now = datetime.now(timezone.utc)
            if state.get('last_offset'):
                # Continue from last offset
                next_offset = state['last_offset']
                logs = []
                has_more = True
            else:
                # Start from last timestamp or 24 hours ago
                mintime = state.get('last_timestamp_ms', 
                                  int((now - timedelta(hours=24)).timestamp() * 1000))
                # Apply 2-minute delay as recommended by Duo
                maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000)
                next_offset = None
                logs = []
                has_more = True
    
            # Fetch logs with pagination
            total_fetched = 0
            max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
            while has_more and total_fetched < max_iterations:
                if next_offset:
                    # Use offset for pagination
                    params = {
                        'limit': '1000',
                        'next_offset': next_offset
                    }
                else:
                    # Initial request with time range
                    params = {
                        'mintime': str(mintime),
                        'maxtime': str(maxtime),
                        'limit': '1000',
                        'sort': 'ts:asc'
                    }
    
                # Make API request with retry logic
                response = duo_api_call_with_retry(
                    'GET',
                    api_hostname,
                    '/admin/v2/logs/telephony',
                    params,
                    integration_key,
                    secret_key
                )
    
                if 'items' in response:
                    logs.extend(response['items'])
                    total_fetched += 1
    
                    # Check for more data
                    if 'metadata' in response and 'next_offset' in response['metadata']:
                        next_offset = response['metadata']['next_offset']
                        state['last_offset'] = next_offset
                    else:
                        has_more = False
                        state['last_offset'] = None
    
                        # Update timestamp for next run
                        if logs:
                            # Get the latest timestamp from logs
                            latest_ts = max([log.get('ts', '') for log in logs])
                            if latest_ts:
                                # Convert ISO timestamp to milliseconds
                                dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00'))
                                state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1
                else:
                    has_more = False
    
            # Save logs to S3 if any were fetched
            if logs:
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                key = f"{s3_prefix}/telephony_{timestamp}.json"
    
                # Format logs as newline-delimited JSON
                log_data = '\n'.join(json.dumps(log) for log in logs)
    
                s3.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=log_data.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}")
            else:
                print("No new telephony logs found")
    
            # Save state
            save_state(bucket_name, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'Successfully processed {len(logs)} telephony logs',
                    'logs_count': len(logs)
                })
            }
    
        except Exception as e:
            print(f"Error: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)})
            }
    
    def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], 
                                ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API with retry logic.
        """
        for attempt in range(max_retries):
            try:
                return duo_api_call(method, host, path, params, ikey, skey)
            except Exception as e:
                if '429' in str(e) or '5' in str(e)[:1]:  # Rate limit or server error
                    if attempt < max_retries - 1:
                        wait_time = (2 ** attempt) * 2  # Exponential backoff
                        print(f"Retrying after {wait_time} seconds...")
                        import time
                        time.sleep(wait_time)
                        continue
                raise
    
    def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], 
                    ikey: str, skey: str) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API.
        """
        # Create canonical string for signing using RFC 2822 date format
        now = email.utils.formatdate()
        canon = [now, method.upper(), host.lower(), path]
    
        # Add parameters
        args = []
        for key in sorted(params.keys()):
            val = params[key]
            args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}")
        canon.append('&'.join(args))
        canon_str = '\n'.join(canon)
    
        # Sign the request
        sig = hmac.new(
            skey.encode('utf-8'),
            canon_str.encode('utf-8'),
            hashlib.sha1
        ).hexdigest()
    
        # Create authorization header
        auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8')
    
        # Build URL
        url = f"https://{host}{path}"
        if params:
            url += '?' + '&'.join(args)
    
        # Make request
        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Basic {auth}')
        req.add_header('Date', now)
        req.add_header('Host', host)
        req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0')
    
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                data = json.loads(response.read().decode('utf-8'))
                if data.get('stat') == 'OK':
                    return data.get('response', {})
                else:
                    raise Exception(f"API error: {data.get('message', 'Unknown error')}")
        except urllib.error.HTTPError as e:
            error_body = e.read().decode('utf-8')
            raise Exception(f"HTTP error {e.code}: {error_body}")
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        """Load state from S3."""
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(response['Body'].read().decode('utf-8'))
        except ClientError as e:
            if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'):
                return {}
            print(f"Error loading state: {e}")
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]):
        """Save state to S3."""
        try:
            s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET duo-telephony-logs
    S3_PREFIX duo-telephony/
    STATE_KEY duo-telephony/state.json
    DUO_IKEY <your-integration-key>
    DUO_SKEY <your-secret-key>
    DUO_API_HOST api-yyyyyyyy.duosecurity.com
    MAX_ITERATIONS 10
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > duo-telephony-logs-collector)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 duo-telephony-logs-collector
    • 名称duo-telephony-logs-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::duo-telephony-logs"
        }
      ]
    }
    
  7. 名称 = secops-reader-policy

  8. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  10. 点击创建访问密钥

  11. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Duo 电话日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Duo Telephony logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Duo Telephony Logs 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://duo-telephony-logs/duo-telephony/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
context metadata.product_event_type 直接从原始日志中的 context 字段映射。
credits security_result.detection_fields.value 直接从原始日志中的 credits 字段映射,嵌套在具有相应键 creditsdetection_fields 对象下。
eventtype security_result.detection_fields.value 直接从原始日志中的 eventtype 字段映射,嵌套在具有相应键 eventtypedetection_fields 对象下。
host principal.hostname 如果原始日志中的 host 字段不是 IP 地址,则直接从该字段映射。在解析器中设置为静态值“ALLOW”。在解析器中设置为静态值“MECHANISM_UNSPECIFIED”。从原始日志的 timestamp 字段解析而来,表示自纪元开始算起的秒数。如果原始日志中同时存在 contexthost 字段,则设置为“USER_UNCATEGORIZED”。如果仅存在 host,则设置为“STATUS_UPDATE”。否则,设置为“GENERIC_EVENT”。直接取自原始日志的 log_type 字段。在解析器中设置为静态值“Telephony”。在解析器中设置为静态值“Duo”。
phone principal.user.phone_numbers 直接从原始日志中的 phone 字段映射。
phone principal.user.userid 直接从原始日志中的 phone 字段映射。在解析器中设置为静态值“INFORMATIONAL”。在解析器中设置为静态值“Duo Telephony”。
timestamp metadata.event_timestamp 从原始日志的 timestamp 字段解析而来,表示自纪元开始算起的秒数。
type security_result.summary 直接从原始日志中的 type 字段映射。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。