Raccogliere i log degli indicatori di Digital Shadows

Supportato in:

Questo documento spiega come importare i log degli indicatori di Digital Shadows in Google Security Operations utilizzando Amazon S3.

Digital Shadows Indicators (ora parte di ReliaQuest GreyMatter DRP) è una piattaforma di protezione dal rischio digitale che monitora e rileva continuamente minacce esterne, esposizioni di dati e furti di identità dei brand su open web, deep web, dark web e social media. Fornisce threat intelligence, avvisi relativi agli incidenti e indicatori di compromissione (IOC) per aiutare le organizzazioni a identificare e mitigare i rischi digitali.

Prima di iniziare

  • Un'istanza Google SecOps
  • Accesso con privilegi al portale Digital Shadows Indicators
  • Accesso privilegiato ad AWS (S3, IAM)
  • Abbonamento attivo a Digital Shadows Indicators con accesso API abilitato

Raccogliere le credenziali API di Digital Shadows Indicators

  1. Accedi al portale degli indicatori di Digital Shadows all'indirizzo https://portal-digitalshadows.com.
  2. Vai a Impostazioni > Credenziali API.
  3. Se non hai una chiave API esistente, fai clic su Crea nuovo client API o Genera chiave API.
  4. Copia e salva i seguenti dettagli in una posizione sicura:

    • Chiave API: la tua chiave API di 6 caratteri
    • API secret: il tuo API secret di 32 caratteri
    • ID account: il tuo ID account (visualizzato nel portale o fornito dal tuo rappresentante Digital Shadows)
    • URL di base dell'API: https://api.searchlight.app/v1 o https://portal-digitalshadows.com/api/v1 (a seconda della regione del tenant)

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, digital-shadows-logs).
  3. Crea un utente seguendo questa guida utente: creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file .csv per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Norme relative alle autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca il criterio AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
  2. Copia e incolla il criterio riportato di seguito.
  3. JSON della policy (sostituisci digital-shadows-logs se hai inserito un nome del bucket diverso):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Fai clic su Avanti > Crea policy.

  5. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  6. Allega la policy appena creata.

  7. Assegna al ruolo il nome DigitalShadowsLambdaRole e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome DigitalShadowsCollector
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione DigitalShadowsLambdaRole
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e incolla il codice riportato di seguito (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  6. Inserisci le variabili di ambiente fornite di seguito, sostituendole con i tuoi valori.

    Variabili di ambiente

    Chiave Valore di esempio
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (la tua chiave API di 6 caratteri)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > DigitalShadowsCollector).

  8. Seleziona la scheda Configurazione.

  9. Nel riquadro Configurazione generale, fai clic su Modifica.

  10. Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tasso (1 hour)
    • Target: la tua funzione Lambda DigitalShadowsCollector
    • Nome: DigitalShadowsCollector-1h
  3. Fai clic su Crea pianificazione.

Configura un feed in Google SecOps per importare i log degli indicatori di Digital Shadows

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nella pagina successiva, fai clic su Configura un singolo feed.
  4. Inserisci un nome univoco per il nome del feed.
  5. Seleziona Amazon S3 V2 come Tipo di origine.
  6. Seleziona Indicatori di tracce digitali come Tipo di log.
  7. Fai clic su Avanti e poi su Invia.
  8. Specifica i valori per i seguenti campi:

    • URI S3: s3://digital-shadows-logs/digital-shadows/
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset
    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed
  9. Fai clic su Avanti e poi su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
valore entity.entity.file.md5 Imposta se type == "MD5"
valore entity.entity.file.sha1 Imposta se type == "SHA1"
valore entity.entity.file.sha256 Imposta se type == "SHA256"
valore entity.entity.hostname Imposta se type == "HOST"
valore entity.entity.ip Valore copiato direttamente se type == "IP"
valore entity.entity.url Set if type == "URL"
valore entity.entity.user.email_addresses Valore copiato direttamente se type == "EMAIL"
tipo entity.metadata.entity_type Impostato su "DOMAIN_NAME" se type == "HOST", "IP_ADDRESS" se type == "IP", "URL" se type == "URL", "USER" se type == "EMAIL", "FILE" se type in ["SHA1","SHA256","MD5"], altrimenti "UNKNOWN_ENTITYTYPE"
lastUpdated entity.metadata.interval.start_time Convertito da ISO8601 a timestamp se non è vuoto
id entity.metadata.product_entity_id Valore copiato direttamente se non è vuoto
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Unito agli oggetti {key: tag field name, value: tag value} se non è vuoto
sourceType entity.metadata.threat.category_details Valore copiato direttamente
entity.metadata.threat.threat_feed_name Imposta su "Indicatori"
id entity.metadata.threat.threat_id Valore copiato direttamente se non è vuoto
sourceIdentifier entity.metadata.threat.url_back_to_product Valore copiato direttamente
entity.metadata.product_name Imposta su "Indicatori"
entity.metadata.vendor_name Impostato su "Ombre digitali"

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.