Recopila registros de indicadores de Digital Shadows
En este documento, se explica cómo transferir registros de indicadores de Digital Shadows a Google Security Operations con Amazon S3.
Los indicadores de Digital Shadows (ahora parte de ReliaQuest GreyMatter DRP) son una plataforma de protección contra riesgos digitales que supervisa y detecta de forma continua las amenazas externas, las exposiciones de datos y la suplantación de marca en la Web abierta, la Web profunda, la Web oscura y las redes sociales. Proporciona inteligencia sobre amenazas, alertas de incidentes e indicadores de compromiso (IOC) para ayudar a las organizaciones a identificar y mitigar los riesgos digitales.
Antes de comenzar
- Una instancia de Google SecOps
- Acceso con privilegios al portal de Indicadores de Sombras Digitales
- Acceso con privilegios a AWS (S3, IAM)
- Suscripción activa a los indicadores de Digital Shadows con acceso a la API habilitado
Recopila credenciales de la API de Digital Shadows Indicators
- Accede al Portal de indicadores de Digital Shadows en
https://portal-digitalshadows.com. - Ve a Configuración > Credenciales de API.
- Si no tienes una clave de API existente, haz clic en Create New API Client o Generate API Key.
Copia y guarda los siguientes detalles en una ubicación segura:
- Clave de API: Tu clave de API de 6 caracteres
- Secreto de API: Tu secreto de API de 32 caracteres
- ID de la cuenta: Es el ID de tu cuenta (se muestra en el portal o te lo proporciona tu representante de Digital Shadows).
- URL base de la API:
https://api.searchlight.app/v1ohttps://portal-digitalshadows.com/api/v1(según la región de tu arrendatario)
Configura el bucket de AWS S3 y el IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Cómo crear un bucket.
- Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo,
digital-shadows-logs). - Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo .csv para guardar la clave de acceso y la clave de acceso secreta para consultarlas en el futuro.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca la política AmazonS3FullAccess.
- Selecciona la política.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
- Copia y pega la siguiente política.
JSON de la política (reemplaza
digital-shadows-logssi ingresaste un nombre de bucket diferente):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada.
Asigna el nombre
DigitalShadowsLambdaRoleal rol y haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Author from scratch.
Proporciona los siguientes detalles de configuración:
Configuración Valor Nombre DigitalShadowsCollectorTiempo de ejecución Python 3.13 Arquitectura x86_64 Rol de ejecución DigitalShadowsLambdaRoleDespués de crear la función, abre la pestaña Code, borra el código auxiliar y pega el siguiente código (DigitalShadowsCollector.py).
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raiseVe a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.
Ingresa las variables de entorno que se proporcionan a continuación y reemplaza los marcadores de posición por tus valores.
Variables de entorno
Clave Valor de ejemplo S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(tu clave de API de 6 caracteres)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10Después de crear la función, permanece en su página (o abre Lambda > Functions > DigitalShadowsCollector).
Selecciona la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Programa recurrente: Frecuencia (
1 hour) - Destino: Tu función Lambda
DigitalShadowsCollector - Nombre:
DigitalShadowsCollector-1h
- Programa recurrente: Frecuencia (
- Haz clic en Crear programación.
Configura un feed en Google SecOps para transferir registros de indicadores de Digital Shadows
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Agregar feed nuevo.
- En la siguiente página, haz clic en Configurar un solo feed.
- Ingresa un nombre único para el Nombre del feed.
- Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona Indicadores de sombras digitales como el Tipo de registro.
- Haz clic en Siguiente y, luego, en Enviar.
Especifica valores para los siguientes campos:
- URI de S3:
s3://digital-shadows-logs/digital-shadows/ - Opción de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días más reciente (el valor predeterminado es 180 días).
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3
- Espacio de nombres del recurso: Es el espacio de nombres del recurso.
- Etiquetas de transmisión: Es la etiqueta que se aplicará a los eventos de este feed.
- URI de S3:
Haz clic en Siguiente y, luego, en Enviar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
| valor | entity.entity.file.md5 | Se establece si type == "MD5". |
| valor | entity.entity.file.sha1 | Se establece si type == "SHA1" |
| valor | entity.entity.file.sha256 | Se establece si type == "SHA256" |
| valor | entity.entity.hostname | Establece si type == "HOST" |
| valor | entity.entity.ip | El valor se copia directamente si el tipo es "IP". |
| valor | entity.entity.url | Se establece si type == "URL" |
| valor | entity.entity.user.email_addresses | El valor se copia directamente si el tipo es "EMAIL". |
| tipo | entity.metadata.entity_type | Se establece en "DOMAIN_NAME" si type == "HOST", "IP_ADDRESS" si type == "IP", "URL" si type == "URL", "USER" si type == "EMAIL", "FILE" si type in ["SHA1","SHA256","MD5"], de lo contrario, "UNKNOWN_ENTITYTYPE" |
| lastUpdated | entity.metadata.interval.start_time | Se convierte de ISO8601 a marca de tiempo si no está vacío. |
| id | entity.metadata.product_entity_id | El valor se copia directamente si no está vacío. |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Se combina con los objetos {key: nombre del campo de etiqueta, value: valor de la etiqueta} si no está vacío. |
| sourceType | entity.metadata.threat.category_details | Valor copiado directamente |
| entity.metadata.threat.threat_feed_name | Establece el valor en "Indicadores". | |
| id | entity.metadata.threat.threat_id | El valor se copia directamente si no está vacío. |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | Valor copiado directamente |
| entity.metadata.product_name | Establece el valor en "Indicadores". | |
| entity.metadata.vendor_name | Se estableció en "Sombras digitales". |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.