Coletar registros de indicadores de Digital Shadows

Compatível com:

Este documento explica como ingerir registros de indicadores do Digital Shadows no Google Security Operations usando o Amazon S3.

Os indicadores da Digital Shadows (agora parte da ReliaQuest GreyMatter DRP) são uma plataforma de proteção contra riscos digitais que monitora e detecta continuamente ameaças externas, exposições de dados e falsificação de identidade de marca na web aberta, na deep web, na dark web e nas redes sociais. Ele fornece inteligência de ameaças, alertas de incidentes e indicadores de comprometimento (IOCs) para ajudar as organizações a identificar e reduzir riscos digitais.

Antes de começar

  • Uma instância do Google SecOps
  • Acesso privilegiado ao portal Indicadores do Digital Shadows
  • Acesso privilegiado à AWS (S3, IAM)
  • Assinatura ativa dos indicadores do Digital Shadows com acesso à API ativado

Coletar credenciais da API Digital Shadows Indicators

  1. Faça login no portal de indicadores de rastros digitais em https://portal-digitalshadows.com.
  2. Acesse Configurações > Credenciais da API.
  3. Se você não tiver uma chave de API, clique em Criar novo cliente de API ou Gerar chave de API.
  4. Copie e salve os seguintes detalhes em um local seguro:

    • Chave de API: sua chave de API de seis caracteres
    • Chave secreta da API: sua chave secreta de 32 caracteres.
    • ID da conta: seu ID da conta (exibido no portal ou fornecido pelo representante da Digital Shadows)
    • URL base da API: https://api.searchlight.app/v1 ou https://portal-digitalshadows.com/api/v1 (dependendo da região do seu locatário)

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, digital-shadows-logs).
  3. Crie um usuário seguindo este guia do usuário: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Baixar arquivo .csv para salvar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clique em Próxima.
  20. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse a guia IAM > Políticas > Criar política > JSON.
  2. Copie e cole a política abaixo.
  3. JSON da política (substitua digital-shadows-logs se você tiver inserido um nome de bucket diferente):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Clique em Próxima > Criar política.

  5. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  6. Anexe a política recém-criada.

  7. Nomeie a função como DigitalShadowsLambdaRole e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome DigitalShadowsCollector
    Ambiente de execução Python 3.13
    Arquitetura x86_64
    Função de execução DigitalShadowsLambdaRole
  4. Depois que a função for criada, abra a guia Código, exclua o stub e cole o código abaixo (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Insira as variáveis de ambiente fornecidas abaixo, substituindo pelos seus valores.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (sua chave de API de seis caracteres)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > DigitalShadowsCollector.

  8. Selecione a guia Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Mude o Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour)
    • Destino: sua função Lambda DigitalShadowsCollector
    • Nome: DigitalShadowsCollector-1h
  3. Clique em Criar programação.

Configurar um feed no Google SecOps para ingerir registros de indicadores do Digital Shadows

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Na próxima página, clique em Configurar um único feed.
  4. Insira um nome exclusivo para o Nome do feed.
  5. Selecione Amazon S3 V2 como o Tipo de origem.
  6. Selecione Indicadores de rastros digitais como o Tipo de registro.
  7. Clique em Próxima e em Enviar.
  8. Especifique valores para os seguintes campos:

    • URI do S3: s3://digital-shadows-logs/digital-shadows/
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados nos últimos dias (o padrão é 180 dias).
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3
    • Namespace do recurso: o namespace do recurso
    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed
  9. Clique em Próxima e em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
valor entity.entity.file.md5 Definido se type == "MD5"
valor entity.entity.file.sha1 Definido se type == "SHA1"
valor entity.entity.file.sha256 Definido se type == "SHA256"
valor entity.entity.hostname Definido se type == "HOST"
valor entity.entity.ip O valor é copiado diretamente se type == "IP"
valor entity.entity.url Definido se type == "URL"
valor entity.entity.user.email_addresses O valor é copiado diretamente se type == "EMAIL"
tipo entity.metadata.entity_type Defina como "DOMAIN_NAME" se type == "HOST", "IP_ADDRESS" se type == "IP", "URL" se type == "URL", "USER" se type == "EMAIL", "FILE" se type in ["SHA1","SHA256","MD5"], caso contrário, "UNKNOWN_ENTITYTYPE"
lastUpdated entity.metadata.interval.start_time Convertido de ISO8601 para carimbo de data/hora se não estiver vazio
ID entity.metadata.product_entity_id O valor é copiado diretamente se não estiver vazio.
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Mesclado com objetos {key: nome do campo da tag, value: valor da tag} se não estiver vazio
sourceType entity.metadata.threat.category_details Valor copiado diretamente
entity.metadata.threat.threat_feed_name Defina como "Indicadores".
ID entity.metadata.threat.threat_id O valor é copiado diretamente se não estiver vazio.
sourceIdentifier entity.metadata.threat.url_back_to_product Valor copiado diretamente
entity.metadata.product_name Defina como "Indicadores".
entity.metadata.vendor_name Definido como "Digital Shadows"

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.