Coletar registros de autenticação do Duo
Este documento explica como ingerir registros de autenticação do Duo no Google Security Operations. O analisador extrai os registros de mensagens formatadas em JSON. Ele transforma os dados de registro brutos no Modelo de dados unificado (UDM), mapeando campos como usuário, dispositivo, aplicativo, local e detalhes de autenticação, além de processar vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza limpeza de dados, conversão de tipos e tratamento de erros para garantir a qualidade e a consistência dos dados.
Escolha entre dois métodos de coleta:
- Opção 1: ingestão direta usando a API de terceiros
- Opção 2: coletar registros usando o AWS Lambda e o Amazon S3
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao painel de administração do Duo (é necessário ter a função de proprietário para criar aplicativos da API Admin)
- Acesso privilegiado à AWS se você usar a opção 2
Opção 1: ingerir registros de autenticação do Duo usando a API de terceiros
Coletar os pré-requisitos do Duo (credenciais da API)
- Faça login no painel de administração do Duo como administrador com a função de Proprietário, Administrador ou Gerente de aplicativos.
- Acesse Aplicativos > Catálogo de aplicativos.
- Localize a entrada da API Admin no catálogo.
- Clique em + Adicionar para criar o aplicativo.
- Copie e salve em um local seguro os seguintes detalhes:
- Chave de integração
- Chave secreta
- Nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com)
- Acesse a seção Permissões.
- Desmarque todas as opções de permissão, exceto Conceder leitura de registros.
- Clique em Salvar alterações.
Configurar um feed no Google SecOps para ingerir registros de autenticação do Duo
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Authentication Logs). - Selecione API de terceiros como o Tipo de fonte.
- Selecione Autenticação do Duo como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- Nome de usuário: insira a Chave de integração do Duo.
- Secret: insira a Chave secreta do Duo.
- Nome do host da API: insira o nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com). - Namespace do recurso: opcional. O namespace do recurso.
- Rótulos de ingestão: opcional. O rótulo a ser aplicado aos eventos deste feed.
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Opção 2: ingerir registros de autenticação do Duo usando o AWS S3
Coletar credenciais da API Admin do Duo
- Faça login no painel de administração do Duo.
- Acesse Aplicativos > Proteger um aplicativo.
- Localize a API Admin no catálogo de aplicativos.
- Clique em Proteger para adicionar o aplicativo da API Admin.
- Copie e salve os seguintes valores:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com)
- Em Permissões, ative Conceder leitura de registros.
- Clique em Salvar alterações.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
duo-auth-logs). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }- Substitua
duo-auth-logsse você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
WriteDuoAuthToS3Rolee clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions.
- Clique em Criar função > Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome duo_auth_to_s3Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAuthToS3RoleDepois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
duo_auth_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Acesse Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores.
Chave Valor de exemplo S3_BUCKETduo-auth-logsS3_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour). - Destino: sua função Lambda
duo_auth_to_s3. - Nome:
duo-auth-1h.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário:
secops-reader - Tipo de acesso: Chave de acesso — Acesso programático
- Usuário:
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-auth-logs" } ] }Defina o nome como
secops-reader-policy.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros de autenticação do Duo
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Authentication Logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Autenticação do Duo como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-auth-logs/duo/auth/ - Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Se access_device.browser estiver presente, o valor dele será mapeado para a UDM. |
access_device.hostname |
principal.hostname |
Se access_device.hostname estiver presente e não estiver vazio, o valor será mapeado para a UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type será alterado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, o valor de hostname será usado. |
access_device.ip |
principal.ip |
Se access_device.ip existir e for um endereço IPv4 válido, o valor dele será mapeado para a UDM. Se não for um endereço IPv4 válido, ele será adicionado como um valor de string a additional.fields com a chave access_device.ip. |
access_device.location.city |
principal.location.city |
Se presente, o valor será mapeado para a UDM. |
access_device.location.country |
principal.location.country_or_region |
Se presente, o valor será mapeado para a UDM. |
access_device.location.state |
principal.location.state |
Se presente, o valor será mapeado para a UDM. |
access_device.os |
principal.platform |
Se presente, o valor será traduzido para o valor correspondente do UDM (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Se presente, o valor será mapeado para a UDM. |
application.key |
target.resource.id |
Se presente, o valor será mapeado para a UDM. |
application.name |
target.application |
Se presente, o valor será mapeado para a UDM. |
auth_device.ip |
target.ip |
Se estiver presente e não for "None", o valor será mapeado para a UDM. |
auth_device.location.city |
target.location.city |
Se presente, o valor será mapeado para a UDM. |
auth_device.location.country |
target.location.country_or_region |
Se presente, o valor será mapeado para a UDM. |
auth_device.location.state |
target.location.state |
Se presente, o valor será mapeado para a UDM. |
auth_device.name |
target.hostname OU target.user.phone_numbers |
Se auth_device.name estiver presente e for um número de telefone (após a normalização), ele será adicionado a target.user.phone_numbers. Caso contrário, ele será mapeado para target.hostname. |
client_ip |
target.ip |
Se estiver presente e não for "None", o valor será mapeado para a UDM. |
client_section |
target.resource.attribute.labels.value |
Se client_section estiver presente, o valor dele será mapeado para a UDM com a chave client_section. |
dn |
target.user.userid |
Se dn estiver presente e user.name e username não estiverem, o userid será extraído do campo dn usando o grok e mapeado para a UDM. O event_type é definido como USER_LOGIN. |
event_type |
metadata.product_event_type E metadata.event_type |
O valor é mapeado para metadata.product_event_type. Ele também é usado para determinar o metadata.event_type: "authentication" se torna USER_LOGIN, "enrollment" se torna USER_CREATION e, se estiver vazio ou nenhum dos dois, se torna GENERIC_EVENT. |
factor |
extensions.auth.mechanism E extensions.auth.auth_details |
O valor é traduzido para o valor correspondente do UDM auth.mechanism (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também é mapeado para extensions.auth.auth_details. |
hostname |
principal.hostname |
Se estiver presente e access_device.hostname estiver vazio, o valor será mapeado para a UDM. |
log_format |
target.resource.attribute.labels.value |
Se log_format estiver presente, o valor dele será mapeado para a UDM com a chave log_format. |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Se log_level.__class_uuid__ estiver presente, o valor dele será mapeado para a UDM com a chave __class_uuid__. |
log_level.name |
target.resource.attribute.labels.value E security_result.severity |
Se log_level.name estiver presente, o valor dele será mapeado para a UDM com a chave name. Se o valor for "info", security_result.severity será definido como INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Se log_logger.unpersistable estiver presente, o valor dele será mapeado para a UDM com a chave unpersistable. |
log_namespace |
target.resource.attribute.labels.value |
Se log_namespace estiver presente, o valor dele será mapeado para a UDM com a chave log_namespace. |
log_source |
target.resource.attribute.labels.value |
Se log_source estiver presente, o valor dele será mapeado para a UDM com a chave log_source. |
msg |
security_result.summary |
Se estiver presente e reason estiver vazio, o valor será mapeado para a UDM. |
reason |
security_result.summary |
Se presente, o valor será mapeado para a UDM. |
result |
security_result.action_details E security_result.action |
Se presente, o valor será mapeado para security_result.action_details. "success" ou "SUCCESS" significa security_result.action ALLOW. Caso contrário, BLOCK. |
server_section |
target.resource.attribute.labels.value |
Se server_section estiver presente, o valor dele será mapeado para a UDM com a chave server_section. |
server_section_ikey |
target.resource.attribute.labels.value |
Se server_section_ikey estiver presente, o valor dele será mapeado para a UDM com a chave server_section_ikey. |
status |
security_result.action_details E security_result.action |
Se presente, o valor será mapeado para security_result.action_details. "Permitir" significa security_result.action ALLOW, e "Rejeitar" significa BLOCK. |
timestamp |
metadata.event_timestamp E event.timestamp |
O valor é convertido em um carimbo de data/hora e mapeado para metadata.event_timestamp e event.timestamp. |
txid |
metadata.product_log_id E network.session_id |
O valor é mapeado para metadata.product_log_id e network.session_id. |
user.groups |
target.user.group_identifiers |
Todos os valores na matriz são adicionados a target.user.group_identifiers. |
user.key |
target.user.product_object_id |
Se presente, o valor será mapeado para a UDM. |
user.name |
target.user.userid |
Se presente, o valor será mapeado para a UDM. |
username |
target.user.userid |
Se estiver presente e user.name não estiver, o valor será mapeado para a UDM. O event_type é definido como USER_LOGIN. |
| (Lógica do analisador) | metadata.vendor_name |
Sempre definido como "DUO_SECURITY". |
| (Lógica do analisador) | metadata.product_name |
Sempre definido como "MULTI-FACTOR_AUTHENTICATION". |
| (Lógica do analisador) | metadata.log_type |
Extraído do campo log_type de nível superior do registro bruto. |
| (Lógica do analisador) | extensions.auth.type |
Sempre definido como "SSO". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.