Recopila registros de IOC de Proofpoint Emerging Threats Pro
En este documento, se explica cómo transferir registros de IOC de Proofpoint Emerging Threats Pro a Google Security Operations con Amazon S3. Emerging Threats Intelligence publica listas de reputación por hora para IPs y dominios en formato CSV con datos de inteligencia sobre amenazas, incluidas categorías, puntuaciones e información temporal. El código del analizador procesa los datos de inteligencia sobre amenazas de ET_PRO con formato CSV. Extrae direcciones IP, dominios, categorías, puntuaciones y otra información pertinente, y los asigna a un formato de IOC estandarizado y al esquema del UDM de Chronicle para su posterior análisis y uso en SecOps de Google.
Antes de comenzar
Asegúrate de cumplir con los siguientes requisitos previos:
- Una instancia de Google SecOps con permisos para crear feeds
- Suscripción a Proofpoint ET Intelligence con acceso a listas de reputación
- Clave de la API de ET Intelligence de https://etadmin.proofpoint.com/api-access
- Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)
Recopila los requisitos previos de Emerging Threats Pro
- Accede al ET Intelligence Admin Portal en https://etadmin.proofpoint.com.
- Ve a Acceso a la API.
- Copia y guarda tu clave de API
- Comunícate con tu representante de Proofpoint para obtener lo siguiente:
- URL de la lista detallada de reputación de IP
- URL de la lista detallada de reputación del dominio
ET Intelligence proporciona archivos CSV separados para las listas de reputación de IP y dominio, que se actualizan cada hora. Utiliza el formato "detallado", que incluye las siguientes columnas:
* Lista de dominios: Domain Name, Category, Score, First Seen, Last Seen, Ports
* Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports
Configura el bucket de AWS S3 y el IAM
Crea un bucket de S3
- Abre la consola de Amazon S3.
- Haz clic en Crear depósito.
- Nombre del bucket: Ingresa
et-pro-ioc-bucket(o el nombre que prefieras). - Región: Selecciona tu región preferida.
- Haz clic en Crear depósito.
Crea un usuario de IAM para Google SecOps
- Abre la consola de IAM.
- Haz clic en Usuarios > Crear usuario.
- Nombre de usuario: Ingresa
secops-reader. - Haz clic en Siguiente.
- Selecciona Adjuntar políticas directamente.
- Haz clic en Crear política.
En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Asigna a la política el nombre
SecOpsReaderPolicy.Haz clic en Crear política.
Vuelve a la creación de usuarios y selecciona la política recién creada.
Haz clic en Siguiente > Crear usuario.
Ve a la pestaña Credenciales de seguridad.
Haz clic en Crear clave de acceso.
Selecciona Servicio de terceros.
Haz clic en Crear clave de acceso.
Descarga y guarda las credenciales.
Configura el rol de IAM para Lambda
- En la consola de AWS, ve a IAM > Roles > Create role.
- Selecciona Servicio de AWS > Lambda.
- Haz clic en Siguiente.
- Haz clic en Crear política.
Selecciona la pestaña JSON y, luego, ingresa lo siguiente:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Asigna a la política el nombre
EtProIocLambdaPolicy.Haz clic en Crear política.
Regresa a la creación de roles y adjunta la política.
Asigna el nombre
EtProIocLambdaRoleal rol.Haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
- Nombre de la función:
et-pro-ioc-fetcher - Entorno de ejecución: Python 3.13
- Arquitectura: x86_64
- Rol de ejecución: Usar rol existente
EtProIocLambdaRole
- Nombre de la función:
Después de la creación, ve a la pestaña Code y reemplaza el contenido por lo siguiente:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Ve a Configuración > Configuración general.
Haz clic en Editar.
Establece el Tiempo de espera en 5 minutos.
Haz clic en Guardar.
Configure las variables de entorno
- Ve a Configuration > Environment variables.
- Haz clic en Editar > Agregar variable de entorno.
Agrega las siguientes variables:
Clave Valor S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Haga clic en Guardar.
Comunícate con tu representante de Proofpoint para obtener las URLs exactas de tu suscripción. Por lo general, las URLs de formato detallado siguen este patrón:
* Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Lista de dominios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Schedules > Create schedule.
- Nombre de la programación:
et-pro-ioc-hourly - Patrón de programación: Programación basada en tarifas
- Expresión de tarifa: 1 hora
- Haz clic en Siguiente.
- Objetivo: Función Lambda
- Función:
et-pro-ioc-fetcher - Haz clic en Siguiente para completar los pasos restantes.
- Haz clic en Crear programación.
Configura feeds en Google SecOps
Debes crear dos feeds separados: uno para la reputación de la IP y otro para la reputación del dominio.
Crea un feed de reputación de IP
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Agregar nuevo.
- En el campo Nombre del feed, ingresa
ET Pro IOC - IP Reputation. - En la lista Tipo de fuente, selecciona Amazon S3.
- Selecciona Emerging Threats Pro como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Opciones de eliminación de la fuente: Selecciona la opción que prefieras.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
- ID de clave de acceso: Clave de acceso de lector de SecOps
- Clave de acceso secreta: Clave secreta de lector de SecOps
- Espacio de nombres del recurso: Es el espacio de nombres del recurso.
- Etiquetas de transmisión: Es la etiqueta que se aplica a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa y haz clic en Enviar.
Crea un feed de reputación del dominio
- Repite el proceso de creación del feed.
- En el campo Nombre del feed, ingresa
ET Pro IOC - Domain Reputation. - En la lista Tipo de fuente, selecciona Amazon S3.
- Selecciona Emerging Threats Pro como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Opciones de eliminación de la fuente: Selecciona la opción que prefieras.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
- ID de clave de acceso: Clave de acceso de lector de SecOps
- Clave de acceso secreta: Clave secreta de lector de SecOps
- Espacio de nombres del recurso: Es el espacio de nombres del recurso.
- Etiquetas de transmisión: Es la etiqueta que se aplica a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa y haz clic en Enviar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
| category | Este campo se usa en la lógica del analizador, pero no se asigna directamente al UDM. Determina el valor de event.ioc.categorization a través de una tabla de búsqueda. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Se asigna directamente desde el registro sin procesar. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Se asigna directamente desde el registro sin procesar. |
| datos | Este campo se analiza en varios campos del UDM según su contenido. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Se analizó como una fecha y se asignó al UDM. |
| first_seen | event.ioc.active_timerange.start | Se analizó como una fecha y se asignó al UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Se asigna al UDM si el patrón de Grok extrae un host del campo. |
| ip_or_domain | event.idm.entity.entity.ip | Se asigna al UDM si el patrón de Grok no extrae un host del campo. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Se asigna al UDM si el patrón de Grok extrae un host del campo. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Se asigna al UDM si el patrón de Grok no extrae un host del campo. |
| last_seen | event.idm.entity.metadata.interval.end_time | Se analizó como una fecha y se asignó al UDM. |
| last_seen | event.ioc.active_timerange.end | Se analizó como una fecha y se asignó al UDM. |
| puertos | event.idm.entity.entity.labels.value | Se analiza, se une con un delimitador de comas y se asigna al UDM si hay varios puertos. |
| puertos | event.idm.entity.entity.port | Se analiza y se asigna al UDM si solo hay un puerto. |
| puertos | event.ioc.domain_and_ports.ports | Se analiza y se asigna al UDM si el patrón de Grok extrae un host del campo. |
| puertos | event.ioc.ip_and_ports.ports | Se analiza y se asigna al UDM si el patrón de Grok no extrae un host del campo. |
| puntuación | event.ioc.confidence_score | Se asigna directamente desde el registro sin procesar. |
| event.idm.entity.entity.labels.key | Se establece en "ports" si hay varios puertos. | |
| event.idm.entity.metadata.entity_type | Se establece en "DOMAIN_NAME" si el patrón de Grok extrae un host del campo ip_or_domain; de lo contrario, se establece en "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Se establece en "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Se deriva del campo category con una tabla de búsqueda. |
|
| event.idm.entity.metadata.threat.threat_name | Se debe establecer en "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Se debe establecer en "ET_PRO_IOC". | |
| event.ioc.feed_name | Se debe establecer en "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Se estableció como "Malicioso". | |
| timestamp.nanos | Se copió desde collection_time.nanos. |
|
| timestamp.seconds | Se copió desde collection_time.seconds. |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.