Duo-Telefonie-Logs erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie Duo Telephony-Logs mit Amazon S3 in Google Security Operations aufnehmen. Der Parser extrahiert Felder aus den Logs, transformiert sie und ordnet sie dem Unified Data Model (UDM) zu. Es verarbeitet verschiedene Duo-Logformate, konvertiert Zeitstempel, extrahiert Nutzerinformationen, Netzwerkdetails und Sicherheitsergebnisse und strukturiert die Ausgabe schließlich im standardisierten UDM-Format.

Hinweise

Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:

  • Google SecOps-Instanz.
  • Privilegierter Zugriff auf das Duo-Admin-Panel mit der Rolle Inhaber.
  • Privilegierter Zugriff auf AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Duo-Voraussetzungen (API-Anmeldedaten) erfassen

  1. Melden Sie sich im Duo-Admin-Steuerfeld als Administrator mit der Rolle Inhaber an.
  2. Rufen Sie Anwendungen > Anwendungskatalog auf.
  3. Suchen Sie im Katalog nach dem Eintrag für die Admin API.
  4. Klicken Sie auf + Hinzufügen, um die Anwendung zu erstellen.
  5. Kopieren Sie die folgenden Details und speichern Sie sie an einem sicheren Ort:
    • Integrationsschlüssel
    • Geheimer Schlüssel
    • API-Hostname (z. B. api-yyyyyyyy.duosecurity.com)
  6. Deaktivieren Sie im Abschnitt Berechtigungen alle Berechtigungsoptionen mit Ausnahme von Leseprotokoll gewähren.
  7. Klicken Sie auf Änderungen speichern.

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung unter Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B. duo-telephony-logs).
  3. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie Drittanbieterdienst als Anwendungsfall aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess.
  18. Wählen Sie die Richtlinie aus.
  19. Klicken Sie auf Weiter.
  20. Klicken Sie auf Berechtigungen hinzufügen.

IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren

  1. Rufen Sie in der AWS-Konsole IAM > Richtlinien auf.
  2. Klicken Sie auf Richtlinie erstellen> Tab „JSON“.
  3. Kopieren Sie die folgende Richtlinie und fügen Sie sie ein.
  4. Policy JSON (ersetzen Sie duo-telephony-logs, wenn Sie einen anderen Bucket-Namen eingegeben haben):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json"
        }
      ]
    }
    
  5. Klicken Sie auf Weiter > Richtlinie erstellen.

  6. Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.

  7. Hängen Sie die neu erstellte Richtlinie an.

  8. Geben Sie der Rolle den Namen duo-telephony-lambda-role und klicken Sie auf Rolle erstellen.

