Coletar registros de contexto da entidade do Duo
Este documento explica como ingerir dados de contexto de entidade do Duo no Google Security Operations usando o Google Cloud Storage. O analisador transforma os registros JSON em um modelo de dados unificado (UDM) extraindo primeiro os campos do JSON bruto e mapeando esses campos para atributos do UDM. Ele processa vários cenários de dados, incluindo informações de usuários e recursos, detalhes de software e rótulos de segurança, garantindo uma representação abrangente no esquema do UDM.
Antes de começar
Verifique se você atende os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Acesso privilegiado ao locatário do Duo (aplicativo da API Admin com privilégios administrativos suficientes para gerenciar aplicativos)
- Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
Configurar o aplicativo da API Admin do Duo
- Faça login no painel de administração do Duo.
- Acesse Aplicativos > Proteger um aplicativo.
- Pesquise API Admin e clique em Proteger.
- Anote os seguintes valores:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome do host da API (por exemplo,
api-XXXXXXXX.duosecurity.com)
- Em Permissões, ative Conceder recurso - leitura (para ler usuários, grupos, smartphones, endpoints, tokens e credenciais WebAuthn).
Clique em Salvar.
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, duo-context.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Salve o nome e a região do bucket para referência futura.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
duo-entity-context-sa. - Descrição da conta de serviço: insira
Service account for Cloud Run function to collect Duo entity context data.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: gravar registros em um bucket do GCS
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
duo-entity-context-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
duo-entity-context-trigger. - Não altere as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar dados de contexto de entidade
A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar dados de contexto de entidade da API Duo Admin e gravá-los no GCS.
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço duo-entity-context-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (
duo-entity-context-trigger). - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e abra Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione a conta de serviço (
duo-entity-context-sa).
- Conta de serviço: selecione a conta de serviço (
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
- Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo GCS_BUCKETduo-contextGCS_PREFIXduo/context/DUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT100RESOURCESusers,groups,phones,endpoints,tokens,webauthncredentialsNa seção Variáveis e secrets, role a tela até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos).
- Tempo limite da solicitação: insira
Acesse a guia Configurações em Contêineres:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1.
- Clique em Concluído.
- Na seção Recursos:
Role até Ambiente de execução:
- Selecione Padrão (recomendado).
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0. - Número máximo de instâncias: insira
100ou ajuste com base na carga esperada.
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço é criado, o editor de código inline é aberto automaticamente.
Adicionar código da função
- Insira main em Ponto de entrada da função.
No editor de código em linha, crie dois arquivos:
- Primeiro arquivo: main.py::
import functions_framework from google.cloud import storage import json import os import time import hmac import hashlib import base64 import email.utils import urllib.parse from urllib.request import Request, urlopen # Environment variables DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "duo/context/") # Default resources can be adjusted via ENV RESOURCES = [r.strip() for r in os.environ.get("RESOURCES", "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators").split(",") if r.strip()] # Duo paging: default 100; max varies by endpoint LIMIT = int(os.environ.get("LIMIT", "100")) # Initialize Storage client storage_client = storage.Client() def _canon_params(params: dict) -> str: """RFC3986 encoding with '~' unescaped, keys sorted lexicographically.""" if not params: return "" parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue ks = urllib.parse.quote(str(k), safe="~") vs = urllib.parse.quote(str(v), safe="~") parts.append(f"{ks}={vs}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1).""" now = email.utils.formatdate() canon = "\n".join([ now, method.upper(), host.lower(), path, _canon_params(params) ]) sig = hmac.new( DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1 ).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8") return { "Date": now, "Authorization": f"Basic {auth}" } def _call(method: str, path: str, params: dict) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "") req = Request(url, method=method.upper()) for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _write_json(obj: dict, when: float, resource: str, page: int) -> str: bucket = storage_client.bucket(GCS_BUCKET) prefix = GCS_PREFIX.strip("/") + "/" if GCS_PREFIX else "" key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(obj, separators=(",", ":")), content_type="application/json" ) return key def _fetch_resource(resource: str) -> dict: """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset.""" path = f"/admin/v1/{resource}" offset = 0 page = 0 now = time.time() total_items = 0 while True: params = {"limit": LIMIT, "offset": offset} data = _call("GET", path, params) _write_json(data, now, resource, page) page += 1 resp = data.get("response") # most endpoints return a list; if not a list, count as 1 object page if isinstance(resp, list): total_items += len(resp) elif resp is not None: total_items += 1 meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if next_offset is None: break # Duo returns next_offset as int try: offset = int(next_offset) except Exception: break return { "resource": resource, "pages": page, "objects": total_items } @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo entity context data and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ results = [] for res in RESOURCES: print(f"Fetching resource: {res}") result = _fetch_resource(res) results.append(result) print(f"Completed {res}: {result['pages']} pages, {result['objects']} objects") print(f"All resources fetched successfully: {results}")- Segundo arquivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.*Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome duo-entity-context-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecione o tópico do Pub/Sub ( duo-entity-context-trigger).Corpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Opções de frequência de programação
Escolha a frequência com base nos requisitos de atualização de dados:
Frequência Expressão Cron Caso de uso A cada hora 0 * * * *Padrão (recomendado) A cada 2 horas 0 */2 * * *Atualização moderada A cada 6 horas 0 */6 * * *Atualizações de baixa frequência Diário 0 0 * * *Atualizações mínimas
Testar o job do programador
- No console do Cloud Scheduler, encontre seu job (
duo-entity-context-hourly). - Clique em Forçar execução para acionar manualmente.
- Aguarde alguns segundos e acesse Cloud Run > Serviços > duo-entity-context-collector > Registros.
- Verifique se a função foi executada com sucesso.
- Verifique o bucket do GCS para confirmar se os dados de contexto da entidade foram gravados.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Entity Context). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Dados de contexto da entidade do Duo como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir dados de contexto de entidade do Duo
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Duo Entity Context). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Dados de contexto da entidade do Duo como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://duo-context/duo/context/Substitua:
duo-context: o nome do bucket do GCS.duo/context/: prefixo/caminho da pasta em que os registros são armazenados (precisa corresponder à variável de ambienteGCS_PREFIX).
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso: o namespace do recurso.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| Ativado | entity.asset.deployment_status | Se "activated" for falso, defina como "DECOMISSIONED". Caso contrário, defina como "ACTIVE". |
| browsers.browser_family | entity.asset.software.name | Extraído da matriz "browsers" no registro bruto. |
| browsers.browser_version | entity.asset.software.version | Extraído da matriz "browsers" no registro bruto. |
| device_name | entity.asset.hostname | Mapeado diretamente do registro bruto. |
| disk_encryption_status | entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| entity.user.email_addresses | Mapeado diretamente do registro bruto se ele contiver "@". Caso contrário, usa "username" ou "username1" se eles contiverem "@". | |
| criptografado | entity.asset.attribute.labels.key: "Encrypted", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| epkey | entity.asset.product_object_id | Usado como "product_object_id" se estiver presente. Caso contrário, usa "phone_id" ou "token_id". |
| impressão digital | entity.asset.attribute.labels.key: "Finger Print", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| firewall_status | entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| hardware_uuid | entity.asset.asset_id | Usado como "asset_id" se estiver presente. Caso contrário, usa "user_id". |
| last_seen | entity.asset.last_discover_time | Analisado como um carimbo de data/hora ISO8601 e mapeado. |
| modelo | entity.asset.hardware.model | Mapeado diretamente do registro bruto. |
| número | entity.user.phone_numbers | Mapeado diretamente do registro bruto. |
| os_family | entity.asset.platform_software.platform | Mapeado para "WINDOWS", "LINUX" ou "MAC" com base no valor, sem diferenciar maiúsculas e minúsculas. |
| versão_so | entity.asset.platform_software.platform_version | Mapeado diretamente do registro bruto. |
| password_status | entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| phone_id | entity.asset.product_object_id | Usado como "product_object_id" se "epkey" não estiver presente. Caso contrário, usa "token_id". |
| security_agents.security_agent | entity.asset.software.name | Extraído da matriz "security_agents" no registro bruto. |
| security_agents.version | entity.asset.software.version | Extraído da matriz "security_agents" no registro bruto. |
| timestamp | entity.metadata.collected_timestamp | Preenche o campo "collected_timestamp" no objeto "metadata". |
| token_id | entity.asset.product_object_id | Usado como "product_object_id" se "epkey" e "phone_id" não estiverem presentes. |
| trusted_endpoint | entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value | Mapeado diretamente do registro bruto e convertido em letras minúsculas. |
| tipo | entity.asset.type | Se o "type" do registro bruto contiver "mobile" (sem diferenciação de maiúsculas e minúsculas), defina como "MOBILE". Caso contrário, defina como "LAPTOP". |
| user_id | entity.asset.asset_id | Usado como "asset_id" se "hardware_uuid" não estiver presente. |
| users.email | entity.user.email_addresses | Usado como "email_addresses" se for o primeiro usuário na matriz "users" e contiver "@". |
| users.username | entity.user.userid | Nome de usuário extraído antes de "@" e usado como "userid" se for o primeiro usuário na matriz "users". |
| entity.metadata.vendor_name | "Duo" | |
| entity.metadata.product_name | "Dados de contexto da entidade do Duo" | |
| entity.metadata.entity_type | RECURSO | |
| entity.relations.entity_type | USUÁRIO | |
| entity.relations.relationship | OWNS |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.