Coletar registros de EDR do Digital Guardian
Este documento explica como ingerir registros do Digital Guardian EDR no Google Security Operations usando o Google Cloud Storage V2 por uma função do Cloud Run.
O Digital Guardian da Fortra (antigo Digital Guardian) é uma plataforma abrangente de prevenção contra perda de dados e detecção e resposta de endpoints que oferece visibilidade de eventos de sistema, usuário e dados em endpoints, redes e aplicativos na nuvem. O serviço Analytics & Reporting Cloud (ARC) oferece recursos avançados de análise, fluxo de trabalho e relatórios para proteção de dados holística. A função do Cloud Run faz a autenticação na API ARC Export usando o OAuth 2.0, recupera dados de exportação, confirma o marcador para avançar para o próximo bloco, grava os resultados como NDJSON em um bucket do GCS, e o Google SecOps os ingere por um feed V2 do GCS.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Um projeto do Google Cloud com as seguintes APIs ativadas:
- Cloud Storage
- Cloud Run Functions
- Cloud Scheduler
- Pub/Sub
- Cloud Build
- Permissões para criar e gerenciar buckets do Cloud Storage, funções do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
- Acesso privilegiado ao console de gerenciamento do Digital Guardian (DGMC, na sigla em inglês)
- Acesso às configurações do locatário do Digital Guardian Analytics & Reporting Cloud (ARC)
- Permissões de administrador para configurar os Serviços do Google Cloud no DGMC
- Um perfil de exportação criado no DGMC com um GUID válido
Criar um bucket do Google Cloud Storage.
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, digitalguardian-edr-logs.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local mais próximo da sua instância do Google SecOps (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Coletar credenciais da API Digital Guardian
Para permitir que a função do Cloud Run recupere dados de exportação do ARC do Digital Guardian, você precisa conseguir credenciais de API e configurar um perfil de exportação.
Receber credenciais de API do DGMC
- Faça login no Digital Guardian Management Console (DGMC).
- Acesse Sistema > Configuração > Serviços de nuvem.
Na seção Acesso à API, localize e registre os seguintes valores:
- ID de acesso à API: é o ID do cliente para autenticação OAuth 2.0.
- Chave secreta de acesso à API: é a chave secreta do cliente para autenticação OAuth 2.0.
- URL base do Access Gateway: o endpoint do gateway de API (por exemplo,
https://accessgw-usw.msp.digitalguardian.com). - URL do servidor de autorização: o endpoint do token OAuth 2.0 (por exemplo,
https://authsrv.msp.digitalguardian.com/as/token.oauth2).
Criar e configurar um perfil de exportação
- No Digital Guardian Management Console (DGMC), acesse Admin > Reports > Export Profiles.
- Clique em Criar perfil de exportação ou selecione um perfil de exportação existente.
- Configure o perfil de exportação com as seguintes configurações:
- Nome do perfil: insira um nome descritivo, por exemplo,
Google SecOps SIEM Integration. - Fonte de dados: selecione Eventos ou Alertas, dependendo dos dados que você quer exportar.
- Formato de exportação: selecione Tabela JSON simplificada (recomendado para integrações de SIEM).
- Campos: selecione os campos que você quer incluir na exportação.
- Filtros: configure filtros para limitar os dados exportados (opcional).
- Nome do perfil: insira um nome descritivo, por exemplo,
- Clique em Salvar para criar o perfil de exportação.
Depois de salvar, localize o perfil de exportação na lista e copie o GUID do URL do perfil de exportação ou da página de detalhes.
Resumo de credenciais de registro
Salve as seguintes informações para configurar as variáveis de ambiente da função do Cloud Run:
- ID do cliente (ID de acesso à API): dos serviços de nuvem do DGMC
- Chave secreta do cliente (chave secreta de acesso à API): dos serviços de nuvem do DGMC
- URL do servidor de autorização: por exemplo,
https://authsrv.msp.digitalguardian.com/as/token.oauth2 - URL base do Access Gateway: por exemplo,
https://accessgw-usw.msp.digitalguardian.com - GUID do perfil de exportação: do perfil de exportação criado no DGMC
Testar o acesso à API
Para verificar se as credenciais são válidas, execute os seguintes comandos:
# Step 1: Obtain OAuth 2.0 access token curl -s -X POST \ -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \ "https://authsrv.msp.digitalguardian.com/as/token.oauth2"# Step 2: Test export endpoint with the access token curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"Uma resposta bem-sucedida retorna um documento JSON com os dados de exportação. Se você receber um erro de autenticação, verifique o ID e o segredo de acesso à API nos serviços de nuvem do DGMC.
Criar uma conta de serviço para a função do Cloud Run
- No console do Google Cloud, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
digitalguardian-ingestion(ou um nome descritivo). - Descrição da conta de serviço: insira
Service account for Digital Guardian EDR Cloud Run function to write logs to GCS.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Administrador de objetos do Storage (para ler/gravar objetos no bucket do Cloud Storage)
- Invocador do Cloud Run (para permitir que o Cloud Scheduler invoque a função)
- Clique em Continuar.
Clique em Concluído.
Criar um tópico do Pub/Sub
O Cloud Scheduler aciona a função do Cloud Run por um tópico do Pub/Sub.
- No console do Google Cloud, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- No campo ID do tópico, insira
digitalguardian-edr-trigger. - Não mude as configurações padrão.
Clique em Criar.
Criar a função do Cloud Run
Crie uma função do Cloud Run que faça a autenticação no ARC do Digital Guardian usando credenciais do cliente OAuth 2.0, recupere dados de exportação, confirme o marcador para avançar para o próximo bloco e grave os resultados como NDJSON no GCS.
Preparar arquivos de origem da função
Crie os dois arquivos a seguir para a implantação da função do Cloud Run.
requirements.txt
functions-framework==3.* google-cloud-storage==2.* urllib3==2.*main.py
"""Cloud Run function to ingest Digital Guardian EDR logs into GCS.""" import json import os import time import urllib.parse from datetime import datetime, timezone import functions_framework import urllib3 from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr") STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json") AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"] ARC_SERVER_URL = os.environ["ARC_SERVER_URL"] CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) http = urllib3.PoolManager() gcs = storage.Client() def _get_access_token() -> str: """Obtain an OAuth 2.0 access token using client credentials grant.""" body = urllib.parse.urlencode({ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "client", }) resp = http.request( "POST", AUTH_SERVER_URL, body=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if resp.status != 200: raise RuntimeError( f"OAuth token request failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) token_data = json.loads(resp.data.decode("utf-8")) return token_data["access_token"] def _arc_get(token: str, path: str, retries: int = 5) -> dict: """Execute a GET request against the ARC API with retry on 429.""" url = f"{ARC_SERVER_URL}{path}" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } backoff = 2 for attempt in range(retries): resp = http.request("GET", url, headers=headers) if resp.status == 200: return json.loads(resp.data.decode("utf-8")) if resp.status == 429: wait = backoff * (2 ** attempt) print( f"Rate limited (429). Retrying in {wait}s " f"(attempt {attempt + 1}/{retries})." ) time.sleep(wait) continue raise RuntimeError( f"ARC API error: {resp.status} — {resp.data.decode('utf-8')}" ) raise RuntimeError( "ARC API rate limit exceeded after maximum retries." ) def _arc_acknowledge(token: str) -> None: """POST to the acknowledge endpoint to advance the export bookmark.""" url = ( f"{ARC_SERVER_URL}/rest/1.0/export/" f"{EXPORT_PROFILE_GUID}/acknowledge" ) headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } resp = http.request("POST", url, headers=headers) if resp.status not in (200, 204): raise RuntimeError( f"ARC acknowledge failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) print("Export bookmark acknowledged successfully.") def _load_state() -> dict: """Load the last run state from GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.downlodownload_as_text return {} def _save_state(state: dict) -> None: """Persist run state to GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.uploadupload_from_string json.dumps(state), content_type="application/json" ) def _fetch_export(token: str) -> list: """Fetch export data from the ARC Export API.""" path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}" data = _arc_get(token, path) records = data if isinstance(data, list) else data.get("data", []) return records[:MAX_RECORDS] def _write_ndjson(records: list, run_ts: str) -> str: """Write records as NDJSON to GCS and return the blob path.""" bucket = gcs.bugcs.bucketUCKET) blob_path = ( f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/" f"day={run_ts[8:10]}/{run_ts}_export.ndjson" ) blob = bucket.blob(blob_path) ndjson = "\n".join( json.dumps(r, separators=(",", ":")) for r in records ) blob.uploadupload_from_stringn, content_type="application/x-ndjson") return blob_path @functions_framework.cloud_event def main(cloud_event): """Entry point triggered by Pub/Sub via Cloud Scheduler.""" state = _load_state() now = datetime.now(timezone.utc) print("Authenticating to Digital Guardian ARC.") token = _get_access_token() print( f"Fetching export data for profile {EXPORT_PROFILE_GUID}." ) records = _fetch_export(token) if not records: print("No new export data found.") return "OK" run_ts = now.strftime("%Y-%m-%dT%H%M%SZ") blob_path = _write_ndjson(records, run_ts) print( f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{blob_path}." ) _arc_acknowledge(token) state["last_run"] = now.isoformat() state["records_written"] = len(records) _save_state(state) print(f"State updated. last_run={now.isoformat()}.") return "OK"
implantar a função do Cloud Run
- Salve os dois arquivos (
main.pyerequirements.txt) em um diretório local (por exemplo,digitalguardian-function/). - Abra o Cloud Shell ou um terminal com a CLI
gcloudinstalada. Execute o seguinte comando para implantar a função:
gcloud functions deploy digitalguardian-edr-to-gcs \ --gen2 \ --region=us-central1 \ --runtime=python312 \ --trigger-topic=digitalguardian-edr-trigger \ --entry-point=main \ --memory=512MB \ --timeout=540s \ --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \ --set-env-vars=\ "GCS_BUCKET=digitalguardian-edr-logs",\ "GCS_PREFIX=digitalguardian_edr",\ "STATE_KEY=digitalguardian_edr_state.json",\ "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\ "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\ "CLIENT_ID=YOUR_CLIENT_ID",\ "CLIENT_SECRET=YOUR_CLIENT_SECRET",\ "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\ "MAX_RECORDS=10000"Substitua os seguintes valores de marcador:
PROJECT_IDpelo ID do projeto no Google Cloud.digitalguardian-edr-logs: o nome do bucket do GCS.YOUR_CLIENT_ID: seu ID de acesso à API do Digital Guardian.YOUR_CLIENT_SECRET: sua chave secreta de acesso à API do Digital Guardian.YOUR_EXPORT_PROFILE_GUID: o GUID do seu perfil de exportação do DGMC.
Verifique a implantação conferindo o status da função:
gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
Referência de variáveis de ambiente
| Variável | Obrigatório | Padrão | Descrição |
|---|---|---|---|
GCS_BUCKET |
Sim | Nome do bucket do GCS para armazenar a saída NDJSON | |
GCS_PREFIX |
Não | digitalguardian_edr |
Prefixo do objeto (caminho da pasta) no bucket |
STATE_KEY |
Não | digitalguardian_edr_state.json |
Nome do blob para o arquivo de estado no prefixo |
AUTH_SERVER_URL |
Sim | URL do servidor de autorização OAuth 2.0 | |
ARC_SERVER_URL |
Sim | URL base do gateway de acesso do ARC | |
CLIENT_ID |
Sim | ID de acesso à API do DGMC | |
CLIENT_SECRET |
Sim | Chave secreta de acesso à API do DGMC | |
EXPORT_PROFILE_GUID |
Sim | Exportar o GUID do perfil do DGMC | |
MAX_RECORDS |
Não | 10000 |
Número máximo de registros a serem gravados por execução |
Criar um job do Cloud Scheduler
O Cloud Scheduler aciona a função do Cloud Run em intervalos regulares pelo tópico do Pub/Sub.
- No console do Google Cloud, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
- Nome: insira
digitalguardian-edr-ingestion-schedule. - Região: selecione a mesma região da sua função do Cloud Run (por exemplo,
us-central1). Frequência: insira
*/5 * * * *(a cada 5 minutos).Fuso horário: selecione seu fuso horário preferido (por exemplo,
UTC).
- Nome: insira
Clique em Continuar.
Na seção Configurar a execução:
- Tipo de destino: selecione Pub/Sub.
- Tópico: selecione
digitalguardian-edr-trigger. - Corpo da mensagem: insira
{"run": true}.
Clique em Continuar.
Na seção Configurar configurações opcionais:
- Máximo de tentativas de repetição: insira
3. - Duração mínima de espera: insira
5s. - Duração máxima de espera: insira
60s.
- Máximo de tentativas de repetição: insira
Clique em Criar.
Para executar um teste imediato, clique nos três pontos (...) ao lado do nome do job e selecione Executar à força.
Recuperar a conta de serviço do Google SecOps e configurar o feed
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Digital Guardian EDR Logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Digital Guardian EDR como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço será exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS:
gs://digitalguardian-edr-logs/digitalguardian_edr/- Substitua
digitalguardian-edr-logspelo nome do bucket do GCS. - Substitua
digitalguardian_edrpelo valorGCS_PREFIXconfigurado.
- Substitua
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados nos últimos dias (o padrão é 180 dias).
Namespace do recurso: o namespace do recurso.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel Leitor de objetos do Storage no seu bucket do Cloud Storage.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket (por exemplo,
digitalguardian-edr-logs). - Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps (por exemplo,
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com). - Atribuir papéis: selecione Leitor de objetos do Storage.
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps (por exemplo,
Clique em Salvar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| Aplicativo | target.application | Valor copiado diretamente |
| Aplicativo | target.process.command_line | Definido como %{Application} se a regra corresponder a Printer. |
| Bytes_Written | network.sent_bytes | Valor copiado diretamente e convertido em uinteger |
| Categoria, Computer_Name, Detail | metadata.description | Definido como %{Detail} se Category == "Policies" e Computer_Name estiver vazio; caso contrário, definido como %{message} em grok_parse_failure |
| Command_Line, Command_Line1 | principal.process.command_line | Valor de Command_Line após remover as aspas finais, se não estiver vazio. Caso contrário, valor de Command_Line1 após remover as aspas finais. |
| Computer_Name, source | principal.hostname | Valor de computerName se não estiver vazio. Caso contrário, definido como %{source} |
| Destination_Device_Serial_Number, Destination_Device_Serial_Number1 | Extraído usando o padrão grok para processar aspas | |
| Destination_Directory, Destination_File | target.file.full_path | Concatenado de Destination_Directory e Destination_File se ambos não estiverem vazios |
| Destination_Drive_Type | security_result.detection_fields | Mesclado com destination_drive_type_label (chave: Destination_Drive_Type, valor: %{Destination_Drive_Type}) |
| Destination_File | target.file.names | Mesclado de Destination_File |
| Destination_File_Extension | target.file.mime_type | Valor copiado diretamente |
| Dll_SHA1_Hash | target.process.file.sha1 | Valor copiado diretamente após a conversão para minúsculas |
| Email_Address | principal.user.email_addresses | Mesclado de Email_Address |
| Email_Sender, Email_Subject | network.email.from | Definido como %{Email_Sender} se não estiver vazio |
| Email_Sender, Email_Subject | network.email.subject | Unido ao assunto (%{Email_Subject}) se Email_Sender não estiver vazio |
| File_Extension | principal.process.file.mime_type | Valor copiado diretamente |
| IP_Address, source_ip | principal.ip | Unido de "source_ip" se não estiver vazio, caso contrário, de "IP_Address" |
| Local_Port, source_port | principal.port | Valor de source_port se não estiver vazio e convertido em número inteiro. Caso contrário, de Local_Port e convertido em número inteiro. |
| MD5_Checksum | target.process.file.md5 | Valor copiado diretamente após a conversão para minúsculas |
| Network_Direction | network.direction | Definido como INBOUND se True, caso contrário, OUTBOUND se False |
| Process_PID | principal.process.pid | Valor copiado diretamente |
| Process_SHA256_Hash | target.process.file.sha256 | Valor copiado diretamente após a conversão para minúsculas |
| Product_Version | metadata.product_version | Valor copiado diretamente |
| Protocolo | network.ip_protocol | Definido como ICMP se == "1" |
| Remote_Port | target.port | Valor copiado diretamente e convertido em número inteiro |
| Regra | security_result.rule_name | Valor copiado diretamente |
| Regra | metadata.event_type | Definido como PROCESS_UNCATEGORIZED se corresponder a .Printer.. Caso contrário, FILE_MOVE se corresponder a DLP.* |
| Gravidade | security_result.severity | Definido como LOW se <=3, MEDIUM se <=6, HIGH se <=8, CRITICAL se <=10 após a conversão para inteiro |
| Gravidade | security_result.severity_details | Valor copiado diretamente |
| Source_Directory, Source_File | src.file.full_path | Concatenado de Source_Directory e Source_File se ambos não estiverem vazios |
| Source_Drive_Type | security_result.detection_fields | Mesclado com source_drive_type_label (chave: Source_Drive_Type, valor: %{Source_Drive_Type}) |
| Source_File | src.file.names | Mesclado de Source_File |
| Source_File_Extension | src.file.mime_type | Valor copiado diretamente |
| URL_Path, http_url | target.url | Valor de http_url se não estiver vazio, caso contrário, de URL_Path |
| User_Name | principal.user.userid | Valor de userName após a extração do grok |
| User_Name | principal.administrative_domain | Valor de domainName após a extração do grok |
| Was_Removable | security_result.detection_fields | Mesclado com was_removable_label (chave: Was_Removable, valor: %{Was_Removable}) |
| Was_Source_Removable | security_result.detection_fields | Mesclado com was_source_removable_label (chave: Was_Source_Removable, valor: %{Was_Source_Removable}) |
| computerName, destination_ip, protocol, source_ip, IP_Address, destination, userName, Process_PID, Category, Computer_Name | metadata.event_type | Definido como GENERIC_EVENT inicialmente; NETWORK_HTTP se protocol == HTTPS e (destination_ip ou computerName); NETWORK_CONNECTION se (source_ip ou IP_Address) e destination_ip; USER_UNCATEGORIZED se userName não estiver vazio; SCAN_PROCESS se Process_PID não estiver vazio |
| destination_ip | target.ip | Mesclado de destination_ip |
| incidents_url, matched_policies_by_severity | security_result | Consolidado com _sr (rule_name: %{matched_policies_by_severity}, url_back_to_product: %{incidents_url}) |
| protocolo | network.application_protocol | Definido como HTTPS se protocol == HTTP ou HTTPS |
| security_action | security_result.action | Mesclado de security_action |
| metadata.product_name | Defina como "Plataforma de DLP empresarial" | |
| metadata.vendor_name | Definido como "DigitalGuardian". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.