Collecter les journaux des indicateurs Digital Shadows
Ce document explique comment ingérer des journaux d'indicateurs Digital Shadows dans Google Security Operations à l'aide d'Amazon S3.
Digital Shadows Indicators (qui fait désormais partie de ReliaQuest GreyMatter DRP) est une plate-forme de protection contre les risques numériques qui surveille et détecte en continu les menaces externes, les expositions de données et les usurpations d'identité de marque sur le Web ouvert, le deep Web, le dark Web et les réseaux sociaux. Il fournit des renseignements sur les menaces, des alertes d'incident et des indicateurs de compromission (IOC) pour aider les organisations à identifier et à atténuer les risques numériques.
Avant de commencer
- Une instance Google SecOps
- Accès privilégié au portail Digital Shadows Indicators
- Accès privilégié à AWS (S3, IAM)
- Abonnement actif à Digital Shadows Indicators avec accès à l'API activé
Collecter les identifiants de l'API Digital Shadows Indicators
- Connectez-vous au portail Digital Shadows Indicators à l'adresse
https://portal-digitalshadows.com. - Accédez à Paramètres > Identifiants de l'API.
- Si vous ne disposez pas encore d'une clé API, cliquez sur Créer un client API ou Générer une clé API.
Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- Clé API : votre clé API de six caractères
- Code secret de l'API : code secret de l'API de 32 caractères
- ID de compte : votre ID de compte (affiché dans le portail ou fourni par votre représentant Digital Shadows)
- URL de base de l'API :
https://api.searchlight.app/v1ouhttps://portal-digitalshadows.com/api/v1(selon la région de votre locataire)
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
digital-shadows-logs). - Créez un utilisateur en suivant ce guide d'utilisation : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Download .csv file (Télécharger le fichier .csv) pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Policies > Create policy > JSON (IAM > Stratégies > Créer une stratégie > JSON).
- Copiez et collez le règlement ci-dessous.
JSON de la règle (remplacez
digital-shadows-logssi vous avez saisi un autre nom de bucket) :{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
DigitalShadowsLambdaRole, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom DigitalShadowsCollectorDurée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution DigitalShadowsLambdaRoleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code ci-dessous (DigitalShadowsCollector.py).
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raiseAccédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement fournies ci-dessous, en les remplaçant par vos valeurs.
Variables d'environnement
Clé Exemple de valeur S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(votre clé API de six caractères)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Functions > DigitalShadowsCollector).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez Délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Calendrier récurrent : Taux (
1 hour) - Cible : votre fonction Lambda
DigitalShadowsCollector - Nom :
DigitalShadowsCollector-1h
- Calendrier récurrent : Taux (
- Cliquez sur Créer la programmation.
Configurer un flux dans Google SecOps pour ingérer les journaux des indicateurs Digital Shadows
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Sur la page suivante, cliquez sur Configurer un seul flux.
- Saisissez un nom unique pour le nom du flux.
- Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Indicateurs Digital Shadows comme Type de journal.
- Cliquez sur Suivant, puis sur Envoyer.
Indiquez les valeurs des champs suivants :
- URI S3 :
s3://digital-shadows-logs/digital-shadows/ - Option de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours (180 jours par défaut).
- ID de clé d'accès : clé d'accès utilisateur avec accès au bucket S3
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3
- Espace de noms de l'élément : espace de noms de l'élément
- Libellés d'ingestion : libellé à appliquer aux événements de ce flux
- URI S3 :
Cliquez sur Suivant, puis sur Envoyer.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
| valeur | entity.entity.file.md5 | Définissez cet attribut si type == "MD5". |
| valeur | entity.entity.file.sha1 | Définissez si type == "SHA1" |
| valeur | entity.entity.file.sha256 | Définissez cet attribut si type == "SHA256". |
| valeur | entity.entity.hostname | Définissez si type == "HOST" |
| valeur | entity.entity.ip | Valeur copiée directement si type == "IP" |
| valeur | entity.entity.url | Définir si type == "URL" |
| valeur | entity.entity.user.email_addresses | Valeur copiée directement si type == "EMAIL" |
| type | entity.metadata.entity_type | Définissez sur "DOMAIN_NAME" si type == "HOST", "IP_ADDRESS" si type == "IP", "URL" si type == "URL", "USER" si type == "EMAIL", "FILE" si type in ["SHA1","SHA256","MD5"], sinon "UNKNOWN_ENTITYTYPE" |
| lastUpdated | entity.metadata.interval.start_time | Converti d'ISO8601 en code temporel s'il n'est pas vide |
| id | entity.metadata.product_entity_id | Valeur copiée directement si elle n'est pas vide |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Fusionné avec les objets {key: nom du champ de tag, value: valeur du tag} si non vide |
| sourceType | entity.metadata.threat.category_details | Valeur copiée directement |
| entity.metadata.threat.threat_feed_name | Définissez le paramètre sur "Indicateurs". | |
| id | entity.metadata.threat.threat_id | Valeur copiée directement si elle n'est pas vide |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | Valeur copiée directement |
| entity.metadata.product_name | Définissez le paramètre sur "Indicateurs". | |
| entity.metadata.vendor_name | Défini sur "Digital Shadows" |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.