Collecter les journaux des indicateurs de compromission Proofpoint Emerging Threats Pro

Compatible avec :

Ce document explique comment ingérer les journaux Proofpoint Emerging Threats Pro IOC dans Google Security Operations à l'aide d'Amazon S3. Emerging Threats Intelligence publie des listes de réputation horaires pour les adresses IP et les domaines au format CSV, avec des données de renseignements sur les menaces, y compris des catégories, des scores et des informations temporelles. Le code du parseur traite les données de renseignements sur les menaces ET_PRO au format CSV. Il extrait les adresses IP, les domaines, les catégories, les scores et d'autres informations pertinentes, et les mappe à la fois à un format d'IOC standardisé et au schéma UDM de Chronicle pour une analyse et une utilisation plus approfondies dans Google SecOps.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Une instance Google SecOps disposant des autorisations nécessaires pour créer des flux
  • Abonnement Proofpoint ET Intelligence avec accès aux listes de réputation
  • Clé API ET Intelligence depuis https://etadmin.proofpoint.com/api-access
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Collecter les prérequis d'Emerging Threats Pro

  1. Connectez-vous au portail d'administration ET Intelligence à l'adresse https://etadmin.proofpoint.com.
  2. Accédez à Accès à l'API.
  3. Copiez et enregistrez votre clé API.
  4. Contactez votre représentant Proofpoint pour obtenir les éléments suivants :
    • URL de la liste détaillée de la réputation des adresses IP
    • URL de la liste détaillée de la réputation des domaines

ET Intelligence fournit des fichiers CSV distincts pour les listes de réputation des adresses IP et des domaines, qui sont mis à jour toutes les heures. Utilisez le format "détaillé", qui inclut les colonnes suivantes : * Liste de domaines : Domain Name, Category, Score, First Seen, Last Seen, PortsListe d'adresses IP : IP Address, Category, Score, First Seen, Last Seen, Ports

Configurer un bucket AWS S3 et IAM

Créer un bucket S3

  1. Ouvrez la console Amazon S3.
  2. Cliquez sur Créer un bucket.
  3. Nom du bucket : saisissez et-pro-ioc-bucket (ou le nom de votre choix).
  4. Région : sélectionnez la région de votre choix.
  5. Cliquez sur Créer un bucket.

Créer un utilisateur IAM pour Google SecOps

  1. Ouvrez la console IAM.
  2. Cliquez sur Utilisateurs > Créer un utilisateur.
  3. Nom d'utilisateur : saisissez secops-reader.
  4. Cliquez sur Suivant.
  5. Sélectionnez Joindre directement des règles.
  6. Cliquez sur Créer une règle.
  7. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Nommez la règle SecOpsReaderPolicy.

  9. Cliquez sur Créer une règle.

  10. Revenez à la création d'utilisateur et sélectionnez la règle que vous venez de créer.

  11. Cliquez sur Suivant > Créer un utilisateur.

  12. Accédez à l'onglet Identifiants de sécurité.

  13. Cliquez sur Créer une clé d'accès.

  14. Sélectionnez Service tiers.

  15. Cliquez sur Créer une clé d'accès.

  16. Téléchargez et enregistrez les identifiants.

Configurer le rôle IAM pour Lambda

  1. Dans la console AWS, accédez à IAM > Rôles > Créer un rôle.
  2. Sélectionnez Service AWS> Lambda.
  3. Cliquez sur Suivant.
  4. Cliquez sur Créer une règle.
  5. Sélectionnez l'onglet JSON, puis saisissez ce qui suit :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Nommez la règle EtProIocLambdaPolicy.

  7. Cliquez sur Créer une règle.

  8. Revenez à la création de rôle et associez la stratégie.

  9. Nommez le rôle EtProIocLambdaRole.

  10. Cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    • Nom de la fonction : et-pro-ioc-fetcher
    • Runtime : Python 3.13
    • Architecture : x86_64
    • Rôle d'exécution : utiliser un rôle existant EtProIocLambdaRole
  4. Une fois la page créée, accédez à l'onglet Code et remplacez le contenu par ce qui suit :

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Accédez à Configuration > Configuration générale.

  6. Cliquez sur Modifier.

  7. Définissez Délai avant expiration sur 5 minutes.

  8. Cliquez sur Enregistrer.

Configurer les variables d'environnement

  1. Accédez à Configuration > Variables d'environnement.
  2. Cliquez sur Modifier > Ajouter une variable d'environnement.
  3. Ajoutez les variables suivantes :

    Clé Valeur
    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. Cliquez sur Enregistrer.

Contactez votre représentant Proofpoint pour obtenir les URL exactes de votre abonnement. Les URL au format détaillé suivent généralement ce modèle : * Liste d'adresses IP : https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * Liste de domaines : https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge > Schedules > Create schedule (Amazon EventBridge > Plannings > Créer un planning).
  2. Nom de la programmation : et-pro-ioc-hourly
  3. Schéma de planification : planification basée sur le taux
  4. Expression du taux : 1 heure
  5. Cliquez sur Suivant.
  6. Cible : fonction Lambda
  7. Fonction : et-pro-ioc-fetcher
  8. Cliquez sur Suivant pour passer les étapes restantes.
  9. Cliquez sur Créer une programmation.

Configurer des flux dans Google SecOps

Vous devez créer deux flux distincts : un pour la réputation des adresses IP et un pour la réputation des domaines.

Créer un flux de réputation IP

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Ajouter.
  3. Dans le champ Nom du flux, saisissez ET Pro IOC - IP Reputation.
  4. Dans la liste Type de source, sélectionnez Amazon S3.
  5. Sélectionnez Emerging Threats Pro comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Options de suppression de la source : sélectionnez l'option de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès lecteur SecOps
    • Clé d'accès secrète : clé secrète du lecteur SecOps
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez, puis cliquez sur Envoyer.

Créer un flux de réputation de domaine

  1. Répétez le processus de création du flux.
  2. Dans le champ Nom du flux, saisissez ET Pro IOC - Domain Reputation.
  3. Dans la liste Type de source, sélectionnez Amazon S3.
  4. Sélectionnez Emerging Threats Pro comme type de journal.
  5. Cliquez sur Suivant.
  6. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Options de suppression de la source : sélectionnez l'option de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès lecteur SecOps
    • Clé d'accès secrète : clé secrète du lecteur SecOps
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  7. Cliquez sur Suivant.
  8. Vérifiez, puis cliquez sur Envoyer.

Table de mappage UDM

Champ du journal Mappage UDM Logique
category Ce champ est utilisé dans la logique du parseur, mais n'est pas mappé directement à l'UDM. Elle détermine la valeur de event.ioc.categorization à l'aide d'une table de référence.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos Directement mappé à partir du journal brut.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds Directement mappé à partir du journal brut.
données Ce champ est analysé en plusieurs champs UDM en fonction de son contenu.
first_seen event.idm.entity.metadata.interval.start_time Analysée comme une date et mappée à l'UDM.
first_seen event.ioc.active_timerange.start Analysée comme une date et mappée à l'UDM.
ip_or_domain event.idm.entity.entity.hostname Mappé à l'UDM si le modèle grok extrait un hôte du champ.
ip_or_domain event.idm.entity.entity.ip Mappé à l'UDM si le modèle grok n'extrait pas d'hôte du champ.
ip_or_domain event.ioc.domain_and_ports.domain Mappé à l'UDM si le modèle grok extrait un hôte du champ.
ip_or_domain event.ioc.ip_and_ports.ip_address Mappé à l'UDM si le modèle grok n'extrait pas d'hôte du champ.
last_seen event.idm.entity.metadata.interval.end_time Analysée comme une date et mappée à l'UDM.
last_seen event.ioc.active_timerange.end Analysée comme une date et mappée à l'UDM.
ports event.idm.entity.entity.labels.value Analysés, joints avec un délimiteur de virgule et mappés à l'UDM s'il existe plusieurs ports.
ports event.idm.entity.entity.port Analysé et mappé à l'UDM s'il n'y a qu'un seul port.
ports event.ioc.domain_and_ports.ports Analysé et mappé sur l'UDM si le modèle Grok extrait un hôte du champ.
ports event.ioc.ip_and_ports.ports Analysé et mappé sur l'UDM si le modèle Grok n'extrait pas d'hôte du champ.
score event.ioc.confidence_score Directement mappé à partir du journal brut.
event.idm.entity.entity.labels.key Définissez la valeur sur "ports" s'il existe plusieurs ports.
event.idm.entity.metadata.entity_type Définissez la valeur sur "DOMAIN_NAME" si le modèle Grok extrait un hôte du champ ip_or_domain, sinon définissez-la sur "IP_ADDRESS".
event.idm.entity.metadata.threat.category Défini sur "SOFTWARE_MALICIOUS".
event.idm.entity.metadata.threat.category_details Dérivé du champ category à l'aide d'une table de référence.
event.idm.entity.metadata.threat.threat_name Définissez-le sur "ET Intelligence Rep List".
event.idm.entity.metadata.vendor_name Définissez-le sur "ET_PRO_IOC".
event.ioc.feed_name Définissez-le sur "ET Intelligence Rep List".
event.ioc.raw_severity Défini sur "Malveillant".
timestamp.nanos Copié depuis collection_time.nanos.
timestamp.seconds Copié depuis collection_time.seconds.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.