Lambda-Funktion erstellen

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:

    Einstellung Wert
    Name duo-telephony-logs-collector
    Laufzeit Python 3.13
    Architektur x86_64
    Ausführungsrolle duo-telephony-lambda-role
  4. Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und fügen Sie den folgenden Code (duo-telephony-logs-collector.py) ein.

    import json
    import boto3
    import os
    import hmac
    import hashlib
    import base64
    import urllib.parse
    import urllib.request
    import email.utils
    from datetime import datetime, timedelta, timezone
    from typing import Dict, Any, List, Optional
    from botocore.exceptions import ClientError
    
    s3 = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Duo telephony logs and store them in S3.
        """
        try:
            # Get configuration from environment variables
            bucket_name = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX'].rstrip('/')
            state_key = os.environ['STATE_KEY']
            integration_key = os.environ['DUO_IKEY']
            secret_key = os.environ['DUO_SKEY']
            api_hostname = os.environ['DUO_API_HOST']
    
            # Load state
            state = load_state(bucket_name, state_key)
    
            # Calculate time range
            now = datetime.now(timezone.utc)
            if state.get('last_offset'):
                # Continue from last offset
                next_offset = state['last_offset']
                logs = []
                has_more = True
            else:
                # Start from last timestamp or 24 hours ago
                mintime = state.get('last_timestamp_ms', 
                                  int((now - timedelta(hours=24)).timestamp() * 1000))
                # Apply 2-minute delay as recommended by Duo
                maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000)
                next_offset = None
                logs = []
                has_more = True
    
            # Fetch logs with pagination
            total_fetched = 0
            max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
            while has_more and total_fetched < max_iterations:
                if next_offset:
                    # Use offset for pagination
                    params = {
                        'limit': '1000',
                        'next_offset': next_offset
                    }
                else:
                    # Initial request with time range
                    params = {
                        'mintime': str(mintime),
                        'maxtime': str(maxtime),
                        'limit': '1000',
                        'sort': 'ts:asc'
                    }
    
                # Make API request with retry logic
                response = duo_api_call_with_retry(
                    'GET',
                    api_hostname,
                    '/admin/v2/logs/telephony',
                    params,
                    integration_key,
                    secret_key
                )
    
                if 'items' in response:
                    logs.extend(response['items'])
                    total_fetched += 1
    
                    # Check for more data
                    if 'metadata' in response and 'next_offset' in response['metadata']:
                        next_offset = response['metadata']['next_offset']
                        state['last_offset'] = next_offset
                    else:
                        has_more = False
                        state['last_offset'] = None
    
                        # Update timestamp for next run
                        if logs:
                            # Get the latest timestamp from logs
                            latest_ts = max([log.get('ts', '') for log in logs])
                            if latest_ts:
                                # Convert ISO timestamp to milliseconds
                                dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00'))
                                state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1
                else:
                    has_more = False
    
            # Save logs to S3 if any were fetched
            if logs:
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                key = f"{s3_prefix}/telephony_{timestamp}.json"
    
                # Format logs as newline-delimited JSON
                log_data = '\n'.join(json.dumps(log) for log in logs)
    
                s3.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=log_data.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}")
            else:
                print("No new telephony logs found")
    
            # Save state
            save_state(bucket_name, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'Successfully processed {len(logs)} telephony logs',
                    'logs_count': len(logs)
                })
            }
    
        except Exception as e:
            print(f"Error: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)})
            }
    
    def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], 
                                ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API with retry logic.
        """
        for attempt in range(max_retries):
            try:
                return duo_api_call(method, host, path, params, ikey, skey)
            except Exception as e:
                if '429' in str(e) or '5' in str(e)[:1]:  # Rate limit or server error
                    if attempt < max_retries - 1:
                        wait_time = (2 ** attempt) * 2  # Exponential backoff
                        print(f"Retrying after {wait_time} seconds...")
                        import time
                        time.sleep(wait_time)
                        continue
                raise
    
    def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], 
                    ikey: str, skey: str) -> Dict[str, Any]:
        """
        Make an authenticated API call to Duo Admin API.
        """
        # Create canonical string for signing using RFC 2822 date format
        now = email.utils.formatdate()
        canon = [now, method.upper(), host.lower(), path]
    
        # Add parameters
        args = []
        for key in sorted(params.keys()):
            val = params[key]
            args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}")
        canon.append('&'.join(args))
        canon_str = '\n'.join(canon)
    
        # Sign the request
        sig = hmac.new(
            skey.encode('utf-8'),
            canon_str.encode('utf-8'),
            hashlib.sha1
        ).hexdigest()
    
        # Create authorization header
        auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8')
    
        # Build URL
        url = f"https://{host}{path}"
        if params:
            url += '?' + '&'.join(args)
    
        # Make request
        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Basic {auth}')
        req.add_header('Date', now)
        req.add_header('Host', host)
        req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0')
    
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                data = json.loads(response.read().decode('utf-8'))
                if data.get('stat') == 'OK':
                    return data.get('response', {})
                else:
                    raise Exception(f"API error: {data.get('message', 'Unknown error')}")
        except urllib.error.HTTPError as e:
            error_body = e.read().decode('utf-8')
            raise Exception(f"HTTP error {e.code}: {error_body}")
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        """Load state from S3."""
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(response['Body'].read().decode('utf-8'))
        except ClientError as e:
            if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'):
                return {}
            print(f"Error loading state: {e}")
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]):
        """Save state to S3."""
        try:
            s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
  5. Rufen Sie Konfiguration > Umgebungsvariablen auf.

  6. Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.

  7. Geben Sie die in der folgenden Tabelle aufgeführten Umgebungsvariablen ein und ersetzen Sie die Beispielwerte durch Ihre Werte.

    Umgebungsvariablen

    Schlüssel Beispielwert
    S3_BUCKET duo-telephony-logs
    S3_PREFIX duo-telephony/
    STATE_KEY duo-telephony/state.json
    DUO_IKEY <your-integration-key>
    DUO_SKEY <your-secret-key>
    DUO_API_HOST api-yyyyyyyy.duosecurity.com
    MAX_ITERATIONS 10
  8. Bleiben Sie nach dem Erstellen der Funktion auf der entsprechenden Seite oder öffnen Sie Lambda> Funktionen> duo-telephony-logs-collector.

  9. Wählen Sie den Tab Konfiguration aus.

  10. Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.

  11. Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.

