Raccogliere i log IOC di Proofpoint Emerging Threats Pro
Questo documento spiega come importare i log Proofpoint Emerging Threats Pro IOC in Google Security Operations utilizzando Amazon S3. Emerging Threats Intelligence pubblica elenchi di reputazione orari per IP e domini in formato CSV con dati di intelligence sulle minacce, tra cui categorie, punteggi e informazioni temporali. Il codice del parser elabora i dati di threat intelligence ET_PRO in formato CSV. Estrae indirizzi IP, domini, categorie, punteggi e altre informazioni pertinenti, mappandoli sia in un formato IOC standardizzato sia nello schema UDM di Chronicle per ulteriori analisi e utilizzo in Google SecOps.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza di Google SecOps con le autorizzazioni per creare feed
- Abbonamento a Proofpoint ET Intelligence con accesso agli elenchi di reputazione
- Chiave API ET Intelligence da https://etadmin.proofpoint.com/api-access
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
Raccogliere i prerequisiti di Emerging Threats Pro
- Accedi al portale di amministrazione di ET Intelligence all'indirizzo https://etadmin.proofpoint.com
- Vai ad Accesso API.
- Copia e salva la chiave API.
- Contatta il tuo rappresentante di Proofpoint per ottenere:
- URL dell'elenco dettagliato della reputazione IP
- URL dell'elenco dettagliato della reputazione del dominio
ET Intelligence fornisce file CSV separati per gli elenchi di reputazione di IP e domini, aggiornati ogni ora. Utilizza il formato "Dettagliato", che include le seguenti colonne:
* Elenco domini: Domain Name, Category, Score, First Seen, Last Seen, Ports
* Elenco IP: IP Address, Category, Score, First Seen, Last Seen, Ports
Configura il bucket AWS S3 e IAM
Crea bucket S3
- Apri la console Amazon S3.
- Fai clic su Crea bucket
- Nome bucket: inserisci
et-pro-ioc-bucket(o il nome che preferisci). - Regione: seleziona la tua regione preferita
- Fai clic su Crea bucket
Crea un utente IAM per Google SecOps
- Apri la console IAM.
- Fai clic su Utenti > Crea utente.
- Nome utente: inserisci
secops-reader - Fai clic su Avanti.
- Seleziona Allega direttamente le norme.
- Fai clic su Crea policy.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Assegna al criterio il nome
SecOpsReaderPolicy.Fai clic su Crea policy.
Torna alla creazione dell'utente e seleziona la norma appena creata.
Fai clic su Avanti > Crea utente.
Vai alla scheda Credenziali di sicurezza.
Fai clic su Crea chiave di accesso.
Seleziona Servizio di terze parti.
Fai clic su Crea chiave di accesso.
Scarica e salva le credenziali.
Configura il ruolo IAM per Lambda
- Nella console AWS, vai a IAM > Ruoli > Crea ruolo.
- Seleziona Servizio AWS > Lambda.
- Fai clic su Avanti.
- Fai clic su Crea policy.
Seleziona la scheda JSON e inserisci quanto segue:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Assegna al criterio il nome
EtProIocLambdaPolicy.Fai clic su Crea policy.
Torna alla creazione del ruolo e collega la policy.
Assegna un nome al ruolo
EtProIocLambdaRole.Fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Autore da zero.
Fornisci i seguenti dettagli di configurazione:
- Nome funzione:
et-pro-ioc-fetcher - Runtime: Python 3.13
- Architettura: x86_64
- Ruolo di esecuzione: utilizza il ruolo esistente
EtProIocLambdaRole
- Nome funzione:
Dopo la creazione, vai alla scheda Codice e sostituisci il codice con:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Vai a Configurazione > Configurazione generale.
Fai clic su Modifica.
Imposta Timeout su 5 minuti.
Fai clic su Salva.
Configura le variabili di ambiente
- Vai a Configurazione > Variabili di ambiente.
- Fai clic su Modifica > Aggiungi variabile di ambiente.
Aggiungi le seguenti variabili:
Chiave Valore S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Fai clic su Salva.
Contatta il tuo rappresentante Proofpoint per gli URL esatti del tuo abbonamento. In genere, gli URL in formato dettagliato seguono questo pattern:
* Elenco IP: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Elenco domini: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Crea pianificazione EventBridge
- Vai a Amazon EventBridge > Pianificazioni > Crea pianificazione.
- Nome programma:
et-pro-ioc-hourly - Modello di programmazione: programmazione basata sulla tariffa
- Valutazione dell'espressione: 1 ora
- Fai clic su Avanti.
- Destinazione: funzione Lambda
- Funzione:
et-pro-ioc-fetcher - Fai clic su Avanti per completare i passaggi rimanenti.
- Fai clic su Crea pianificazione.
Configurare i feed in Google SecOps
Devi creare due feed separati: uno per la reputazione IP e uno per la reputazione del dominio.
Crea feed reputazione IP
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo.
- Nel campo Nome feed, inserisci
ET Pro IOC - IP Reputation. - Nell'elenco Tipo di origine, seleziona Amazon S3.
- Seleziona Emerging Threats Pro come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Opzioni di eliminazione delle fonti: seleziona l'opzione che preferisci.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso in lettura di SecOps
- Chiave di accesso segreta: chiave segreta del lettore SecOps
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Rivedi e fai clic su Invia.
Crea feed reputazione dominio
- Ripeti la procedura di creazione del feed.
- Nel campo Nome feed, inserisci
ET Pro IOC - Domain Reputation. - Nell'elenco Tipo di origine, seleziona Amazon S3.
- Seleziona Emerging Threats Pro come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Opzioni di eliminazione delle fonti: seleziona l'opzione che preferisci.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso in lettura di SecOps
- Chiave di accesso segreta: chiave segreta del lettore SecOps
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Rivedi e fai clic su Invia.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| categoria | Questo campo viene utilizzato nella logica del parser, ma non viene mappato direttamente all'UDM. Determina il valore di event.ioc.categorization tramite una tabella di ricerca. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Mappato direttamente dal log non elaborato. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Mappato direttamente dal log non elaborato. |
| dati | Questo campo viene analizzato in più campi UDM in base ai suoi contenuti. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Analizzato come data e mappato al modello UDM. |
| first_seen | event.ioc.active_timerange.start | Analizzato come data e mappato al modello UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Mappato a UDM se il pattern grok estrae un host dal campo. |
| ip_or_domain | event.idm.entity.entity.ip | Mappato a UDM se il pattern grok non estrae un host dal campo. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mappato a UDM se il pattern grok estrae un host dal campo. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mappato a UDM se il pattern grok non estrae un host dal campo. |
| last_seen | event.idm.entity.metadata.interval.end_time | Analizzato come data e mappato al modello UDM. |
| last_seen | event.ioc.active_timerange.end | Analizzato come data e mappato al modello UDM. |
| porte | event.idm.entity.entity.labels.value | Analizzato, unito con il delimitatore virgola e mappato all'UDM se sono presenti più porte. |
| porte | event.idm.entity.entity.port | Analizzato e mappato all'UDM se è presente una sola porta. |
| porte | event.ioc.domain_and_ports.ports | Analizzato e mappato a UDM se il pattern grok estrae un host dal campo. |
| porte | event.ioc.ip_and_ports.ports | Analizzato e mappato all'UDM se il pattern Grok non estrae un host dal campo. |
| punteggio | event.ioc.confidence_score | Mappato direttamente dal log non elaborato. |
| event.idm.entity.entity.labels.key | Imposta "porte" se sono presenti più porte. | |
| event.idm.entity.metadata.entity_type | Imposta su "DOMAIN_NAME" se il pattern grok estrae un host dal campo ip_or_domain, altrimenti imposta su "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Imposta il valore su "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Derivato dal campo category utilizzando una tabella di ricerca. |
|
| event.idm.entity.metadata.threat.threat_name | Imposta "Elenco rappresentanti ET Intelligence". | |
| event.idm.entity.metadata.vendor_name | Imposta il valore su "ET_PRO_IOC". | |
| event.ioc.feed_name | Imposta "Elenco rappresentanti ET Intelligence". | |
| event.ioc.raw_severity | Impostato su "Malicious" (Dannoso). | |
| timestamp.nanos | Copiato da collection_time.nanos. |
|
| timestamp.seconds | Copiato da collection_time.seconds. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.