Recolha registos de IOCs do Proofpoint Emerging Threats Pro

Suportado em:

Este documento explica como carregar registos de IOCs do Proofpoint Emerging Threats Pro para o Google Security Operations através do Amazon S3. A Emerging Threats Intelligence publica listas de reputação por hora para IPs e domínios no formato CSV com dados de informações sobre ameaças, incluindo categorias, classificações e informações temporais. O código do analisador processa dados de informações sobre ameaças ET_PRO formatados em CSV. Extrai endereços IP, domínios, categorias, classificações e outras informações relevantes, mapeando-os para um formato de IOC padronizado e para o esquema UDM do Chronicle para análise e utilização adicionais no Google SecOps.

Antes de começar

Certifique-se de que cumpre os seguintes pré-requisitos:

  • Uma instância do Google SecOps com autorizações para criar feeds
  • Subscrição da Proofpoint ET Intelligence com acesso a listas de reputação
  • Chave da API ET Intelligence a partir de https://etadmin.proofpoint.com/api-access
  • Acesso privilegiado à AWS (S3, IAM, Lambda e EventBridge)

Recolha os pré-requisitos do Emerging Threats Pro

  1. Inicie sessão no ET Intelligence Admin Portal em https://etadmin.proofpoint.com
  2. Aceda a Acesso à API
  3. Copie e guarde a sua chave da API
  4. Contacte o seu representante da Proofpoint para obter:
    • URL da lista de reputação de IPs detalhada
    • URL da lista de reputação do domínio detalhada

A ET Intelligence fornece ficheiros CSV separados para listas de reputação de IP e de domínio, atualizados de hora a hora. Use o formato "detalhado", que inclui estas colunas: * Lista de domínios: Domain Name, Category, Score, First Seen, Last Seen, Ports * Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports

Configure o contentor do AWS S3 e o IAM

Crie um contentor do S3

  1. Abra a consola do Amazon S3
  2. Clique em Criar contentor
  3. Nome do contentor: introduza et-pro-ioc-bucket (ou o nome preferido)
  4. Região: selecione a sua região preferida
  5. Clique em Criar contentor

Crie um utilizador do IAM para o Google SecOps

  1. Abra a consola do IAM
  2. Clique em Utilizadores > Criar utilizador
  3. Nome de utilizador: introduza secops-reader
  4. Clique em Seguinte
  5. Selecione Anexar políticas diretamente
  6. Clique em Criar política
  7. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Atribua um nome à política SecOpsReaderPolicy.

  9. Clique em Criar política.

  10. Regresse à criação de utilizadores e selecione a política criada recentemente.

  11. Clique em Seguinte > Criar utilizador.

  12. Aceda ao separador Credenciais de segurança.

  13. Clique em Criar chave de acesso.

  14. Selecione Serviço de terceiros.

  15. Clique em Criar chave de acesso.

  16. Transfira e guarde as credenciais.

Configure a função IAM para o Lambda

  1. Na consola do AWS, aceda a IAM > Funções > Criar função.
  2. Selecione Serviço AWS > Lambda.
  3. Clicar em Seguinte.
  4. Clique em Criar política.
  5. Selecione o separador JSON e introduza o seguinte:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Atribua um nome à política EtProIocLambdaPolicy.

  7. Clique em Criar política.

  8. Regresse à criação de funções e anexe a política.

  9. Atribua um nome à função EtProIocLambdaRole.

  10. Clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    • Nome da função: et-pro-ioc-fetcher
    • Tempo de execução: Python 3.13
    • Arquitetura: x86_64
    • Função de execução: usar função existente EtProIocLambdaRole
  4. Após a criação, aceda ao separador Código e substitua-o pelo seguinte:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Aceda a Configuração > Configuração geral.

  6. Clique em Edit.

  7. Defina o Limite de tempo para 5 minutos.

  8. Clique em Guardar.

Configure variáveis de ambiente

  1. Aceda a Configuração > Variáveis de ambiente.
  2. Clique em Editar > Adicionar variável de ambiente.
  3. Adicione as seguintes variáveis:

    Chave Valor
    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. Clique em Guardar.

Contacte o seu representante da Proofpoint para saber os URLs exatos da sua subscrição. Normalmente, os URLs de formato detalhado seguem este padrão: * Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * Lista de domínios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

