Recopila registros de administrador de Duo
En este documento, se explica cómo ingerir registros de administrador de Duo en Google Security Operations con Amazon S3. El analizador extrae campos de los registros (formato JSON) y los asigna al Modelo de datos unificado (UDM). Maneja de forma diferente varios tipos de Duo action (acceso, administración de usuarios, administración de grupos) y propaga los campos de UDM pertinentes según la acción y los datos disponibles, incluidos los detalles del usuario, los factores de autenticación y los resultados de seguridad. También realiza transformaciones de datos, como la combinación de direcciones IP, la conversión de marcas de tiempo y el manejo de errores.
Antes de comenzar
- Instancia de Google SecOps
- Acceso privilegiado al arrendatario de Duo (aplicación de la API de Admin)
- Acceso privilegiado a AWS (S3, IAM, Lambda, EventBridge)
Configura la aplicación de la API de Administrador de Duo
- Accede al Panel de administración de Duo.
- Ve a Aplicaciones > Catálogo de aplicaciones.
- Agrega la aplicación de la API de Administrador.
- Registra los siguientes valores:
- Clave de integración (ikey)
- Clave secreta (skey)
- Nombre de host de la API (por ejemplo,
api-XXXXXXXX.duosecurity.com)
- En Permisos, habilita Otorgar registro de lectura (para leer los registros de administrador).
- Guarda la aplicación.
Configura el bucket de AWS S3 y IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
- Guarda el Nombre y la Región del bucket para consultarlos en el futuro (por ejemplo,
duo-admin-logs). - Crea un usuario siguiendo esta guía del usuario: Crea un usuario de IAM.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la Clave de acceso y la Clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- Ve a Consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
Ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }- Reemplaza
duo-admin-logssi ingresaste un nombre de bucket diferente:
- Reemplaza
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada.
Nombra el rol
WriteDuoAdminToS3Roley haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
Configuración Valor Nombre duo_admin_to_s3Tiempo de ejecución Python 3.13 Arquitectura x86_64 Rol de ejecución WriteDuoAdminToS3RoleUna vez que se cree la función, abre la pestaña Código , borra el código auxiliar y, luego, ingresa el siguiente código (
duo_admin_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Ve a Configuración > Variables de entorno > Editar > Agregar variable de entorno nueva.
Ingresa las siguientes variables de entorno y reemplaza por tus valores.
Clave Ejemplo S3_BUCKETduo-admin-logsS3_PREFIXduo/admin/STATE_KEYduo/admin/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comUna vez que se cree la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Selecciona la pestaña Configuración.
En el panel Configuración general , haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Recurring schedule: Rate (
1 hour). - Target: tu función Lambda.
- Name:
duo-admin-1h.
- Recurring schedule: Rate (
- Haz clic en Crear programación.
Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios y, luego, haz clic en Agregar usuarios.
- Proporciona los siguientes detalles de configuración:
- Usuario: Ingresa un nombre único (por ejemplo,
secops-reader). - Tipo de acceso: Selecciona Clave de acceso: Acceso programático
- Haz clic en Crear usuario.
- Usuario: Ingresa un nombre único (por ejemplo,
- Adjunta una política de lectura mínima (personalizada): Usuarios > selecciona
secops-reader> Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }Configura el nombre como
secops-reader-policy.Ve a Crear política > buscar/seleccionar > Siguiente > Agregar permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el CSV (estos valores se ingresan en el feed).
Configura un feed en Google SecOps para ingerir registros de administrador de Duo
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Agregar feed nuevo.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
Duo Administrator Logs). - Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona Registros de administrador de Duo como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://duo-admin-logs/duo/admin/ - Opciones de eliminación de fuentes: Selecciona la opción de eliminación según tu preferencia.
- Antigüedad máxima del archivo: 180 días predeterminados.
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del recurso: el espacio de nombres del recurso.
- Etiquetas de ingesta: La etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la configuración de tu nuevo feed en la pantalla Finalizar y, luego, haz clic en Enviar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
action |
metadata.product_event_type |
El valor del campo action del registro sin procesar. |
desc |
metadata.description |
El valor del campo desc del objeto description del registro sin procesar. |
description._status |
target.group.attribute.labels.value |
El valor del campo _status dentro del objeto description del registro sin procesar, específicamente cuando se procesan acciones relacionadas con el grupo. Este valor se coloca dentro de un array "labels" con una "clave" correspondiente de "status". |
description.desc |
metadata.description |
El valor del campo desc del objeto description del registro sin procesar. |
description.email |
target.user.email_addresses |
El valor del campo email del objeto description del registro sin procesar. |
description.error |
security_result.summary |
El valor del campo error del objeto description del registro sin procesar. |
description.factor |
extensions.auth.auth_details |
El valor del campo factor del objeto description del registro sin procesar. |
description.groups.0._status |
target.group.attribute.labels.value |
El valor del campo _status del primer elemento del array groups dentro del objeto description del registro sin procesar. Este valor se coloca dentro de un array "labels" con una "clave" correspondiente de "status". |
description.groups.0.name |
target.group.group_display_name |
El valor del campo name del primer elemento del array groups dentro del objeto description del registro sin procesar. |
description.ip_address |
principal.ip |
El valor del campo ip_address del objeto description del registro sin procesar. |
description.name |
target.group.group_display_name |
El valor del campo name del objeto description del registro sin procesar. |
description.realname |
target.user.user_display_name |
El valor del campo realname del objeto description del registro sin procesar. |
description.status |
target.user.attribute.labels.value |
El valor del campo status del objeto description del registro sin procesar. Este valor se coloca dentro de un array "labels" con una "clave" correspondiente de "status". |
description.uname |
target.user.email_addresses o target.user.userid |
El valor del campo uname del objeto description del registro sin procesar. Si coincide con un formato de dirección de correo electrónico, se asigna a email_addresses; de lo contrario, se asigna a userid. |
host |
principal.hostname |
El valor del campo host del registro sin procesar. |
isotimestamp |
metadata.event_timestamp.seconds |
El valor del campo isotimestamp del registro sin procesar, convertido a segundos de época. |
object |
target.group.group_display_name |
El valor del campo object del registro sin procesar. |
timestamp |
metadata.event_timestamp.seconds |
El valor del campo timestamp del registro sin procesar. |
username |
target.user.userid o principal.user.userid |
Si el campo action contiene "login", el valor se asigna a target.user.userid. De lo contrario, se asigna a principal.user.userid. Se establece en "USERNAME_PASSWORD" si el action campo contiene "login". El analizador lo determina según el campo action. Valores posibles: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Siempre se establece en "DUO_ADMIN". Siempre se establece en "MULTI-FACTOR_AUTHENTICATION". Siempre se establece en "DUO_SECURITY". Se establece en "ADMINISTRATOR" si el eventtype campo contiene "admin". El analizador lo determina según el campo action. Se establece en "BLOCK" si el campo action contiene "error"; de lo contrario, se establece en "ALLOW". Siempre se establece en "status" cuando se propaga target.group.attribute.labels. Siempre se establece en "status" cuando se propaga target.user.attribute.labels. |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.