Coletar registros de indicadores de Digital Shadows
Este documento explica como ingerir registros de indicadores do Digital Shadows no Google Security Operations usando o Amazon S3.
Os indicadores da Digital Shadows (agora parte da ReliaQuest GreyMatter DRP) são uma plataforma de proteção contra riscos digitais que monitora e detecta continuamente ameaças externas, exposições de dados e falsificação de identidade de marca na web aberta, na deep web, na dark web e nas redes sociais. Ele fornece inteligência de ameaças, alertas de incidentes e indicadores de comprometimento (IOCs) para ajudar as organizações a identificar e reduzir riscos digitais.
Antes de começar
- Uma instância do Google SecOps
- Acesso privilegiado ao portal Indicadores do Digital Shadows
- Acesso privilegiado à AWS (S3, IAM)
- Assinatura ativa dos indicadores do Digital Shadows com acesso à API ativado
Coletar credenciais da API Digital Shadows Indicators
- Faça login no portal de indicadores de rastros digitais em
https://portal-digitalshadows.com. - Acesse Configurações > Credenciais da API.
- Se você não tiver uma chave de API, clique em Criar novo cliente de API ou Gerar chave de API.
Copie e salve os seguintes detalhes em um local seguro:
- Chave de API: sua chave de API de seis caracteres
- Chave secreta da API: sua chave secreta de 32 caracteres.
- ID da conta: seu ID da conta (exibido no portal ou fornecido pelo representante da Digital Shadows)
- URL base da API:
https://api.searchlight.app/v1ouhttps://portal-digitalshadows.com/api/v1(dependendo da região do seu locatário)
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
digital-shadows-logs). - Crie um usuário seguindo este guia do usuário: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Baixar arquivo .csv para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse a guia IAM > Políticas > Criar política > JSON.
- Copie e cole a política abaixo.
JSON da política (substitua
digital-shadows-logsse você tiver inserido um nome de bucket diferente):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
DigitalShadowsLambdaRolee clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome DigitalShadowsCollectorAmbiente de execução Python 3.13 Arquitetura x86_64 Função de execução DigitalShadowsLambdaRoleDepois que a função for criada, abra a guia Código, exclua o stub e cole o código abaixo (DigitalShadowsCollector.py).
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raiseAcesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Insira as variáveis de ambiente fornecidas abaixo, substituindo pelos seus valores.
Variáveis de ambiente
Chave Valor de exemplo S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(sua chave de API de seis caracteres)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > DigitalShadowsCollector.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude o Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour) - Destino: sua função Lambda
DigitalShadowsCollector - Nome:
DigitalShadowsCollector-1h
- Programação recorrente: Taxa (
- Clique em Criar programação.
Configurar um feed no Google SecOps para ingerir registros de indicadores do Digital Shadows
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Na próxima página, clique em Configurar um único feed.
- Insira um nome exclusivo para o Nome do feed.
- Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Indicadores de rastros digitais como o Tipo de registro.
- Clique em Próxima e em Enviar.
Especifique valores para os seguintes campos:
- URI do S3:
s3://digital-shadows-logs/digital-shadows/ - Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados nos últimos dias (o padrão é 180 dias).
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3
- Namespace do recurso: o namespace do recurso
- Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed
- URI do S3:
Clique em Próxima e em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| valor | entity.entity.file.md5 | Definido se type == "MD5" |
| valor | entity.entity.file.sha1 | Definido se type == "SHA1" |
| valor | entity.entity.file.sha256 | Definido se type == "SHA256" |
| valor | entity.entity.hostname | Definido se type == "HOST" |
| valor | entity.entity.ip | O valor é copiado diretamente se type == "IP" |
| valor | entity.entity.url | Definido se type == "URL" |
| valor | entity.entity.user.email_addresses | O valor é copiado diretamente se type == "EMAIL" |
| tipo | entity.metadata.entity_type | Defina como "DOMAIN_NAME" se type == "HOST", "IP_ADDRESS" se type == "IP", "URL" se type == "URL", "USER" se type == "EMAIL", "FILE" se type in ["SHA1","SHA256","MD5"], caso contrário, "UNKNOWN_ENTITYTYPE" |
| lastUpdated | entity.metadata.interval.start_time | Convertido de ISO8601 para carimbo de data/hora se não estiver vazio |
| ID | entity.metadata.product_entity_id | O valor é copiado diretamente se não estiver vazio. |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Mesclado com objetos {key: nome do campo da tag, value: valor da tag} se não estiver vazio |
| sourceType | entity.metadata.threat.category_details | Valor copiado diretamente |
| entity.metadata.threat.threat_feed_name | Defina como "Indicadores". | |
| ID | entity.metadata.threat.threat_id | O valor é copiado diretamente se não estiver vazio. |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | Valor copiado diretamente |
| entity.metadata.product_name | Defina como "Indicadores". | |
| entity.metadata.vendor_name | Definido como "Digital Shadows" |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.