Crie uma programação do EventBridge

  1. Aceda a Amazon EventBridge > Schedules > Create schedule
  2. Nome da programação: et-pro-ioc-hourly
  3. Padrão de horário: horário baseado na taxa
  4. Taxa de expressão: 1 hora
  5. Clique em Seguinte
  6. Destino: função Lambda
  7. Função: et-pro-ioc-fetcher
  8. Clique em Seguinte nos restantes passos
  9. Clique em Criar horário

Configure feeds no Google SecOps

Tem de criar dois feeds separados: um para a reputação de IP e outro para a reputação de domínio.

Crie um feed de reputação de IP

  1. Aceda a Definições do SIEM > Feeds
  2. Clique em Adicionar novo
  3. No campo Nome do feed, introduza ET Pro IOC - IP Reputation
  4. Na lista Tipo de origem, selecione Amazon S3
  5. Selecione Emerging Threats Pro como o Tipo de registo
  6. Clique em Seguinte
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Opções de eliminação de origens: selecione de acordo com a sua preferência
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso de leitura do SecOps
    • Chave de acesso secreta: chave secreta do leitor do SecOps
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clique em Seguinte
  9. Reveja e clique em Enviar

Crie um feed de reputação do domínio

  1. Repita o processo de criação do feed.
  2. No campo Nome do feed, introduza ET Pro IOC - Domain Reputation.
  3. Na lista Tipo de origem, selecione Amazon S3.
  4. Selecione Emerging Threats Pro como o Tipo de registo.
  5. Clicar em Seguinte.
  6. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Opções de eliminação de origens: selecione de acordo com a sua preferência
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso de leitura do SecOps
    • Chave de acesso secreta: chave secreta do leitor do SecOps
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  7. Clique em Seguinte
  8. Reveja e clique em Enviar

Tabela de mapeamento da UDM

Campo de registo Mapeamento do UDM Lógica
categoria Este campo é usado na lógica do analisador, mas não é mapeado diretamente para o UDM. Determina o valor de event.ioc.categorization através de uma tabela de pesquisa.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos Mapeado diretamente a partir do registo não processado.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds Mapeado diretamente a partir do registo não processado.
dados Este campo é analisado em vários campos da UDM com base no respetivo conteúdo.
first_seen event.idm.entity.metadata.interval.start_time Analisado como uma data e mapeado para o MDU.
first_seen event.ioc.active_timerange.start Analisado como uma data e mapeado para o MDU.
ip_or_domain event.idm.entity.entity.hostname Mapeado para o UDM se o padrão grok extrair um anfitrião do campo.
ip_or_domain event.idm.entity.entity.ip Mapeado para o UDM se o padrão grok não extrair um anfitrião do campo.
ip_or_domain event.ioc.domain_and_ports.domain Mapeado para o UDM se o padrão grok extrair um anfitrião do campo.
ip_or_domain event.ioc.ip_and_ports.ip_address Mapeado para o UDM se o padrão grok não extrair um anfitrião do campo.
last_seen event.idm.entity.metadata.interval.end_time Analisado como uma data e mapeado para o MDU.
last_seen event.ioc.active_timerange.end Analisado como uma data e mapeado para o MDU.
portas event.idm.entity.entity.labels.value Analisado, unido com um delimitador de vírgula e mapeado para o UDM se existirem várias portas.
portas event.idm.entity.entity.port Analisa e mapeia para o UDM se existir apenas uma porta.
portas event.ioc.domain_and_ports.ports Analisa e mapeia para o UDM se o padrão grok extrair um anfitrião do campo.
portas event.ioc.ip_and_ports.ports Analisa e mapeia para o UDM se o padrão grok não extrair um anfitrião do campo.
pontuação event.ioc.confidence_score Mapeado diretamente a partir do registo não processado.
event.idm.entity.entity.labels.key Definido como "ports" se existirem várias portas.
event.idm.entity.metadata.entity_type Defina como "DOMAIN_NAME" se o padrão grok extrair um anfitrião do campo ip_or_domain. Caso contrário, defina como "IP_ADDRESS".
event.idm.entity.metadata.threat.category Definido como "SOFTWARE_MALICIOUS".
event.idm.entity.metadata.threat.category_details Derivado do campo category através de uma tabela de pesquisa.
event.idm.entity.metadata.threat.threat_name Definir como "ET Intelligence Rep List".
event.idm.entity.metadata.vendor_name Definido como "ET_PRO_IOC".
event.ioc.feed_name Definir como "ET Intelligence Rep List".
event.ioc.raw_severity Definido como "Malicioso".
timestamp.nanos Copiado de collection_time.nanos.
timestamp.seconds Copiado de collection_time.seconds.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.