Collecter les journaux Digital Shadows SearchLight
Ce document explique comment ingérer des journaux Digital Shadows SearchLight dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les données d'événement de sécurité des journaux JSON. Il initialise les champs UDM (Unified Data Model), analyse la charge utile JSON, mappe les champs pertinents au schéma UDM, extrait les entités telles que l'adresse e-mail et le nom d'hôte à l'aide de modèles grok, et construit les objets security_result
et metadata
dans l'événement UDM.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps.
- Accès privilégié au locataire Digital Shadows SearchLight.
- Accès privilégié à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Collecter les conditions préalables de Digital Shadows SearchLight (ID, clés API, ID d'organisation, jetons)
- Connectez-vous au portail Digital Shadows SearchLight.
- Accédez à Paramètres > Identifiants de l'API.
- Créez un client API ou une paire de clés.
- Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- Clé API
- SID de clé API (Code secret de l'API)
- Numéro de compte
- URL de base de l'API :
https://api.searchlight.app/v1
ouhttps://portal-digitalshadows.com/api/v1
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
digital-shadows-logs
). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies.
- Cliquez sur Créer une règle > onglet JSON.
- Copiez et collez le règlement suivant.
JSON de la règle (remplacez
digital-shadows-logs
si vous avez saisi un autre nom de bucket) :{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows-searchlight/state.json" } ] }
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
digital-shadows-lambda-role
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom digital-shadows-collector
Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution digital-shadows-lambda-role
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code suivant (
digital-shadows-collector.py
).import json import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode import boto3 import urllib3 logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(s3, bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj["Body"].read().decode("utf-8")) ts = state.get("last_timestamp") if ts: return ts except s3.exceptions.NoSuchKey: pass except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(s3, bucket, key, ts: str) -> None: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}).encode("utf-8"), ContentType="application/json", ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): # Required s3_bucket = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] # Optional / defaults s3_prefix = os.environ.get("S3_PREFIX", "digital-shadows-searchlight/") state_key = os.environ.get("STATE_KEY", "digital-shadows-searchlight/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) s3 = boto3.client("s3") last_ts = _load_state(s3, s3_bucket, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] # Incidents (time filter often 'published-after' or 'updated-since' depending on tenancy) incidents = _collect(api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) # Intelligence incidents (alerts) intel_incidents = _collect(api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) # Indicators (IOCs) indicators = _collect(api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after") for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: # Choose newest timestamp seen in this batch newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{s3_prefix}digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records).encode("utf-8") s3.put_object( Bucket=s3_bucket, Key=key, Body=body, ContentType="application/x-ndjson", ) _save_state(s3, s3_bucket, state_key, newest) msg = f"Wrote {len(records)} records to s3://{s3_bucket}/{key}" else: msg = "No new records" logger.info(msg) return {"statusCode": 200, "body": msg}
Accédez à Configuration > Variables d'environnement.
Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement fournies dans le tableau suivant, en remplaçant les exemples de valeurs par les vôtres.
Variables d'environnement
Clé Exemple de valeur S3_BUCKET
digital-shadows-logs
S3_PREFIX
digital-shadows-searchlight/
STATE_KEY
digital-shadows-searchlight/state.json
DS_API_KEY
<your-6-character-api-key>
DS_API_SECRET
<your-32-character-api-secret>
API_BASE
https://api.searchlight.app/v1
(ouhttps://portal-digitalshadows.com/api/v1
)DS_ACCOUNT_ID
<your-account-id>
(obligatoire pour la plupart des locataires)PAGE_SIZE
100
MAX_PAGES
10
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda
digital-shadows-collector
. - Nom :
digital-shadows-collector-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
(Facultatif) Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Accédez à Console AWS > IAM > Utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader
. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
JSON :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::digital-shadows-logs" } ] }
Nom =
secops-reader-policy
.Cliquez sur Créer une règle> recherchez/sélectionnez > Suivant> Ajouter des autorisations.
Créez une clé d'accès pour
secops-reader
: Identifiants de sécurité > Clés d'accès.Cliquez sur Créer une clé d'accès.
Téléchargez le fichier
.CSV
. (Vous collerez ces valeurs dans le flux.)
Configurer un flux dans Google SecOps pour ingérer les journaux Digital Shadows SearchLight
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Digital Shadows SearchLight logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Digital Shadows SearchLight comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://digital-shadows-logs/digital-shadows-searchlight/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.