EventBridge-Zeitplan erstellen

  1. Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Wiederkehrender Zeitplan: Preis (1 hour).
    • Ziel: Ihre Lambda-Funktion duo-telephony-logs-collector.
    • Name: duo-telephony-logs-1h.
  3. Klicken Sie auf Zeitplan erstellen.

(Optional) IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen

  1. Rufen Sie die AWS-Konsole > IAM > Nutzer auf.
  2. Klicken Sie auf Add users (Nutzer hinzufügen).
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Nutzer: Geben Sie secops-reader ein.
    • Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
  4. Klicken Sie auf Nutzer erstellen.
  5. Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::duo-telephony-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::duo-telephony-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy.

  8. Klicken Sie auf Richtlinie erstellen> suchen/auswählen> Weiter> Berechtigungen hinzufügen.

  9. Erstellen Sie einen Zugriffsschlüssel für secops-reader: Sicherheitsanmeldedaten > Zugriffsschlüssel.

  10. Klicken Sie auf Zugriffsschlüssel erstellen.

  11. Laden Sie die Datei .CSV herunter. Sie fügen diese Werte in den Feed ein.

Feed in Google SecOps konfigurieren, um Duo-Telefonie-Logs aufzunehmen

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf + Neuen Feed hinzufügen.
  3. Geben Sie im Feld Feedname einen Namen für den Feed ein, z. B. Duo Telephony logs.
  4. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  5. Wählen Sie Duo-Telefonie-Logs als Logtyp aus.
  6. Klicken Sie auf Weiter.
  7. Geben Sie Werte für die folgenden Eingabeparameter an:
    • S3-URI: s3://duo-telephony-logs/duo-telephony/
    • Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
    • Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
    • Zugriffsschlüssel-ID: Nutzerzugriffsschlüssel mit Zugriff auf den S3-Bucket.
    • Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: Der Asset-Namespace.
    • Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
  8. Klicken Sie auf Weiter.
  9. Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.

UDM-Zuordnungstabelle

Logfeld UDM-Zuordnung Logik
context metadata.product_event_type Direkt aus dem Feld context im Rohlog zugeordnet.
credits security_result.detection_fields.value Direkt aus dem Feld credits im Rohlog zugeordnet, das in einem detection_fields-Objekt mit dem entsprechenden Schlüssel credits verschachtelt ist.
eventtype security_result.detection_fields.value Direkt aus dem Feld eventtype im Rohlog zugeordnet, das in einem detection_fields-Objekt mit dem entsprechenden Schlüssel eventtype verschachtelt ist.
host principal.hostname Direkt aus dem Feld host im Rohlog, wenn es sich nicht um eine IP-Adresse handelt. Im Parser auf den statischen Wert „ALLOW“ festlegen. Wird im Parser auf den statischen Wert „MECHANISM_UNSPECIFIED“ gesetzt. Aus dem Feld timestamp im Rohlog geparst, das Sekunden seit der Epoche darstellt. Auf „USER_UNCATEGORIZED“ festgelegt, wenn sowohl das Feld context als auch das Feld host im Rohlog vorhanden sind. Auf „STATUS_UPDATE“ festgelegt, wenn nur host vorhanden ist. Andernfalls auf „GENERIC_EVENT“ setzen. Direkt aus dem Feld log_type des Rohlogs. Im Parser auf den statischen Wert „Telephony“ festlegen. Legen Sie im Parser einen statischen Wert von „Duo“ fest.
phone principal.user.phone_numbers Direkt aus dem Feld phone im Rohlog zugeordnet.
phone principal.user.userid Direkt aus dem Feld phone im Rohlog zugeordnet. Im Parser auf den statischen Wert „INFORMATIONAL“ festlegen. Im Parser auf den statischen Wert „Duo Telephony“ festlegen.
timestamp metadata.event_timestamp Aus dem Feld timestamp im Rohlog geparst, das Sekunden seit der Epoche darstellt.
type security_result.summary Direkt aus dem Feld type im Rohlog zugeordnet.

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten