Recoger registros de indicadores de compromiso de Proofpoint Emerging Threats Pro

Disponible en:

En este documento se explica cómo ingerir registros de indicadores de compromiso (IOCs) de Proofpoint Emerging Threats Pro en Google Security Operations mediante Amazon S3. Emerging Threats Intelligence publica listas de reputación cada hora para IPs y dominios en formato CSV con datos de inteligencia sobre amenazas, como categorías, puntuaciones e información temporal. El código del analizador procesa los datos de inteligencia sobre amenazas de ET_PRO con formato CSV. Extrae direcciones IP, dominios, categorías, puntuaciones y otra información relevante, y los asigna tanto a un formato de IOC estandarizado como al esquema de UDM de Chronicle para analizarlos y usarlos en Google SecOps.

Antes de empezar

Asegúrate de que cumples los siguientes requisitos previos:

  • Una instancia de Google SecOps con permisos para crear feeds
  • Suscripción a Proofpoint ET Intelligence con acceso a listas de reputación
  • Clave de API de ET Intelligence de https://etadmin.proofpoint.com/api-access
  • Acceso con privilegios a AWS (S3, IAM, Lambda y EventBridge)

Recopilar los requisitos previos de Emerging Threats Pro

  1. Inicia sesión en el portal de administración de ET Intelligence en https://etadmin.proofpoint.com.
  2. Ve a Acceso a la API.
  3. Copia y guarda tu clave de API.
  4. Ponte en contacto con tu representante de Proofpoint para obtener lo siguiente:
    • URL de la lista detallada de reputación de IP
    • URL de la lista de reputación de dominio detallada

ET Intelligence proporciona archivos CSV independientes para las listas de reputación de IPs y dominios, que se actualizan cada hora. Usa el formato "Detallado", que incluye estas columnas: * Lista de dominios: Domain Name, Category, Score, First Seen, Last Seen, Ports * Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports

Configurar un segmento de AWS S3 y la gestión de identidades y accesos

Crear un segmento de S3

  1. Abre la consola de Amazon S3.
  2. Haz clic en Crear segmento.
  3. Nombre del segmento: introduce et-pro-ioc-bucket (o el nombre que prefieras).
  4. Región: selecciona la región que prefieras.
  5. Haz clic en Crear segmento.

Crear un usuario de gestión de identidades y accesos para Google SecOps

  1. Abre la consola de IAM.
  2. Haz clic en Usuarios > Crear usuario.
  3. Nombre de usuario: introduce secops-reader.
  4. Haz clic en Siguiente.
  5. Seleccione Adjuntar políticas directamente.
  6. Haz clic en Crear política.
  7. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Asigna un nombre a la política SecOpsReaderPolicy.

  9. Haz clic en Crear política.

  10. Vuelve a la creación de usuarios y selecciona la política que acabas de crear.

  11. Haz clic en Siguiente > Crear usuario.

  12. Ve a la pestaña Credenciales de seguridad.

  13. Haz clic en Crear clave de acceso.

  14. Selecciona Servicio de terceros.

  15. Haz clic en Crear clave de acceso.

  16. Descarga y guarda las credenciales.

Configurar el rol de gestión de identidades y accesos de Lambda

  1. En la consola de AWS, ve a IAM > Roles > Crear rol.
  2. Selecciona Servicio de AWS > Lambda.
  3. Haz clic en Siguiente.
  4. Haz clic en Crear política.
  5. Selecciona la pestaña JSON e introduce lo siguiente:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Asigna un nombre a la política EtProIocLambdaPolicy.

  7. Haz clic en Crear política.

  8. Vuelve a la creación de roles y adjunta la política.

  9. Ponle un nombre al rol EtProIocLambdaRole.

  10. Haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    • Nombre de la función: et-pro-ioc-fetcher
    • Tiempo de ejecución: Python 3.13
    • Arquitectura: x86_64
    • Rol de ejecución: usar un rol ya creado EtProIocLambdaRole
  4. Después de crearla, ve a la pestaña Código y sustitúyela por lo siguiente:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Vaya a Configuración > Configuración general.

  6. Haz clic en Editar.

  7. Define Tiempo de espera en 5 minutos.

  8. Haz clic en Guardar.

Configurar variables de entorno

  1. Vaya a Configuración > Variables de entorno.
  2. Haz clic en Editar > Añadir variable de entorno.
  3. Añade las siguientes variables:

    Clave Valor
    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. Haz clic en Guardar.

Ponte en contacto con tu representante de Proofpoint para obtener las URLs exactas de tu suscripción. Las URLs de formato detallado suelen seguir este patrón: * Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * Lista de dominios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Schedules > Create schedule (Amazon EventBridge > Programaciones > Crear programación).
  2. Nombre de la programación: et-pro-ioc-hourly
  3. Patrón de programación: programación basada en tarifas
  4. Tasa de expresión: 1 hora
  5. Haz clic en Siguiente.
  6. Destino: función Lambda
  7. Función: et-pro-ioc-fetcher
  8. Haz clic en Siguiente para completar los pasos restantes.
  9. Haz clic en Crear programación.

Configurar feeds en Google SecOps

Debe crear dos feeds independientes: uno para la reputación de la IP y otro para la reputación del dominio.

Crear un feed de reputación de IP

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en Añadir nuevo.
  3. En el campo Nombre del feed, introduce ET Pro IOC - IP Reputation.
  4. En la lista Tipo de fuente, selecciona Amazon S3.
  5. Seleccione Emerging Threats Pro como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique valores para los siguientes parámetros de entrada:
    • URI de S3: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Opciones de eliminación de la fuente: selecciona la que prefieras
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es de 180 días.
    • ID de clave de acceso: clave de acceso de lectura de SecOps
    • Clave de acceso secreta: clave secreta de lector de SecOps
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revisa la información y haz clic en Enviar.

Crear un feed de reputación de dominio

  1. Repite el proceso de creación del feed.
  2. En el campo Nombre del feed, introduce ET Pro IOC - Domain Reputation.
  3. En la lista Tipo de fuente, selecciona Amazon S3.
  4. Seleccione Emerging Threats Pro como Tipo de registro.
  5. Haz clic en Siguiente.
  6. Especifique valores para los siguientes parámetros de entrada:
    • URI de S3: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Opciones de eliminación de la fuente: selecciona la que prefieras
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es de 180 días.
    • ID de clave de acceso: clave de acceso de lectura de SecOps
    • Clave de acceso secreta: clave secreta de lector de SecOps
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  7. Haz clic en Siguiente.
  8. Revisa la información y haz clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
category Este campo se usa en la lógica del analizador, pero no se asigna directamente al UDM. Determina el valor de event.ioc.categorization mediante una tabla de consulta.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos Se asigna directamente desde el registro sin procesar.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds Se asigna directamente desde el registro sin procesar.
datos Este campo se desglosa en varios campos UDM en función de su contenido.
first_seen event.idm.entity.metadata.interval.start_time Se analiza como una fecha y se asigna al UDM.
first_seen event.ioc.active_timerange.start Se analiza como una fecha y se asigna al UDM.
ip_or_domain event.idm.entity.entity.hostname Se asigna a UDM si el patrón grok extrae un host del campo.
ip_or_domain event.idm.entity.entity.ip Se asigna a UDM si el patrón grok no extrae un host del campo.
ip_or_domain event.ioc.domain_and_ports.domain Se asigna a UDM si el patrón grok extrae un host del campo.
ip_or_domain event.ioc.ip_and_ports.ip_address Se asigna a UDM si el patrón grok no extrae un host del campo.
last_seen event.idm.entity.metadata.interval.end_time Se analiza como una fecha y se asigna al UDM.
last_seen event.ioc.active_timerange.end Se analiza como una fecha y se asigna al UDM.
puertos event.idm.entity.entity.labels.value Analizado, unido con un delimitador de comas y asignado al UDM si hay varios puertos.
puertos event.idm.entity.entity.port Se analiza y se asigna a UDM si solo hay un puerto.
puertos event.ioc.domain_and_ports.ports Se analiza y se asigna a UDM si el patrón grok extrae un host del campo.
puertos event.ioc.ip_and_ports.ports Se analiza y se asigna al UDM si el patrón grok no extrae un host del campo.
puntuación event.ioc.confidence_score Se asigna directamente desde el registro sin procesar.
event.idm.entity.entity.labels.key Asigna el valor "ports" si hay varios puertos.
event.idm.entity.metadata.entity_type Asigna el valor "DOMAIN_NAME" si el patrón grok extrae un host del campo ip_or_domain. De lo contrario, asigna el valor "IP_ADDRESS".
event.idm.entity.metadata.threat.category Se ha definido como "SOFTWARE_MALICIOUS".
event.idm.entity.metadata.threat.category_details Derivado del campo category mediante una tabla de consulta.
event.idm.entity.metadata.threat.threat_name Selecciona "ET Intelligence Rep List".
event.idm.entity.metadata.vendor_name Se ha definido como "ET_PRO_IOC".
event.ioc.feed_name Selecciona "ET Intelligence Rep List".
event.ioc.raw_severity Se ha definido como "Malicioso".
timestamp.nanos Copiado de collection_time.nanos.
timestamp.seconds Copiado de collection_time.seconds.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.