Protokolle für Digital Shadows Indicators erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie Digital Shadows Indicators-Logs mit Amazon S3 in Google Security Operations aufnehmen.

Digital Shadows Indicators (jetzt Teil von ReliaQuest GreyMatter DRP) ist eine Plattform zum Schutz vor digitalen Risiken, die kontinuierlich externe Bedrohungen, Datenoffenlegungen und Marken-Identitätsdiebstahl im öffentlich zugänglichen Internet, Deep Web, Darknet und in sozialen Medien überwacht und erkennt. Der Dienst bietet Threat Intelligence, Vorfallbenachrichtigungen und Bedrohungsindikatoren (Indicators of Compromise, IOCs), um Unternehmen bei der Identifizierung und Minimierung digitaler Risiken zu unterstützen.

Hinweise

  • Eine Google SecOps-Instanz
  • Privilegierter Zugriff auf das Digital Shadows Indicators-Portal
  • Privilegierter Zugriff auf AWS (S3, IAM)
  • Aktives Abo für Digital Shadows Indicators mit aktiviertem API-Zugriff

Anmeldedaten für die Digital Shadows Indicators API abrufen

  1. Melden Sie sich unter https://portal-digitalshadows.com im Digital Shadows Indicators Portal an.
  2. Gehen Sie zu den Einstellungen> API-Anmeldedaten.
  3. Wenn Sie noch keinen API-Schlüssel haben, klicken Sie auf Neuen API-Client erstellen oder API-Schlüssel generieren.
  4. Kopieren Sie die folgenden Details und speichern Sie sie an einem sicheren Ort:

    • API-Schlüssel: Ihr 6‑stelliger API-Schlüssel
    • API-Secret: Ihr 32 Zeichen langes API-Secret
    • Konto-ID: Ihre Konto-ID (wird im Portal angezeigt oder von Ihrem Digital Shadows-Ansprechpartner bereitgestellt)
    • API-Basis-URL: https://api.searchlight.app/v1 oder https://portal-digitalshadows.com/api/v1 (je nach Mandantenregion)

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung in diesem Nutzerhandbuch: Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B. digital-shadows-logs).
  3. Erstellen Sie einen Nutzer. Folgen Sie dazu dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess.
  18. Wählen Sie die Richtlinie aus.
  19. Klicken Sie auf Weiter.
  20. Klicken Sie auf Berechtigungen hinzufügen.

IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren

  1. Rufen Sie in der AWS-Konsole den Tab IAM > Richtlinien > Richtlinie erstellen > JSON auf.
  2. Kopieren Sie die Richtlinie unten und fügen Sie sie ein.
  3. Richtlinien-JSON (ersetzen Sie digital-shadows-logs, wenn Sie einen anderen Bucket-Namen eingegeben haben):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Klicken Sie auf Weiter > Richtlinie erstellen.

  5. Gehen Sie zu IAM > Rollen > Rolle erstellen > AWS-Service > Lambda.

  6. Hängen Sie die neu erstellte Richtlinie an.

  7. Geben Sie der Rolle den Namen DigitalShadowsLambdaRole und klicken Sie auf Rolle erstellen.

Lambda-Funktion erstellen

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:

    Einstellung Wert
    Name DigitalShadowsCollector
    Laufzeit Python 3.13
    Architektur x86_64
    Ausführungsrolle DigitalShadowsLambdaRole
  4. Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und fügen Sie den Code unten (DigitalShadowsCollector.py) ein.

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.

  6. Geben Sie die unten angegebenen Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte.

    Umgebungsvariablen

    Schlüssel Beispielwert
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (Ihr 6‑stelliger API‑Schlüssel)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Bleiben Sie nach dem Erstellen der Funktion auf der entsprechenden Seite oder öffnen Sie Lambda > Funktionen > DigitalShadowsCollector.

  8. Wählen Sie den Tab Konfiguration aus.

  9. Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.

  10. Ändern Sie Zeitüberschreitung in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.

EventBridge-Zeitplan erstellen

  1. Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Wiederkehrender Termin: Rate (1 hour)
    • Ziel: Ihre Lambda-Funktion DigitalShadowsCollector
    • Name: DigitalShadowsCollector-1h
  3. Klicken Sie auf Zeitplan erstellen.

Feed in Google SecOps konfigurieren, um Digital Shadows Indicators-Logs aufzunehmen

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf Neuen Feed hinzufügen.
  3. Klicken Sie auf der nächsten Seite auf Einen einzelnen Feed konfigurieren.
  4. Geben Sie einen eindeutigen Namen für den Feednamen ein.
  5. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  6. Wählen Sie Digital Shadows Indicators als Logtyp aus.
  7. Klicken Sie auf Weiter und dann auf Senden.
  8. Geben Sie Werte für die folgenden Felder an:

    • S3-URI: s3://digital-shadows-logs/digital-shadows/
    • Option zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
    • Höchstalter für Dateien: Dateien einschließen, die in den letzten Tagen geändert wurden (Standard: 180 Tage)
    • Zugriffsschlüssel-ID: Nutzerzugriffsschlüssel mit Zugriff auf den S3-Bucket
    • Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: Der Asset-Namespace
    • Labels für Datenaufnahme: Das Label, das auf die Ereignisse aus diesem Feed angewendet werden soll
  9. Klicken Sie auf Weiter und dann auf Senden.

UDM-Zuordnungstabelle

Logfeld UDM-Zuordnung Logik
Wert entity.entity.file.md5 Wird festgelegt, wenn type == „MD5“
Wert entity.entity.file.sha1 Wird festgelegt, wenn type == „SHA1“
Wert entity.entity.file.sha256 Wird festgelegt, wenn type == „SHA256“
Wert entity.entity.hostname Wird festgelegt, wenn type == „HOST“
Wert entity.entity.ip Wert wird direkt kopiert, wenn type == „IP“
Wert entity.entity.url Festlegen, ob type == „URL“
Wert entity.entity.user.email_addresses Wert wird direkt kopiert, wenn type == „EMAIL“
Typ entity.metadata.entity_type Auf „DOMAIN_NAME“ setzen, wenn type == „HOST“, „IP_ADDRESS“, wenn type == „IP“, „URL“, wenn type == „URL“, „USER“, wenn type == „EMAIL“, „FILE“, wenn type in [„SHA1“, „SHA256“, „MD5“], andernfalls „UNKNOWN_ENTITYTYPE“
lastUpdated entity.metadata.interval.start_time Wird von ISO8601 in Zeitstempel umgewandelt, sofern nicht leer
id entity.metadata.product_entity_id Wert wird direkt kopiert, wenn er nicht leer ist
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Wird mit Objekten {key: tag field name, value: tag value} zusammengeführt, falls nicht leer
sourceType entity.metadata.threat.category_details Wert direkt kopiert
entity.metadata.threat.threat_feed_name Auf „Indikatoren“ festgelegt
id entity.metadata.threat.threat_id Wert wird direkt kopiert, wenn er nicht leer ist
sourceIdentifier entity.metadata.threat.url_back_to_product Wert direkt kopiert
entity.metadata.product_name Auf „Indikatoren“ festgelegt
entity.metadata.vendor_name Auf „Digital Shadows“ festgelegt

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten