Duo-Telefonie-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Duo Telephony-Logs mit Amazon S3 in Google Security Operations aufnehmen. Der Parser extrahiert Felder aus den Logs, transformiert sie und ordnet sie dem Unified Data Model (UDM) zu. Es verarbeitet verschiedene Duo-Logformate, konvertiert Zeitstempel, extrahiert Nutzerinformationen, Netzwerkdetails und Sicherheitsergebnisse und strukturiert die Ausgabe schließlich im standardisierten UDM-Format.
Hinweise
Prüfen Sie, ob folgende Voraussetzungen erfüllt sind:
- Google SecOps-Instanz.
- Privilegierter Zugriff auf das Duo-Admin-Panel mit der Rolle Inhaber.
- Privilegierter Zugriff auf AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Duo-Voraussetzungen (API-Anmeldedaten) erfassen
- Melden Sie sich im Duo-Admin-Steuerfeld als Administrator mit der Rolle Inhaber an.
- Rufen Sie Anwendungen > Anwendungskatalog auf.
- Suchen Sie im Katalog nach dem Eintrag für die Admin API.
- Klicken Sie auf + Hinzufügen, um die Anwendung zu erstellen.
- Kopieren Sie die folgenden Details und speichern Sie sie an einem sicheren Ort:
- Integrationsschlüssel
- Geheimer Schlüssel
- API-Hostname (z. B.
api-yyyyyyyy.duosecurity.com
)
- Deaktivieren Sie im Abschnitt Berechtigungen alle Berechtigungsoptionen mit Ausnahme von Leseprotokoll gewähren.
- Klicken Sie auf Änderungen speichern.
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung unter Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
duo-telephony-logs
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie Drittanbieterdienst als Anwendungsfall aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess.
- Wählen Sie die Richtlinie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren
- Rufen Sie in der AWS-Konsole IAM > Richtlinien auf.
- Klicken Sie auf Richtlinie erstellen> Tab „JSON“.
- Kopieren Sie die folgende Richtlinie und fügen Sie sie ein.
Policy JSON (ersetzen Sie
duo-telephony-logs
, wenn Sie einen anderen Bucket-Namen eingegeben haben):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json" } ] }
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
duo-telephony-lambda-role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name duo-telephony-logs-collector
Laufzeit Python 3.13 Architektur x86_64 Ausführungsrolle duo-telephony-lambda-role
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und fügen Sie den folgenden Code (
duo-telephony-logs-collector.py
) ein.import json import boto3 import os import hmac import hashlib import base64 import urllib.parse import urllib.request import email.utils from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional from botocore.exceptions import ClientError s3 = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Duo telephony logs and store them in S3. """ try: # Get configuration from environment variables bucket_name = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'].rstrip('/') state_key = os.environ['STATE_KEY'] integration_key = os.environ['DUO_IKEY'] secret_key = os.environ['DUO_SKEY'] api_hostname = os.environ['DUO_API_HOST'] # Load state state = load_state(bucket_name, state_key) # Calculate time range now = datetime.now(timezone.utc) if state.get('last_offset'): # Continue from last offset next_offset = state['last_offset'] logs = [] has_more = True else: # Start from last timestamp or 24 hours ago mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000)) # Apply 2-minute delay as recommended by Duo maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000) next_offset = None logs = [] has_more = True # Fetch logs with pagination total_fetched = 0 max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) while has_more and total_fetched < max_iterations: if next_offset: # Use offset for pagination params = { 'limit': '1000', 'next_offset': next_offset } else: # Initial request with time range params = { 'mintime': str(mintime), 'maxtime': str(maxtime), 'limit': '1000', 'sort': 'ts:asc' } # Make API request with retry logic response = duo_api_call_with_retry( 'GET', api_hostname, '/admin/v2/logs/telephony', params, integration_key, secret_key ) if 'items' in response: logs.extend(response['items']) total_fetched += 1 # Check for more data if 'metadata' in response and 'next_offset' in response['metadata']: next_offset = response['metadata']['next_offset'] state['last_offset'] = next_offset else: has_more = False state['last_offset'] = None # Update timestamp for next run if logs: # Get the latest timestamp from logs latest_ts = max([log.get('ts', '') for log in logs]) if latest_ts: # Convert ISO timestamp to milliseconds dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00')) state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1 else: has_more = False # Save logs to S3 if any were fetched if logs: timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') key = f"{s3_prefix}/telephony_{timestamp}.json" # Format logs as newline-delimited JSON log_data = '\n'.join(json.dumps(log) for log in logs) s3.put_object( Bucket=bucket_name, Key=key, Body=log_data.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}") else: print("No new telephony logs found") # Save state save_state(bucket_name, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {len(logs)} telephony logs', 'logs_count': len(logs) }) } except Exception as e: print(f"Error: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API with retry logic. """ for attempt in range(max_retries): try: return duo_api_call(method, host, path, params, ikey, skey) except Exception as e: if '429' in str(e) or '5' in str(e)[:1]: # Rate limit or server error if attempt < max_retries - 1: wait_time = (2 ** attempt) * 2 # Exponential backoff print(f"Retrying after {wait_time} seconds...") import time time.sleep(wait_time) continue raise def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API. """ # Create canonical string for signing using RFC 2822 date format now = email.utils.formatdate() canon = [now, method.upper(), host.lower(), path] # Add parameters args = [] for key in sorted(params.keys()): val = params[key] args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}") canon.append('&'.join(args)) canon_str = '\n'.join(canon) # Sign the request sig = hmac.new( skey.encode('utf-8'), canon_str.encode('utf-8'), hashlib.sha1 ).hexdigest() # Create authorization header auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8') # Build URL url = f"https://{host}{path}" if params: url += '?' + '&'.join(args) # Make request req = urllib.request.Request(url) req.add_header('Authorization', f'Basic {auth}') req.add_header('Date', now) req.add_header('Host', host) req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0') try: with urllib.request.urlopen(req, timeout=30) as response: data = json.loads(response.read().decode('utf-8')) if data.get('stat') == 'OK': return data.get('response', {}) else: raise Exception(f"API error: {data.get('message', 'Unknown error')}") except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') raise Exception(f"HTTP error {e.code}: {error_body}") def load_state(bucket: str, key: str) -> Dict[str, Any]: """Load state from S3.""" try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read().decode('utf-8')) except ClientError as e: if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'): return {} print(f"Error loading state: {e}") return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket: str, key: str, state: Dict[str, Any]): """Save state to S3.""" try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")
Rufen Sie Konfiguration > Umgebungsvariablen auf.
Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die in der folgenden Tabelle aufgeführten Umgebungsvariablen ein und ersetzen Sie die Beispielwerte durch Ihre Werte.
Umgebungsvariablen
Schlüssel Beispielwert S3_BUCKET
duo-telephony-logs
S3_PREFIX
duo-telephony/
STATE_KEY
duo-telephony/state.json
DUO_IKEY
<your-integration-key>
DUO_SKEY
<your-secret-key>
DUO_API_HOST
api-yyyyyyyy.duosecurity.com
MAX_ITERATIONS
10
Bleiben Sie nach dem Erstellen der Funktion auf der entsprechenden Seite oder öffnen Sie Lambda> Funktionen> duo-telephony-logs-collector.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour
). - Ziel: Ihre Lambda-Funktion
duo-telephony-logs-collector
. - Name:
duo-telephony-logs-1h
.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
(Optional) IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie die AWS-Konsole > IAM > Nutzer auf.
- Klicken Sie auf Add users (Nutzer hinzufügen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie
secops-reader
ein. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Nutzer: Geben Sie
- Klicken Sie auf Nutzer erstellen.
- Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-telephony-logs" } ] }
Name =
secops-reader-policy
.Klicken Sie auf Richtlinie erstellen> suchen/auswählen> Weiter> Berechtigungen hinzufügen.
Erstellen Sie einen Zugriffsschlüssel für
secops-reader
: Sicherheitsanmeldedaten > Zugriffsschlüssel.Klicken Sie auf Zugriffsschlüssel erstellen.
Laden Sie die Datei
.CSV
herunter. Sie fügen diese Werte in den Feed ein.
Feed in Google SecOps konfigurieren, um Duo-Telefonie-Logs aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feedname einen Namen für den Feed ein, z. B.
Duo Telephony logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Duo-Telefonie-Logs als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://duo-telephony-logs/duo-telephony/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
- Zugriffsschlüssel-ID: Nutzerzugriffsschlüssel mit Zugriff auf den S3-Bucket.
- Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Asset-Namespace: Der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
context |
metadata.product_event_type |
Direkt aus dem Feld context im Rohlog zugeordnet. |
credits |
security_result.detection_fields.value |
Direkt aus dem Feld credits im Rohlog zugeordnet, das in einem detection_fields -Objekt mit dem entsprechenden Schlüssel credits verschachtelt ist. |
eventtype |
security_result.detection_fields.value |
Direkt aus dem Feld eventtype im Rohlog zugeordnet, das in einem detection_fields -Objekt mit dem entsprechenden Schlüssel eventtype verschachtelt ist. |
host |
principal.hostname |
Direkt aus dem Feld host im Rohlog, wenn es sich nicht um eine IP-Adresse handelt. Im Parser auf den statischen Wert „ALLOW“ festlegen. Wird im Parser auf den statischen Wert „MECHANISM_UNSPECIFIED“ gesetzt. Aus dem Feld timestamp im Rohlog geparst, das Sekunden seit der Epoche darstellt. Auf „USER_UNCATEGORIZED“ festgelegt, wenn sowohl das Feld context als auch das Feld host im Rohlog vorhanden sind. Auf „STATUS_UPDATE“ festgelegt, wenn nur host vorhanden ist. Andernfalls auf „GENERIC_EVENT“ setzen. Direkt aus dem Feld log_type des Rohlogs. Im Parser auf den statischen Wert „Telephony“ festlegen. Legen Sie im Parser einen statischen Wert von „Duo“ fest. |
phone |
principal.user.phone_numbers |
Direkt aus dem Feld phone im Rohlog zugeordnet. |
phone |
principal.user.userid |
Direkt aus dem Feld phone im Rohlog zugeordnet. Im Parser auf den statischen Wert „INFORMATIONAL“ festlegen. Im Parser auf den statischen Wert „Duo Telephony“ festlegen. |
timestamp |
metadata.event_timestamp |
Aus dem Feld timestamp im Rohlog geparst, das Sekunden seit der Epoche darstellt. |
type |
security_result.summary |
Direkt aus dem Feld type im Rohlog zugeordnet. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten