Collecter les journaux des indicateurs de compromission Proofpoint Emerging Threats Pro
Ce document explique comment ingérer les journaux Proofpoint Emerging Threats Pro IOC dans Google Security Operations à l'aide d'Amazon S3. Emerging Threats Intelligence publie des listes de réputation horaires pour les adresses IP et les domaines au format CSV, avec des données de renseignements sur les menaces, y compris des catégories, des scores et des informations temporelles. Le code du parseur traite les données de renseignements sur les menaces ET_PRO au format CSV. Il extrait les adresses IP, les domaines, les catégories, les scores et d'autres informations pertinentes, et les mappe à la fois à un format d'IOC standardisé et au schéma UDM de Chronicle pour une analyse et une utilisation plus approfondies dans Google SecOps.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps disposant des autorisations nécessaires pour créer des flux
- Abonnement Proofpoint ET Intelligence avec accès aux listes de réputation
- Clé API ET Intelligence depuis https://etadmin.proofpoint.com/api-access
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Collecter les prérequis d'Emerging Threats Pro
- Connectez-vous au portail d'administration ET Intelligence à l'adresse https://etadmin.proofpoint.com.
- Accédez à Accès à l'API.
- Copiez et enregistrez votre clé API.
- Contactez votre représentant Proofpoint pour obtenir les éléments suivants :
- URL de la liste détaillée de la réputation des adresses IP
- URL de la liste détaillée de la réputation des domaines
ET Intelligence fournit des fichiers CSV distincts pour les listes de réputation des adresses IP et des domaines, qui sont mis à jour toutes les heures. Utilisez le format "détaillé", qui inclut les colonnes suivantes :
* Liste de domaines : Domain Name, Category, Score, First Seen, Last Seen, Ports
* Liste d'adresses IP : IP Address, Category, Score, First Seen, Last Seen, Ports
Configurer un bucket AWS S3 et IAM
Créer un bucket S3
- Ouvrez la console Amazon S3.
- Cliquez sur Créer un bucket.
- Nom du bucket : saisissez
et-pro-ioc-bucket(ou le nom de votre choix). - Région : sélectionnez la région de votre choix.
- Cliquez sur Créer un bucket.
Créer un utilisateur IAM pour Google SecOps
- Ouvrez la console IAM.
- Cliquez sur Utilisateurs > Créer un utilisateur.
- Nom d'utilisateur : saisissez
secops-reader. - Cliquez sur Suivant.
- Sélectionnez Joindre directement des règles.
- Cliquez sur Créer une règle.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Nommez la règle
SecOpsReaderPolicy.Cliquez sur Créer une règle.
Revenez à la création d'utilisateur et sélectionnez la règle que vous venez de créer.
Cliquez sur Suivant > Créer un utilisateur.
Accédez à l'onglet Identifiants de sécurité.
Cliquez sur Créer une clé d'accès.
Sélectionnez Service tiers.
Cliquez sur Créer une clé d'accès.
Téléchargez et enregistrez les identifiants.
Configurer le rôle IAM pour Lambda
- Dans la console AWS, accédez à IAM > Rôles > Créer un rôle.
- Sélectionnez Service AWS> Lambda.
- Cliquez sur Suivant.
- Cliquez sur Créer une règle.
Sélectionnez l'onglet JSON, puis saisissez ce qui suit :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Nommez la règle
EtProIocLambdaPolicy.Cliquez sur Créer une règle.
Revenez à la création de rôle et associez la stratégie.
Nommez le rôle
EtProIocLambdaRole.Cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
- Nom de la fonction :
et-pro-ioc-fetcher - Runtime : Python 3.13
- Architecture : x86_64
- Rôle d'exécution : utiliser un rôle existant
EtProIocLambdaRole
- Nom de la fonction :
Une fois la page créée, accédez à l'onglet Code et remplacez le contenu par ce qui suit :
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Accédez à Configuration > Configuration générale.
Cliquez sur Modifier.
Définissez Délai avant expiration sur 5 minutes.
Cliquez sur Enregistrer.
Configurer les variables d'environnement
- Accédez à Configuration > Variables d'environnement.
- Cliquez sur Modifier > Ajouter une variable d'environnement.
Ajoutez les variables suivantes :
Clé Valeur S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Cliquez sur Enregistrer.
Contactez votre représentant Proofpoint pour obtenir les URL exactes de votre abonnement. Les URL au format détaillé suivent généralement ce modèle :
* Liste d'adresses IP : https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Liste de domaines : https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Créer une programmation EventBridge
- Accédez à Amazon EventBridge > Schedules > Create schedule (Amazon EventBridge > Plannings > Créer un planning).
- Nom de la programmation :
et-pro-ioc-hourly - Schéma de planification : planification basée sur le taux
- Expression du taux : 1 heure
- Cliquez sur Suivant.
- Cible : fonction Lambda
- Fonction :
et-pro-ioc-fetcher - Cliquez sur Suivant pour passer les étapes restantes.
- Cliquez sur Créer une programmation.
Configurer des flux dans Google SecOps
Vous devez créer deux flux distincts : un pour la réputation des adresses IP et un pour la réputation des domaines.
Créer un flux de réputation IP
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Ajouter.
- Dans le champ Nom du flux, saisissez
ET Pro IOC - IP Reputation. - Dans la liste Type de source, sélectionnez Amazon S3.
- Sélectionnez Emerging Threats Pro comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Options de suppression de la source : sélectionnez l'option de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès lecteur SecOps
- Clé d'accès secrète : clé secrète du lecteur SecOps
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez, puis cliquez sur Envoyer.
Créer un flux de réputation de domaine
- Répétez le processus de création du flux.
- Dans le champ Nom du flux, saisissez
ET Pro IOC - Domain Reputation. - Dans la liste Type de source, sélectionnez Amazon S3.
- Sélectionnez Emerging Threats Pro comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Options de suppression de la source : sélectionnez l'option de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès lecteur SecOps
- Clé d'accès secrète : clé secrète du lecteur SecOps
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez, puis cliquez sur Envoyer.
Table de mappage UDM
| Champ du journal | Mappage UDM | Logique |
|---|---|---|
| category | Ce champ est utilisé dans la logique du parseur, mais n'est pas mappé directement à l'UDM. Elle détermine la valeur de event.ioc.categorization à l'aide d'une table de référence. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Directement mappé à partir du journal brut. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Directement mappé à partir du journal brut. |
| données | Ce champ est analysé en plusieurs champs UDM en fonction de son contenu. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Analysée comme une date et mappée à l'UDM. |
| first_seen | event.ioc.active_timerange.start | Analysée comme une date et mappée à l'UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Mappé à l'UDM si le modèle grok extrait un hôte du champ. |
| ip_or_domain | event.idm.entity.entity.ip | Mappé à l'UDM si le modèle grok n'extrait pas d'hôte du champ. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mappé à l'UDM si le modèle grok extrait un hôte du champ. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mappé à l'UDM si le modèle grok n'extrait pas d'hôte du champ. |
| last_seen | event.idm.entity.metadata.interval.end_time | Analysée comme une date et mappée à l'UDM. |
| last_seen | event.ioc.active_timerange.end | Analysée comme une date et mappée à l'UDM. |
| ports | event.idm.entity.entity.labels.value | Analysés, joints avec un délimiteur de virgule et mappés à l'UDM s'il existe plusieurs ports. |
| ports | event.idm.entity.entity.port | Analysé et mappé à l'UDM s'il n'y a qu'un seul port. |
| ports | event.ioc.domain_and_ports.ports | Analysé et mappé sur l'UDM si le modèle Grok extrait un hôte du champ. |
| ports | event.ioc.ip_and_ports.ports | Analysé et mappé sur l'UDM si le modèle Grok n'extrait pas d'hôte du champ. |
| score | event.ioc.confidence_score | Directement mappé à partir du journal brut. |
| event.idm.entity.entity.labels.key | Définissez la valeur sur "ports" s'il existe plusieurs ports. | |
| event.idm.entity.metadata.entity_type | Définissez la valeur sur "DOMAIN_NAME" si le modèle Grok extrait un hôte du champ ip_or_domain, sinon définissez-la sur "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Défini sur "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Dérivé du champ category à l'aide d'une table de référence. |
|
| event.idm.entity.metadata.threat.threat_name | Définissez-le sur "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Définissez-le sur "ET_PRO_IOC". | |
| event.ioc.feed_name | Définissez-le sur "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Défini sur "Malveillant". | |
| timestamp.nanos | Copié depuis collection_time.nanos. |
|
| timestamp.seconds | Copié depuis collection_time.seconds. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.