Raccogliere i log IOC di Proofpoint Emerging Threats Pro

Supportato in:

Questo documento spiega come importare i log Proofpoint Emerging Threats Pro IOC in Google Security Operations utilizzando Amazon S3. Emerging Threats Intelligence pubblica elenchi di reputazione orari per IP e domini in formato CSV con dati di intelligence sulle minacce, tra cui categorie, punteggi e informazioni temporali. Il codice del parser elabora i dati di threat intelligence ET_PRO in formato CSV. Estrae indirizzi IP, domini, categorie, punteggi e altre informazioni pertinenti, mappandoli sia in un formato IOC standardizzato sia nello schema UDM di Chronicle per ulteriori analisi e utilizzo in Google SecOps.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza di Google SecOps con le autorizzazioni per creare feed
  • Abbonamento a Proofpoint ET Intelligence con accesso agli elenchi di reputazione
  • Chiave API ET Intelligence da https://etadmin.proofpoint.com/api-access
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)

Raccogliere i prerequisiti di Emerging Threats Pro

  1. Accedi al portale di amministrazione di ET Intelligence all'indirizzo https://etadmin.proofpoint.com
  2. Vai ad Accesso API.
  3. Copia e salva la chiave API.
  4. Contatta il tuo rappresentante di Proofpoint per ottenere:
    • URL dell'elenco dettagliato della reputazione IP
    • URL dell'elenco dettagliato della reputazione del dominio

ET Intelligence fornisce file CSV separati per gli elenchi di reputazione di IP e domini, aggiornati ogni ora. Utilizza il formato "Dettagliato", che include le seguenti colonne: * Elenco domini: Domain Name, Category, Score, First Seen, Last Seen, Ports * Elenco IP: IP Address, Category, Score, First Seen, Last Seen, Ports

Configura il bucket AWS S3 e IAM

Crea bucket S3

  1. Apri la console Amazon S3.
  2. Fai clic su Crea bucket
  3. Nome bucket: inserisci et-pro-ioc-bucket (o il nome che preferisci).
  4. Regione: seleziona la tua regione preferita
  5. Fai clic su Crea bucket

Crea un utente IAM per Google SecOps

  1. Apri la console IAM.
  2. Fai clic su Utenti > Crea utente.
  3. Nome utente: inserisci secops-reader
  4. Fai clic su Avanti.
  5. Seleziona Allega direttamente le norme.
  6. Fai clic su Crea policy.
  7. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Assegna al criterio il nome SecOpsReaderPolicy.

  9. Fai clic su Crea policy.

  10. Torna alla creazione dell'utente e seleziona la norma appena creata.

  11. Fai clic su Avanti > Crea utente.

  12. Vai alla scheda Credenziali di sicurezza.

  13. Fai clic su Crea chiave di accesso.

  14. Seleziona Servizio di terze parti.

  15. Fai clic su Crea chiave di accesso.

  16. Scarica e salva le credenziali.

Configura il ruolo IAM per Lambda

  1. Nella console AWS, vai a IAM > Ruoli > Crea ruolo.
  2. Seleziona Servizio AWS > Lambda.
  3. Fai clic su Avanti.
  4. Fai clic su Crea policy.
  5. Seleziona la scheda JSON e inserisci quanto segue:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Assegna al criterio il nome EtProIocLambdaPolicy.

  7. Fai clic su Crea policy.

  8. Torna alla creazione del ruolo e collega la policy.

  9. Assegna un nome al ruolo EtProIocLambdaRole.

  10. Fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    • Nome funzione: et-pro-ioc-fetcher
    • Runtime: Python 3.13
    • Architettura: x86_64
    • Ruolo di esecuzione: utilizza il ruolo esistente EtProIocLambdaRole
  4. Dopo la creazione, vai alla scheda Codice e sostituisci il codice con:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Vai a Configurazione > Configurazione generale.

  6. Fai clic su Modifica.

  7. Imposta Timeout su 5 minuti.

  8. Fai clic su Salva.

Configura le variabili di ambiente

  1. Vai a Configurazione > Variabili di ambiente.
  2. Fai clic su Modifica > Aggiungi variabile di ambiente.
  3. Aggiungi le seguenti variabili:

    Chiave Valore
    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. Fai clic su Salva.

Contatta il tuo rappresentante Proofpoint per gli URL esatti del tuo abbonamento. In genere, gli URL in formato dettagliato seguono questo pattern: * Elenco IP: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * Elenco domini: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

Crea pianificazione EventBridge

  1. Vai a Amazon EventBridge > Pianificazioni > Crea pianificazione.
  2. Nome programma: et-pro-ioc-hourly
  3. Modello di programmazione: programmazione basata sulla tariffa
  4. Valutazione dell'espressione: 1 ora
  5. Fai clic su Avanti.
  6. Destinazione: funzione Lambda
  7. Funzione: et-pro-ioc-fetcher
  8. Fai clic su Avanti per completare i passaggi rimanenti.
  9. Fai clic su Crea pianificazione.

Configurare i feed in Google SecOps

Devi creare due feed separati: uno per la reputazione IP e uno per la reputazione del dominio.

Crea feed reputazione IP

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo.
  3. Nel campo Nome feed, inserisci ET Pro IOC - IP Reputation.
  4. Nell'elenco Tipo di origine, seleziona Amazon S3.
  5. Seleziona Emerging Threats Pro come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Opzioni di eliminazione delle fonti: seleziona l'opzione che preferisci.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso in lettura di SecOps
    • Chiave di accesso segreta: chiave segreta del lettore SecOps
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Rivedi e fai clic su Invia.

Crea feed reputazione dominio

  1. Ripeti la procedura di creazione del feed.
  2. Nel campo Nome feed, inserisci ET Pro IOC - Domain Reputation.
  3. Nell'elenco Tipo di origine, seleziona Amazon S3.
  4. Seleziona Emerging Threats Pro come Tipo di log.
  5. Fai clic su Avanti.
  6. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Opzioni di eliminazione delle fonti: seleziona l'opzione che preferisci.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso in lettura di SecOps
    • Chiave di accesso segreta: chiave segreta del lettore SecOps
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  7. Fai clic su Avanti.
  8. Rivedi e fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
categoria Questo campo viene utilizzato nella logica del parser, ma non viene mappato direttamente all'UDM. Determina il valore di event.ioc.categorization tramite una tabella di ricerca.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos Mappato direttamente dal log non elaborato.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds Mappato direttamente dal log non elaborato.
dati Questo campo viene analizzato in più campi UDM in base ai suoi contenuti.
first_seen event.idm.entity.metadata.interval.start_time Analizzato come data e mappato al modello UDM.
first_seen event.ioc.active_timerange.start Analizzato come data e mappato al modello UDM.
ip_or_domain event.idm.entity.entity.hostname Mappato a UDM se il pattern grok estrae un host dal campo.
ip_or_domain event.idm.entity.entity.ip Mappato a UDM se il pattern grok non estrae un host dal campo.
ip_or_domain event.ioc.domain_and_ports.domain Mappato a UDM se il pattern grok estrae un host dal campo.
ip_or_domain event.ioc.ip_and_ports.ip_address Mappato a UDM se il pattern grok non estrae un host dal campo.
last_seen event.idm.entity.metadata.interval.end_time Analizzato come data e mappato al modello UDM.
last_seen event.ioc.active_timerange.end Analizzato come data e mappato al modello UDM.
porte event.idm.entity.entity.labels.value Analizzato, unito con il delimitatore virgola e mappato all'UDM se sono presenti più porte.
porte event.idm.entity.entity.port Analizzato e mappato all'UDM se è presente una sola porta.
porte event.ioc.domain_and_ports.ports Analizzato e mappato a UDM se il pattern grok estrae un host dal campo.
porte event.ioc.ip_and_ports.ports Analizzato e mappato all'UDM se il pattern Grok non estrae un host dal campo.
punteggio event.ioc.confidence_score Mappato direttamente dal log non elaborato.
event.idm.entity.entity.labels.key Imposta "porte" se sono presenti più porte.
event.idm.entity.metadata.entity_type Imposta su "DOMAIN_NAME" se il pattern grok estrae un host dal campo ip_or_domain, altrimenti imposta su "IP_ADDRESS".
event.idm.entity.metadata.threat.category Imposta il valore su "SOFTWARE_MALICIOUS".
event.idm.entity.metadata.threat.category_details Derivato dal campo category utilizzando una tabella di ricerca.
event.idm.entity.metadata.threat.threat_name Imposta "Elenco rappresentanti ET Intelligence".
event.idm.entity.metadata.vendor_name Imposta il valore su "ET_PRO_IOC".
event.ioc.feed_name Imposta "Elenco rappresentanti ET Intelligence".
event.ioc.raw_severity Impostato su "Malicious" (Dannoso).
timestamp.nanos Copiato da collection_time.nanos.
timestamp.seconds Copiato da collection_time.seconds.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.