Collecter les journaux des indicateurs Digital Shadows

Compatible avec :

Ce document explique comment ingérer des journaux d'indicateurs Digital Shadows dans Google Security Operations à l'aide d'Amazon S3.

Digital Shadows Indicators (qui fait désormais partie de ReliaQuest GreyMatter DRP) est une plate-forme de protection contre les risques numériques qui surveille et détecte en continu les menaces externes, les expositions de données et les usurpations d'identité de marque sur le Web ouvert, le deep Web, le dark Web et les réseaux sociaux. Il fournit des renseignements sur les menaces, des alertes d'incident et des indicateurs de compromission (IOC) pour aider les organisations à identifier et à atténuer les risques numériques.

Avant de commencer

  • Une instance Google SecOps
  • Accès privilégié au portail Digital Shadows Indicators
  • Accès privilégié à AWS (S3, IAM)
  • Abonnement actif à Digital Shadows Indicators avec accès à l'API activé

Collecter les identifiants de l'API Digital Shadows Indicators

  1. Connectez-vous au portail Digital Shadows Indicators à l'adresse https://portal-digitalshadows.com.
  2. Accédez à Paramètres > Identifiants de l'API.
  3. Si vous ne disposez pas encore d'une clé API, cliquez sur Créer un client API ou Générer une clé API.
  4. Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :

    • Clé API : votre clé API de six caractères
    • Code secret de l'API : code secret de l'API de 32 caractères
    • ID de compte : votre ID de compte (affiché dans le portail ou fourni par votre représentant Digital Shadows)
    • URL de base de l'API : https://api.searchlight.app/v1 ou https://portal-digitalshadows.com/api/v1 (selon la région de votre locataire)

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple, digital-shadows-logs).
  3. Créez un utilisateur en suivant ce guide d'utilisation : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : Ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Download .csv file (Télécharger le fichier .csv) pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez la règle AmazonS3FullAccess.
  18. Sélectionnez la règle.
  19. Cliquez sur Suivant.
  20. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM > Policies > Create policy > JSON (IAM > Stratégies > Créer une stratégie > JSON).
  2. Copiez et collez le règlement ci-dessous.
  3. JSON de la règle (remplacez digital-shadows-logs si vous avez saisi un autre nom de bucket) :

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Cliquez sur Suivant > Créer une règle.

  5. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  6. Associez la règle que vous venez de créer.

  7. Nommez le rôle DigitalShadowsLambdaRole, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom DigitalShadowsCollector
    Durée d'exécution Python 3.13
    Architecture x86_64
    Rôle d'exécution DigitalShadowsLambdaRole
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code ci-dessous (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.

  6. Saisissez les variables d'environnement fournies ci-dessous, en les remplaçant par vos valeurs.

    Variables d'environnement

    Clé Exemple de valeur
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (votre clé API de six caractères)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Functions > DigitalShadowsCollector).

  8. Accédez à l'onglet Configuration.

  9. Dans le panneau Configuration générale, cliquez sur Modifier.

  10. Définissez Délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule.
  2. Fournissez les informations de configuration suivantes :
    • Calendrier récurrent : Taux (1 hour)
    • Cible : votre fonction Lambda DigitalShadowsCollector
    • Nom : DigitalShadowsCollector-1h
  3. Cliquez sur Créer la programmation.

Configurer un flux dans Google SecOps pour ingérer les journaux des indicateurs Digital Shadows

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Sur la page suivante, cliquez sur Configurer un seul flux.
  4. Saisissez un nom unique pour le nom du flux.
  5. Sélectionnez Amazon S3 V2 comme type de source.
  6. Sélectionnez Indicateurs Digital Shadows comme Type de journal.
  7. Cliquez sur Suivant, puis sur Envoyer.
  8. Indiquez les valeurs des champs suivants :

    • URI S3 : s3://digital-shadows-logs/digital-shadows/
    • Option de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours (180 jours par défaut).
    • ID de clé d'accès : clé d'accès utilisateur avec accès au bucket S3
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3
    • Espace de noms de l'élément : espace de noms de l'élément
    • Libellés d'ingestion : libellé à appliquer aux événements de ce flux
  9. Cliquez sur Suivant, puis sur Envoyer.

Table de mappage UDM

Champ de journal Mappage UDM Logique
valeur entity.entity.file.md5 Définissez cet attribut si type == "MD5".
valeur entity.entity.file.sha1 Définissez si type == "SHA1"
valeur entity.entity.file.sha256 Définissez cet attribut si type == "SHA256".
valeur entity.entity.hostname Définissez si type == "HOST"
valeur entity.entity.ip Valeur copiée directement si type == "IP"
valeur entity.entity.url Définir si type == "URL"
valeur entity.entity.user.email_addresses Valeur copiée directement si type == "EMAIL"
type entity.metadata.entity_type Définissez sur "DOMAIN_NAME" si type == "HOST", "IP_ADDRESS" si type == "IP", "URL" si type == "URL", "USER" si type == "EMAIL", "FILE" si type in ["SHA1","SHA256","MD5"], sinon "UNKNOWN_ENTITYTYPE"
lastUpdated entity.metadata.interval.start_time Converti d'ISO8601 en code temporel s'il n'est pas vide
id entity.metadata.product_entity_id Valeur copiée directement si elle n'est pas vide
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Fusionné avec les objets {key: nom du champ de tag, value: valeur du tag} si non vide
sourceType entity.metadata.threat.category_details Valeur copiée directement
entity.metadata.threat.threat_feed_name Définissez le paramètre sur "Indicateurs".
id entity.metadata.threat.threat_id Valeur copiée directement si elle n'est pas vide
sourceIdentifier entity.metadata.threat.url_back_to_product Valeur copiée directement
entity.metadata.product_name Définissez le paramètre sur "Indicateurs".
entity.metadata.vendor_name Défini sur "Digital Shadows"

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.