Raccogliere i log degli indicatori di Digital Shadows
Questo documento spiega come importare i log degli indicatori di Digital Shadows in Google Security Operations utilizzando Amazon S3.
Digital Shadows Indicators (ora parte di ReliaQuest GreyMatter DRP) è una piattaforma di protezione dal rischio digitale che monitora e rileva continuamente minacce esterne, esposizioni di dati e furti di identità dei brand su open web, deep web, dark web e social media. Fornisce threat intelligence, avvisi relativi agli incidenti e indicatori di compromissione (IOC) per aiutare le organizzazioni a identificare e mitigare i rischi digitali.
Prima di iniziare
- Un'istanza Google SecOps
- Accesso con privilegi al portale Digital Shadows Indicators
- Accesso privilegiato ad AWS (S3, IAM)
- Abbonamento attivo a Digital Shadows Indicators con accesso API abilitato
Raccogliere le credenziali API di Digital Shadows Indicators
- Accedi al portale degli indicatori di Digital Shadows all'indirizzo
https://portal-digitalshadows.com. - Vai a Impostazioni > Credenziali API.
- Se non hai una chiave API esistente, fai clic su Crea nuovo client API o Genera chiave API.
Copia e salva i seguenti dettagli in una posizione sicura:
- Chiave API: la tua chiave API di 6 caratteri
- API secret: il tuo API secret di 32 caratteri
- ID account: il tuo ID account (visualizzato nel portale o fornito dal tuo rappresentante Digital Shadows)
- URL di base dell'API:
https://api.searchlight.app/v1ohttps://portal-digitalshadows.com/api/v1(a seconda della regione del tenant)
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
digital-shadows-logs). - Crea un utente seguendo questa guida utente: creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file .csv per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Norme relative alle autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca il criterio AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
- Copia e incolla il criterio riportato di seguito.
JSON della policy (sostituisci
digital-shadows-logsse hai inserito un nome del bucket diverso):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }Fai clic su Avanti > Crea policy.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega la policy appena creata.
Assegna al ruolo il nome
DigitalShadowsLambdaRolee fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome DigitalShadowsCollectorTempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione DigitalShadowsLambdaRoleDopo aver creato la funzione, apri la scheda Codice, elimina lo stub e incolla il codice riportato di seguito (DigitalShadowsCollector.py).
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raiseVai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le variabili di ambiente fornite di seguito, sostituendole con i tuoi valori.
Variabili di ambiente
Chiave Valore di esempio S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(la tua chiave API di 6 caratteri)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > DigitalShadowsCollector).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tasso (
1 hour) - Target: la tua funzione Lambda
DigitalShadowsCollector - Nome:
DigitalShadowsCollector-1h
- Programma ricorrente: Tasso (
- Fai clic su Crea pianificazione.
Configura un feed in Google SecOps per importare i log degli indicatori di Digital Shadows
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Nella pagina successiva, fai clic su Configura un singolo feed.
- Inserisci un nome univoco per il nome del feed.
- Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Indicatori di tracce digitali come Tipo di log.
- Fai clic su Avanti e poi su Invia.
Specifica i valori per i seguenti campi:
- URI S3:
s3://digital-shadows-logs/digital-shadows/ - Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset
- Etichette di importazione: l'etichetta da applicare agli eventi di questo feed
- URI S3:
Fai clic su Avanti e poi su Invia.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| valore | entity.entity.file.md5 | Imposta se type == "MD5" |
| valore | entity.entity.file.sha1 | Imposta se type == "SHA1" |
| valore | entity.entity.file.sha256 | Imposta se type == "SHA256" |
| valore | entity.entity.hostname | Imposta se type == "HOST" |
| valore | entity.entity.ip | Valore copiato direttamente se type == "IP" |
| valore | entity.entity.url | Set if type == "URL" |
| valore | entity.entity.user.email_addresses | Valore copiato direttamente se type == "EMAIL" |
| tipo | entity.metadata.entity_type | Impostato su "DOMAIN_NAME" se type == "HOST", "IP_ADDRESS" se type == "IP", "URL" se type == "URL", "USER" se type == "EMAIL", "FILE" se type in ["SHA1","SHA256","MD5"], altrimenti "UNKNOWN_ENTITYTYPE" |
| lastUpdated | entity.metadata.interval.start_time | Convertito da ISO8601 a timestamp se non è vuoto |
| id | entity.metadata.product_entity_id | Valore copiato direttamente se non è vuoto |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Unito agli oggetti {key: tag field name, value: tag value} se non è vuoto |
| sourceType | entity.metadata.threat.category_details | Valore copiato direttamente |
| entity.metadata.threat.threat_feed_name | Imposta su "Indicatori" | |
| id | entity.metadata.threat.threat_id | Valore copiato direttamente se non è vuoto |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | Valore copiato direttamente |
| entity.metadata.product_name | Imposta su "Indicatori" | |
| entity.metadata.vendor_name | Impostato su "Ombre digitali" |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.