Collecter les journaux d'administrateur Duo
Ce document explique comment ingérer des journaux d'administrateur Duo dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les champs des journaux (au format JSON) et les mappe au modèle de données unifié (UDM). Il gère différemment les différents types de Duo action (connexion, gestion des utilisateurs, gestion des groupes), en remplissant les champs UDM pertinents en fonction de l'action et des données disponibles, y compris les informations sur l'utilisateur, les facteurs d'authentification et les résultats de sécurité. Il effectue également des transformations de données, telles que la fusion d'adresses IP, la conversion d'horodatages et la gestion des erreurs.
Avant de commencer
- Instance Google SecOps
- Accès privilégié au locataire Duo (application API Admin)
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Configurer l'application API Admin Duo
- Connectez-vous au panneau d'administration Duo.
- Accédez à Applications > Catalogue d'applications.
- Ajoutez l'application API Admin.
- Enregistrez les valeurs suivantes :
- Clé d'intégration (ikey)
- Clé secrète (skey)
- Nom d'hôte de l'API (par exemple,
api-XXXXXXXX.duosecurity.com)
- Dans Autorisations, activez Accorder l'accès en lecture aux journaux (pour lire les journaux d'administrateur).
- Enregistrez l'application.
Configurer le bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
duo-admin-logs). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Identifiants de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la règle et le rôle IAM pour les importations S3
- Accédez à Console AWS > IAM > Règles > Créer une règle > Onglet JSON.
Saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }- Remplacez
duo-admin-logssi vous avez saisi un autre nom de bucket :
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
WriteDuoAdminToS3Role, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom duo_admin_to_s3Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution WriteDuoAdminToS3RoleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
duo_admin_to_s3.py) :#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Accédez à Configuration > Variables d'environnement > Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant par vos valeurs.
Clé Exemple S3_BUCKETduo-admin-logsS3_PREFIXduo/admin/STATE_KEYduo/admin/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comUne fois la fonction créée, restez sur sa page (ou accédez à Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale , cliquez sur Modifier.
Remplacez Délai d'inactivité par 5 minutes (300 secondes) , puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge > Planificateur > Créer une programmation.
- Fournissez les informations de configuration suivantes :
- Programmation récurrente : Fréquence (
1 hour). - Cible : votre fonction Lambda.
- Nom :
duo-admin-1h.
- Programmation récurrente : Fréquence (
- Cliquez sur Créer la programmation.
Facultatif : Créer un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs, puis cliquez sur Ajouter des utilisateurs.
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez un nom unique (par exemple,
secops-reader). - Type d'accès : sélectionnez Clé d'accès – Accès programmatique
- Cliquez sur Créer un utilisateur.
- Utilisateur : saisissez un nom unique (par exemple,
- Associez une règle de lecture minimale (personnalisée) : Utilisateurs > sélectionnez
secops-reader> Autorisations > Ajouter des autorisations > Joindre directement des règles > Créer une règle Dans l'éditeur JSON, saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }Définissez le nom sur
secops-reader-policy.Accédez à Créer une règle > Rechercher/sélectionner > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d’accès > Créer une clé d’accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux d'administrateur Duo
- Accédez à Paramètres SIEM > Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Duo Administrator Logs). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Journaux d'administrateur Duo comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://duo-admin-logs/duo/admin/ - Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal du fichier : 180 jours par défaut.
- ID de clé d'accès : clé d'accès utilisateur avec accès au bucket S3.
- Clé d'accès secrète : clé secrète utilisateur avec accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux dans l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
| Champ du journal | Mappage UDM | Logique |
|---|---|---|
action |
metadata.product_event_type |
Valeur du champ action du journal brut. |
desc |
metadata.description |
Valeur du champ desc de l'objet description du journal brut. |
description._status |
target.group.attribute.labels.value |
Valeur du champ _status dans l'objet description du journal brut, en particulier lors du traitement des actions liées à un groupe. Cette valeur est placée dans un tableau "labels" avec une "clé" correspondante de "status". |
description.desc |
metadata.description |
Valeur du champ desc de l'objet description du journal brut. |
description.email |
target.user.email_addresses |
Valeur du champ email de l'objet description du journal brut. |
description.error |
security_result.summary |
Valeur du champ error de l'objet description du journal brut. |
description.factor |
extensions.auth.auth_details |
Valeur du champ factor de l'objet description du journal brut. |
description.groups.0._status |
target.group.attribute.labels.value |
Valeur du champ _status du premier élément du tableau groups dans l'objet description du journal brut. Cette valeur est placée dans un tableau "labels" avec une "clé" correspondante de "status". |
description.groups.0.name |
target.group.group_display_name |
Valeur du champ name du premier élément du tableau groups dans l'objet description du journal brut. |
description.ip_address |
principal.ip |
Valeur du champ ip_address de l'objet description du journal brut. |
description.name |
target.group.group_display_name |
Valeur du champ name de l'objet description du journal brut. |
description.realname |
target.user.user_display_name |
Valeur du champ realname de l'objet description du journal brut. |
description.status |
target.user.attribute.labels.value |
Valeur du champ status de l'objet description du journal brut. Cette valeur est placée dans un tableau "labels" avec une "clé" correspondante de "status". |
description.uname |
target.user.email_addresses ou target.user.userid |
Valeur du champ uname de l'objet description du journal brut. Si elle correspond à un format d'adresse e-mail, elle est mappée à email_addresses. Sinon, elle est mappée à userid. |
host |
principal.hostname |
Valeur du champ host du journal brut. |
isotimestamp |
metadata.event_timestamp.seconds |
Valeur du champ isotimestamp du journal brut, convertie en secondes depuis l'époque. |
object |
target.group.group_display_name |
Valeur du champ object du journal brut. |
timestamp |
metadata.event_timestamp.seconds |
Valeur du champ timestamp du journal brut. |
username |
target.user.userid ou principal.user.userid |
Si le champ action contient "login", la valeur est mappée à target.user.userid. Sinon, elle est mappée à principal.user.userid. Définissez la valeur sur "USERNAME_PASSWORD" si le action champ contient "login". Déterminé par l'analyseur en fonction du champ action. Valeurs possibles : USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Toujours défini sur "DUO_ADMIN". Toujours défini sur "MULTI-FACTOR_AUTHENTICATION". Toujours défini sur "DUO_SECURITY". Définissez la valeur sur "ADMINISTRATOR" si le eventtype champ contient "admin". Déterminé par l'analyseur en fonction du champ action. Définissez la valeur sur "BLOCK" si le champ action contient "error". Sinon, définissez la valeur sur "ALLOW". Toujours défini sur "status" lors du remplissage de target.group.attribute.labels. Toujours défini sur "status" lors du remplissage de target.user.attribute.labels. |
Vous avez encore besoin d'aide ? Obtenez des réponses auprès des membres de la communauté et des professionnels Google SecOps.