Recoger registros de indicadores de compromiso de Proofpoint Emerging Threats Pro
En este documento se explica cómo ingerir registros de indicadores de compromiso (IOCs) de Proofpoint Emerging Threats Pro en Google Security Operations mediante Amazon S3. Emerging Threats Intelligence publica listas de reputación cada hora para IPs y dominios en formato CSV con datos de inteligencia sobre amenazas, como categorías, puntuaciones e información temporal. El código del analizador procesa los datos de inteligencia sobre amenazas de ET_PRO con formato CSV. Extrae direcciones IP, dominios, categorías, puntuaciones y otra información relevante, y los asigna tanto a un formato de IOC estandarizado como al esquema de UDM de Chronicle para analizarlos y usarlos en Google SecOps.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Una instancia de Google SecOps con permisos para crear feeds
- Suscripción a Proofpoint ET Intelligence con acceso a listas de reputación
- Clave de API de ET Intelligence de https://etadmin.proofpoint.com/api-access
- Acceso con privilegios a AWS (S3, IAM, Lambda y EventBridge)
Recopilar los requisitos previos de Emerging Threats Pro
- Inicia sesión en el portal de administración de ET Intelligence en https://etadmin.proofpoint.com.
- Ve a Acceso a la API.
- Copia y guarda tu clave de API.
- Ponte en contacto con tu representante de Proofpoint para obtener lo siguiente:
- URL de la lista detallada de reputación de IP
- URL de la lista de reputación de dominio detallada
ET Intelligence proporciona archivos CSV independientes para las listas de reputación de IPs y dominios, que se actualizan cada hora. Usa el formato "Detallado", que incluye estas columnas:
* Lista de dominios: Domain Name, Category, Score, First Seen, Last Seen, Ports
* Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports
Configurar un segmento de AWS S3 y la gestión de identidades y accesos
Crear un segmento de S3
- Abre la consola de Amazon S3.
- Haz clic en Crear segmento.
- Nombre del segmento: introduce
et-pro-ioc-bucket(o el nombre que prefieras). - Región: selecciona la región que prefieras.
- Haz clic en Crear segmento.
Crear un usuario de gestión de identidades y accesos para Google SecOps
- Abre la consola de IAM.
- Haz clic en Usuarios > Crear usuario.
- Nombre de usuario: introduce
secops-reader. - Haz clic en Siguiente.
- Seleccione Adjuntar políticas directamente.
- Haz clic en Crear política.
En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Asigna un nombre a la política
SecOpsReaderPolicy.Haz clic en Crear política.
Vuelve a la creación de usuarios y selecciona la política que acabas de crear.
Haz clic en Siguiente > Crear usuario.
Ve a la pestaña Credenciales de seguridad.
Haz clic en Crear clave de acceso.
Selecciona Servicio de terceros.
Haz clic en Crear clave de acceso.
Descarga y guarda las credenciales.
Configurar el rol de gestión de identidades y accesos de Lambda
- En la consola de AWS, ve a IAM > Roles > Crear rol.
- Selecciona Servicio de AWS > Lambda.
- Haz clic en Siguiente.
- Haz clic en Crear política.
Selecciona la pestaña JSON e introduce lo siguiente:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Asigna un nombre a la política
EtProIocLambdaPolicy.Haz clic en Crear política.
Vuelve a la creación de roles y adjunta la política.
Ponle un nombre al rol
EtProIocLambdaRole.Haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
- Nombre de la función:
et-pro-ioc-fetcher - Tiempo de ejecución: Python 3.13
- Arquitectura: x86_64
- Rol de ejecución: usar un rol ya creado
EtProIocLambdaRole
- Nombre de la función:
Después de crearla, ve a la pestaña Código y sustitúyela por lo siguiente:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Vaya a Configuración > Configuración general.
Haz clic en Editar.
Define Tiempo de espera en 5 minutos.
Haz clic en Guardar.
Configurar variables de entorno
- Vaya a Configuración > Variables de entorno.
- Haz clic en Editar > Añadir variable de entorno.
Añade las siguientes variables:
Clave Valor S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Haz clic en Guardar.
Ponte en contacto con tu representante de Proofpoint para obtener las URLs exactas de tu suscripción. Las URLs de formato detallado suelen seguir este patrón:
* Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Lista de dominios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Schedules > Create schedule (Amazon EventBridge > Programaciones > Crear programación).
- Nombre de la programación:
et-pro-ioc-hourly - Patrón de programación: programación basada en tarifas
- Tasa de expresión: 1 hora
- Haz clic en Siguiente.
- Destino: función Lambda
- Función:
et-pro-ioc-fetcher - Haz clic en Siguiente para completar los pasos restantes.
- Haz clic en Crear programación.
Configurar feeds en Google SecOps
Debe crear dos feeds independientes: uno para la reputación de la IP y otro para la reputación del dominio.
Crear un feed de reputación de IP
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Añadir nuevo.
- En el campo Nombre del feed, introduce
ET Pro IOC - IP Reputation. - En la lista Tipo de fuente, selecciona Amazon S3.
- Seleccione Emerging Threats Pro como Tipo de registro.
- Haz clic en Siguiente.
- Especifique valores para los siguientes parámetros de entrada:
- URI de S3:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Opciones de eliminación de la fuente: selecciona la que prefieras
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es de 180 días.
- ID de clave de acceso: clave de acceso de lectura de SecOps
- Clave de acceso secreta: clave secreta de lector de SecOps
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la información y haz clic en Enviar.
Crear un feed de reputación de dominio
- Repite el proceso de creación del feed.
- En el campo Nombre del feed, introduce
ET Pro IOC - Domain Reputation. - En la lista Tipo de fuente, selecciona Amazon S3.
- Seleccione Emerging Threats Pro como Tipo de registro.
- Haz clic en Siguiente.
- Especifique valores para los siguientes parámetros de entrada:
- URI de S3:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Opciones de eliminación de la fuente: selecciona la que prefieras
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es de 180 días.
- ID de clave de acceso: clave de acceso de lectura de SecOps
- Clave de acceso secreta: clave secreta de lector de SecOps
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la información y haz clic en Enviar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
| category | Este campo se usa en la lógica del analizador, pero no se asigna directamente al UDM. Determina el valor de event.ioc.categorization mediante una tabla de consulta. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Se asigna directamente desde el registro sin procesar. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Se asigna directamente desde el registro sin procesar. |
| datos | Este campo se desglosa en varios campos UDM en función de su contenido. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Se analiza como una fecha y se asigna al UDM. |
| first_seen | event.ioc.active_timerange.start | Se analiza como una fecha y se asigna al UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Se asigna a UDM si el patrón grok extrae un host del campo. |
| ip_or_domain | event.idm.entity.entity.ip | Se asigna a UDM si el patrón grok no extrae un host del campo. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Se asigna a UDM si el patrón grok extrae un host del campo. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Se asigna a UDM si el patrón grok no extrae un host del campo. |
| last_seen | event.idm.entity.metadata.interval.end_time | Se analiza como una fecha y se asigna al UDM. |
| last_seen | event.ioc.active_timerange.end | Se analiza como una fecha y se asigna al UDM. |
| puertos | event.idm.entity.entity.labels.value | Analizado, unido con un delimitador de comas y asignado al UDM si hay varios puertos. |
| puertos | event.idm.entity.entity.port | Se analiza y se asigna a UDM si solo hay un puerto. |
| puertos | event.ioc.domain_and_ports.ports | Se analiza y se asigna a UDM si el patrón grok extrae un host del campo. |
| puertos | event.ioc.ip_and_ports.ports | Se analiza y se asigna al UDM si el patrón grok no extrae un host del campo. |
| puntuación | event.ioc.confidence_score | Se asigna directamente desde el registro sin procesar. |
| event.idm.entity.entity.labels.key | Asigna el valor "ports" si hay varios puertos. | |
| event.idm.entity.metadata.entity_type | Asigna el valor "DOMAIN_NAME" si el patrón grok extrae un host del campo ip_or_domain. De lo contrario, asigna el valor "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Se ha definido como "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Derivado del campo category mediante una tabla de consulta. |
|
| event.idm.entity.metadata.threat.threat_name | Selecciona "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Se ha definido como "ET_PRO_IOC". | |
| event.ioc.feed_name | Selecciona "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Se ha definido como "Malicioso". | |
| timestamp.nanos | Copiado de collection_time.nanos. |
|
| timestamp.seconds | Copiado de collection_time.seconds. |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.