Raccogliere i log degli amministratori di Duo
Questo documento spiega come importare i log degli amministratori di Duo in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log (formato JSON) e li mappa al modello UDM (Unified Data Model). Gestisce in modo diverso i vari tipi action di Duo (accesso, gestione utenti, gestione gruppi), compilando i campi UDM pertinenti in base all'azione e ai dati disponibili, inclusi dettagli utente, fattori di autenticazione e risultati di sicurezza. Esegue anche trasformazioni dei dati, come l'unione degli indirizzi IP, la conversione dei timestamp e la gestione degli errori.
Prima di iniziare
- Istanza Google SecOps
- Accesso con privilegi al tenant Duo (applicazione API Admin)
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Configurare l'applicazione API Admin di Duo
- Accedi al pannello di amministrazione di Duo.
- Vai a Applicazioni > Catalogo applicazioni.
- Aggiungi l'applicazione API Admin.
- Registra i seguenti valori:
- Chiave di integrazione (ikey)
- Chiave secret (skey)
- Nome host API (ad esempio
api-XXXXXXXX.duosecurity.com)
- In Autorizzazioni, attiva Concedi log di lettura (per leggere i log degli amministratori).
- Salva l'applicazione.
Configurare il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creare un bucket
- Salva Nome e Regione del bucket per riferimento futuro (ad esempio
duo-admin-logs). - Crea un utente seguendo questa guida utente: Creare un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso secret per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri di autorizzazione.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Collega direttamente i criteri
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configurare il criterio e il ruolo IAM per i caricamenti S3
- Vai a Console AWS > IAM > Criteri > Crea criterio > Scheda JSON.
Inserisci il seguente criterio:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }- Sostituisci
duo-admin-logsse hai inserito un nome del bucket diverso:
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Collega il criterio appena creato.
Assegna al ruolo il nome
WriteDuoAdminToS3Rolee fai clic su Crea ruolo.
Creare la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome duo_admin_to_s3Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione WriteDuoAdminToS3RoleDopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
duo_admin_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendo i valori con i tuoi.
Chiave Esempio S3_BUCKETduo-admin-logsS3_PREFIXduo/admin/STATE_KEYduo/admin/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comDopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > tua-funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale , fai clic su Modifica.
Modifica Timeout in 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Pianificazione ricorrente: Tariffa (
1 hour). - Destinazione: la tua funzione Lambda.
- Nome:
duo-admin-1h.
- Pianificazione ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Creare un utente e chiavi IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti, quindi fai clic su Aggiungi utenti.
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci un nome univoco (ad esempio
secops-reader) - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
- Fai clic su Crea utente.
- Utente: inserisci un nome univoco (ad esempio
- Collega il criterio per la lettura minimo (personalizzato): Utenti > seleziona
secops-reader> Autorizzazioni > Aggiungi autorizzazioni > Collega direttamente i criteri > Crea criterio Nell'editor JSON, inserisci il seguente criterio:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }Imposta il nome su
secops-reader-policy.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il CSV (questi valori vengono inseriti nel feed).
Configurare un feed in Google SecOps per importare i log degli amministratori di Duo
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed , inserisci un nome per il feed (ad esempio
Duo Administrator Logs). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Log degli amministratori di Duo come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://duo-admin-logs/duo/admin/ - Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Durata massima del file: valore predefinito 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso secret: chiave secret utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Esamina la nuova configurazione del feed nella schermata Finalizza, quindi fai clic su Invia.
Tabella di mapping UDM
| Campo log | Mapping UDM | Funzione logica |
|---|---|---|
action |
metadata.product_event_type |
Il valore del campo action del log non elaborato. |
desc |
metadata.description |
Il valore del campo desc dell'oggetto description del log non elaborato. |
description._status |
target.group.attribute.labels.value |
Il valore del campo _status all'interno dell'oggetto description del log non elaborato, in particolare durante l'elaborazione delle azioni correlate ai gruppi. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status". |
description.desc |
metadata.description |
Il valore del campo desc dell'oggetto description del log non elaborato. |
description.email |
target.user.email_addresses |
Il valore del campo email dell'oggetto description del log non elaborato. |
description.error |
security_result.summary |
Il valore del campo error dell'oggetto description del log non elaborato. |
description.factor |
extensions.auth.auth_details |
Il valore del campo factor dell'oggetto description del log non elaborato. |
description.groups.0._status |
target.group.attribute.labels.value |
Il valore del campo _status del primo elemento dell'array groups all'interno dell'oggetto description del log non elaborato. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status". |
description.groups.0.name |
target.group.group_display_name |
Il valore del campo name del primo elemento dell'array groups all'interno dell'oggetto description del log non elaborato. |
description.ip_address |
principal.ip |
Il valore del campo ip_address dell'oggetto description del log non elaborato. |
description.name |
target.group.group_display_name |
Il valore del campo name dell'oggetto description del log non elaborato. |
description.realname |
target.user.user_display_name |
Il valore del campo realname dell'oggetto description del log non elaborato. |
description.status |
target.user.attribute.labels.value |
Il valore del campo status dell'oggetto description del log non elaborato. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status". |
description.uname |
target.user.email_addresses o target.user.userid |
Il valore del campo uname dell'oggetto description del log non elaborato. Se corrisponde a un formato di indirizzo email, viene mappato a email_addresses; in caso contrario, viene mappato a userid. |
host |
principal.hostname |
Il valore del campo host del log non elaborato. |
isotimestamp |
metadata.event_timestamp.seconds |
Il valore del campo isotimestamp del log non elaborato, convertito in secondi di epoca. |
object |
target.group.group_display_name |
Il valore del campo object del log non elaborato. |
timestamp |
metadata.event_timestamp.seconds |
Il valore del campo timestamp del log non elaborato. |
username |
target.user.userid o principal.user.userid |
Se il campo action contiene "login", il valore viene mappato a target.user.userid. In caso contrario, viene mappato a principal.user.userid. Imposta su "USERNAME_PASSWORD" se il action campo contiene "login". Determinato dal parser in base al action campo. Valori possibili: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Imposta sempre su "DUO_ADMIN". Imposta sempre su "MULTI-FACTOR_AUTHENTICATION". Imposta sempre su "DUO_SECURITY". Imposta su "ADMINISTRATOR" se il eventtype campo contiene "admin". Determinato dal parser in base al action campo. Imposta su "BLOCK" se il campo action contiene "error"; in caso contrario, imposta su "ALLOW". Imposta sempre su "status" quando compili target.group.attribute.labels. Imposta sempre su "status" quando compili target.user.attribute.labels. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.