Recopila registros de indicadores de Digital Shadows

Compatible con:

En este documento, se explica cómo transferir registros de indicadores de Digital Shadows a Google Security Operations con Amazon S3.

Los indicadores de Digital Shadows (ahora parte de ReliaQuest GreyMatter DRP) son una plataforma de protección contra riesgos digitales que supervisa y detecta de forma continua las amenazas externas, las exposiciones de datos y la suplantación de marca en la Web abierta, la Web profunda, la Web oscura y las redes sociales. Proporciona inteligencia sobre amenazas, alertas de incidentes e indicadores de compromiso (IOC) para ayudar a las organizaciones a identificar y mitigar los riesgos digitales.

Antes de comenzar

  • Una instancia de Google SecOps
  • Acceso con privilegios al portal de Indicadores de Sombras Digitales
  • Acceso con privilegios a AWS (S3, IAM)
  • Suscripción activa a los indicadores de Digital Shadows con acceso a la API habilitado

Recopila credenciales de la API de Digital Shadows Indicators

  1. Accede al Portal de indicadores de Digital Shadows en https://portal-digitalshadows.com.
  2. Ve a Configuración > Credenciales de API.
  3. Si no tienes una clave de API existente, haz clic en Create New API Client o Generate API Key.
  4. Copia y guarda los siguientes detalles en una ubicación segura:

    • Clave de API: Tu clave de API de 6 caracteres
    • Secreto de API: Tu secreto de API de 32 caracteres
    • ID de la cuenta: Es el ID de tu cuenta (se muestra en el portal o te lo proporciona tu representante de Digital Shadows).
    • URL base de la API: https://api.searchlight.app/v1 o https://portal-digitalshadows.com/api/v1 (según la región de tu arrendatario)

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Cómo crear un bucket.
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, digital-shadows-logs).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como el Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo .csv para guardar la clave de acceso y la clave de acceso secreta para consultarlas en el futuro.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca la política AmazonS3FullAccess.
  18. Selecciona la política.
  19. Haz clic en Siguiente.
  20. Haz clic en Agregar permisos.

Configura la política y el rol de IAM para las cargas de S3

  1. En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
  2. Copia y pega la siguiente política.
  3. JSON de la política (reemplaza digital-shadows-logs si ingresaste un nombre de bucket diferente):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Haz clic en Siguiente > Crear política.

  5. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  6. Adjunta la política recién creada.

  7. Asigna el nombre DigitalShadowsLambdaRole al rol y haz clic en Crear rol.

Crea la función Lambda

  1. En la consola de AWS, ve a Lambda > Functions > Create function.
  2. Haz clic en Author from scratch.
  3. Proporciona los siguientes detalles de configuración:

    Configuración Valor
    Nombre DigitalShadowsCollector
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución DigitalShadowsLambdaRole
  4. Después de crear la función, abre la pestaña Code, borra el código auxiliar y pega el siguiente código (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.

  6. Ingresa las variables de entorno que se proporcionan a continuación y reemplaza los marcadores de posición por tus valores.

    Variables de entorno

    Clave Valor de ejemplo
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (tu clave de API de 6 caracteres)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Después de crear la función, permanece en su página (o abre Lambda > Functions > DigitalShadowsCollector).

  8. Selecciona la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crea una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule.
  2. Proporciona los siguientes detalles de configuración:
    • Programa recurrente: Frecuencia (1 hour)
    • Destino: Tu función Lambda DigitalShadowsCollector
    • Nombre: DigitalShadowsCollector-1h
  3. Haz clic en Crear programación.

Configura un feed en Google SecOps para transferir registros de indicadores de Digital Shadows

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en Agregar feed nuevo.
  3. En la siguiente página, haz clic en Configurar un solo feed.
  4. Ingresa un nombre único para el Nombre del feed.
  5. Selecciona Amazon S3 V2 como el Tipo de fuente.
  6. Selecciona Indicadores de sombras digitales como el Tipo de registro.
  7. Haz clic en Siguiente y, luego, en Enviar.
  8. Especifica valores para los siguientes campos:

    • URI de S3: s3://digital-shadows-logs/digital-shadows/
    • Opción de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días más reciente (el valor predeterminado es 180 días).
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3
    • Espacio de nombres del recurso: Es el espacio de nombres del recurso.
    • Etiquetas de transmisión: Es la etiqueta que se aplicará a los eventos de este feed.
  9. Haz clic en Siguiente y, luego, en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
valor entity.entity.file.md5 Se establece si type == "MD5".
valor entity.entity.file.sha1 Se establece si type == "SHA1"
valor entity.entity.file.sha256 Se establece si type == "SHA256"
valor entity.entity.hostname Establece si type == "HOST"
valor entity.entity.ip El valor se copia directamente si el tipo es "IP".
valor entity.entity.url Se establece si type == "URL"
valor entity.entity.user.email_addresses El valor se copia directamente si el tipo es "EMAIL".
tipo entity.metadata.entity_type Se establece en "DOMAIN_NAME" si type == "HOST", "IP_ADDRESS" si type == "IP", "URL" si type == "URL", "USER" si type == "EMAIL", "FILE" si type in ["SHA1","SHA256","MD5"], de lo contrario, "UNKNOWN_ENTITYTYPE"
lastUpdated entity.metadata.interval.start_time Se convierte de ISO8601 a marca de tiempo si no está vacío.
id entity.metadata.product_entity_id El valor se copia directamente si no está vacío.
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Se combina con los objetos {key: nombre del campo de etiqueta, value: valor de la etiqueta} si no está vacío.
sourceType entity.metadata.threat.category_details Valor copiado directamente
entity.metadata.threat.threat_feed_name Establece el valor en "Indicadores".
id entity.metadata.threat.threat_id El valor se copia directamente si no está vacío.
sourceIdentifier entity.metadata.threat.url_back_to_product Valor copiado directamente
entity.metadata.product_name Establece el valor en "Indicadores".
entity.metadata.vendor_name Se estableció en "Sombras digitales".

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.