Raccogliere i log di Duo Telephony

Supportato in:

Questo documento spiega come importare i log di Duo Telephony in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log, li trasforma e li mappa al modello Unified Data Model (UDM). Gestisce vari formati di log di Duo, converte i timestamp, estrae le informazioni utente, i dettagli di rete e i risultati di sicurezza e infine struttura l'output nel formato UDM standardizzato.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Istanza Google SecOps.
  • Accesso privilegiato al pannello di amministrazione di Duo con il ruolo Proprietario.
  • Accesso privilegiato ad AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Raccogli i prerequisiti di Duo (credenziali API)

  1. Accedi al pannello di amministrazione di Duo come amministratore con il ruolo Proprietario.
  2. Vai ad Applicazioni > Catalogo applicazioni.
  3. Individua la voce relativa all'API Admin nel catalogo.
  4. Fai clic su + Aggiungi per creare l'applicazione.
  5. Copia e salva in una posizione sicura i seguenti dettagli:
    • Chiave di integrazione
    • Chiave segreta
    • Nome host API (ad esempio, api-yyyyyyyy.duosecurity.com)
  6. Nella sezione Autorizzazioni, deseleziona tutte le opzioni di autorizzazione tranne Concedi lettura log.
  7. Fai clic su Salva modifiche.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, duo-telephony-logs).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file .CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca i criteri AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy.
  2. Fai clic su Crea criterio > scheda JSON.
  3. Copia e incolla i seguenti criteri.
  4. JSON delle policy (sostituisci duo-telephony-logs se hai inserito un nome del bucket diverso):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json"
        }
      ]
    }
    
  5. Fai clic su Avanti > Crea criterio.

  6. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  7. Allega il criterio appena creato.

  8. Assegna al ruolo il nome duo-telephony-lambda-role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome duo-telephony-logs-collector
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione duo-telephony-lambda-role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e incolla il seguente codice (duo-telephony-logs-collector.py).

    import json
    import boto3
    import os
    import hmac
    import hashlib
    import base64
    import urllib.parse
    import urllib.request
    import email.utils
    from datetime import datetime, timedelta, timezone
    from typing import Dict, Any, List, Optional
    from botocore.exceptions import ClientError
    
    s3 = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Duo telephony logs and store them in S3.
        """
        try:
            # Get configuration from environment variables
            bucket_name = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX'].rstrip('/')
            state_key = os.environ['STATE_KEY']
            integration_key = os.environ['DUO_IKEY']
            secret_key = os.environ['DUO_SKEY']
            api_hostname = os.environ['DUO_API_HOST']
    
            # Load state
            state = load_state(bucket_name, state_key)
    
            # Calculate time range
            now = datetime.now(timezone.utc)
            if state.get('last_offset'):
                # Continue from last offset
                next_offset = state['last_offset']
                logs = []
                has_more = True
            else:
                # Start from last timestamp or 24 hours ago
                mintime = state.get('last_timestamp_ms', 
                                  int((now - timedelta(hours=24)).timestamp() * 1000))
                # Apply 2-minute delay as recommended by Duo
                maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000)
                next_offset = None
                logs = []
                has_more = True
    
            # Fetch logs with pagination
            total_fetched = 0
            max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
            while has_more and total_fetched < max_iterations:
                if next_offset:
                    # Use offset for pagination
                    params = {
                        'limit': '1000',
                        'next_offset': next_offset
                    }
                else:
                    # Initial request with time range
                    params = {
                        'mintime': str(mintime),
                        'maxtime': str(maxtime),
                        'limit': '1000',
                        'sort': 'ts:asc'
                    }
    
                # Make API request with retry logic
                response = duo_api_call_with_retry(
                    'GET',
                    api_hostname,
                    '/admin/v2/logs/telephony',
                    params,
                    integration_key,
                    secret_key
                )
    
                if 'items' in response:
                    logs.extend(response['items'])
                    total_fetched += 1
    
                    # Check for more data
                    if 'metadata' in response and 'next_offset' in response['metadata']:
                        next_offset = response['metadata']['next_offset']
                        state['last_offset'] = next_offset
                    else:
                        has_more = False
                        state['last_offset'] = None
    
                        # Update timestamp for next run
                        if logs:
                            # Get the latest timestamp from logs
                            latest_ts = max([log.get('ts', '') for log in logs])
                            if latest_ts:
                                # Convert ISO timestamp to milliseconds
                                dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00'))
                                state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1
                else:
                    has_more = False
    
            # Save logs to S3 if any were fetched
            if logs:
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                key = f"{s3_prefix}/telephony_{timestamp}.json"
    
                # Format logs as newline-delimited JSON
                log_data = '\n'.join(json.dumps(log) for log in logs)
    
                s3.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=log_data.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}")
            else:
                print("No new telephony logs found")
    
            # Save state
            save_state(bucket_name, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'Successfully processed {len(logs)} telephony logs',
                    'logs_count': len(logs)
                })
            }
    
        except Exception as e:
            print(f"Error: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)})
            }
    
    def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], 
                                ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API with retry logic.
        """
        for attempt in range(max_retries):
            try:
                return duo_api_call(method, host, path, params, ikey, skey)
            except Exception as e:
                if '429' in str(e) or '5' in str(e)[:1]:  # Rate limit or server error
                    if attempt < max_retries - 1:
                        wait_time = (2 ** attempt) * 2  # Exponential backoff
                        print(f"Retrying after {wait_time} seconds...")
                        import time
                        time.sleep(wait_time)
                        continue
                raise
    
    def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], 
                    ikey: str, skey: str) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API.
        """
        # Create canonical string for signing using RFC 2822 date format
        now = email.utils.formatdate()
        canon = [now, method.upper(), host.lower(), path]
    
        # Add parameters
        args = []
        for key in sorted(params.keys()):
            val = params[key]
            args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}")
        canon.append('&'.join(args))
        canon_str = '\n'.join(canon)
    
        # Sign the request
        sig = hmac.new(
            skey.encode('utf-8'),
            canon_str.encode('utf-8'),
            hashlib.sha1
        ).hexdigest()
    
        # Create authorization header
        auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8')
    
        # Build URL
        url = f"https://{host}{path}"
        if params:
            url += '?' + '&'.join(args)
    
        # Make request
        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Basic {auth}')
        req.add_header('Date', now)
        req.add_header('Host', host)
        req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0')
    
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                data = json.loads(response.read().decode('utf-8'))
                if data.get('stat') == 'OK':
                    return data.get('response', {})
                else:
                    raise Exception(f"API error: {data.get('message', 'Unknown error')}")
        except urllib.error.HTTPError as e:
            error_body = e.read().decode('utf-8')
            raise Exception(f"HTTP error {e.code}: {error_body}")
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        """Load state from S3."""
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(response['Body'].read().decode('utf-8'))
        except ClientError as e:
            if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'):
                return {}
            print(f"Error loading state: {e}")
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]):
        """Save state to S3."""
        try:
            s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
  5. Vai a Configurazione > Variabili di ambiente.

  6. Fai clic su Modifica > Aggiungi nuova variabile di ambiente.

  7. Inserisci le variabili di ambiente fornite nella tabella seguente, sostituendo i valori di esempio con i tuoi valori.

    Variabili di ambiente

    Chiave Valore di esempio
    S3_BUCKET duo-telephony-logs
    S3_PREFIX duo-telephony/
    STATE_KEY duo-telephony/state.json
    DUO_IKEY <your-integration-key>
    DUO_SKEY <your-secret-key>
    DUO_API_HOST api-yyyyyyyy.duosecurity.com
    MAX_ITERATIONS 10
  8. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > duo-telephony-logs-collector).

  9. Seleziona la scheda Configurazione.

  10. Nel riquadro Configurazione generale, fai clic su Modifica.

  11. Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda duo-telephony-logs-collector.
    • Nome: duo-telephony-logs-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps

  1. Vai alla console AWS > IAM > Utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::duo-telephony-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy.

  8. Fai clic su Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Crea la chiave di accesso per secops-reader: Credenziali di sicurezza > Chiavi di accesso.

  10. Fai clic su Crea chiave di accesso.

  11. Scarica il .CSV. Incollerai questi valori nel feed.

Configurare un feed in Google SecOps per importare i log di Duo Telephony

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Duo Telephony logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Log di telefonia Duo come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://duo-telephony-logs/duo-telephony/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
context metadata.product_event_type Mappato direttamente dal campo context nel log non elaborato.
credits security_result.detection_fields.value Mappato direttamente dal campo credits nel log non elaborato, nidificato all'interno di un oggetto detection_fields con la chiave corrispondente credits.
eventtype security_result.detection_fields.value Mappato direttamente dal campo eventtype nel log non elaborato, nidificato all'interno di un oggetto detection_fields con la chiave corrispondente eventtype.
host principal.hostname Mappato direttamente dal campo host nel log non elaborato, se non è un indirizzo IP. Imposta un valore statico di "ALLOW" nel parser. Imposta un valore statico di "MECHANISM_UNSPECIFIED" nel parser. Analizzato dal campo timestamp nel log non elaborato, che rappresenta i secondi trascorsi da epoca. Imposta su "USER_UNCATEGORIZED" se nel log non elaborato sono presenti sia il campo context che il campo host. Imposta "STATUS_UPDATE" se è presente solo host. In caso contrario, imposta "GENERIC_EVENT". Estratto direttamente dal campo log_type del log non elaborato. Imposta un valore statico di "Telefonia" nel parser. Imposta un valore statico di "Duo" nel parser.
phone principal.user.phone_numbers Mappato direttamente dal campo phone nel log non elaborato.
phone principal.user.userid Mappato direttamente dal campo phone nel log non elaborato. Impostato sul valore statico "INFORMATIONAL" nel parser. Imposta un valore statico di "Duo Telephony" nel parser.
timestamp metadata.event_timestamp Analizzato dal campo timestamp nel log non elaborato, che rappresenta i secondi trascorsi da epoca.
type security_result.summary Mappato direttamente dal campo type nel log non elaborato.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.