Coletar registros de IOCs do Proofpoint Emerging Threats Pro

Compatível com:

Este documento explica como ingerir registros de IOC do Proofpoint Emerging Threats Pro no Google Security Operations usando o Amazon S3. A Emerging Threats Intelligence publica listas de reputação por hora para IPs e domínios em formato CSV com dados de inteligência contra ameaças, incluindo categorias, pontuações e informações temporais. O código do analisador processa dados de inteligência de ameaças ET_PRO formatados em CSV. Ele extrai endereços IP, domínios, categorias, pontuações e outras informações relevantes, mapeando-os para um formato de IOC padronizado e para o esquema UDM do Chronicle para análise e uso posteriores no Google SecOps.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps com permissões para criar feeds
  • Assinatura da Proofpoint ET Intelligence com acesso a listas de reputação.
  • Chave da API ET Intelligence em https://etadmin.proofpoint.com/api-access
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Reúna os pré-requisitos do Emerging Threats Pro

  1. Faça login no portal de administração da ET Intelligence em https://etadmin.proofpoint.com.
  2. Acesse Acesso à API.
  3. Copie e salve sua chave de API.
  4. Entre em contato com seu representante da Proofpoint para receber:
    • URL da lista detalhada de reputação de IP
    • URL da lista detalhada de reputação do domínio

A ET Intelligence fornece arquivos CSV separados para listas de reputação de IP e domínio, atualizadas a cada hora. Use o formato "detalhado", que inclui estas colunas: * Lista de domínios: Domain Name, Category, Score, First Seen, Last Seen, Ports * Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports

Configurar o bucket do AWS S3 e o IAM

Criar bucket do S3

  1. Abra o console do Amazon S3.
  2. Clique em Criar bucket.
  3. Nome do bucket: insira et-pro-ioc-bucket (ou o nome de sua preferência).
  4. Região: selecione a região de sua preferência.
  5. Clique em Criar bucket.

Criar um usuário do IAM para o Google SecOps

  1. Abra o console do IAM.
  2. Clique em Usuários > Criar usuário.
  3. Nome de usuário: insira secops-reader.
  4. Clique em Próximo.
  5. Selecione Anexar políticas diretamente.
  6. Clique em Criar política.
  7. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Nomeie a política como SecOpsReaderPolicy.

  9. Clique em Criar política.

  10. Volte à criação de usuários e selecione a política recém-criada.

  11. Clique em Próxima > Criar usuário.

  12. Acesse a guia Credenciais de segurança.

  13. Clique em Criar chave de acesso.

  14. Selecione Serviço de terceiros.

  15. Clique em Criar chave de acesso.

  16. Faça o download e salve as credenciais.

Configurar o papel do IAM para o Lambda

  1. No console da AWS, acesse IAM > Papéis > Criar papel.
  2. Selecione Serviço da AWS > Lambda.
  3. Clique em Próxima.
  4. Clique em Criar política.
  5. Selecione a guia JSON e insira o seguinte:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Nomeie a política como EtProIocLambdaPolicy.

  7. Clique em Criar política.

  8. Volte à criação de papéis e anexe a política.

  9. Nomeie a função como EtProIocLambdaRole.

  10. Clique em Criar papel.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    • Nome da função: et-pro-ioc-fetcher
    • Tempo de execução: Python 3.13
    • Arquitetura: x86_64
    • Função de execução: use a função EtProIocLambdaRole
  4. Depois da criação, acesse a guia Código e substitua por:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Acesse Configuração > Configuração geral.

  6. Clique em Editar.

  7. Defina o Tempo limite como 5 minutos.

  8. Clique em Salvar.

Configure as variáveis de ambiente

  1. Acesse Configuração > Variáveis de ambiente.
  2. Clique em Editar > Adicionar variável de ambiente.
  3. Adicione as seguintes variáveis:

    Chave Valor
    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. Clique em Salvar.

Entre em contato com seu representante da Proofpoint para saber os URLs exatos da sua assinatura. Os URLs de formato detalhado geralmente seguem este padrão: * Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * Lista de domínios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Agendamentos > Criar agendamento.
  2. Nome da programação: et-pro-ioc-hourly
  3. Padrão de programação: programação com base em taxa
  4. Expressão de taxa: 1 hora
  5. Clique em Próximo.
  6. Destino: função Lambda
  7. Função: et-pro-ioc-fetcher
  8. Clique em Próxima nas etapas restantes.
  9. Clique em Criar programação.

Configurar feeds no Google SecOps

É necessário criar dois feeds separados: um para reputação de IP e outro para reputação de domínio.

Criar um feed de reputação de IP

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo.
  3. No campo Nome do feed, insira ET Pro IOC - IP Reputation.
  4. Na lista Tipo de origem, selecione Amazon S3.
  5. Selecione Emerging Threats Pro como o Tipo de registro.
  6. Clique em Próximo.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Opções de exclusão da fonte: selecione de acordo com sua preferência
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do leitor do SecOps
    • Chave de acesso secreta: chave secreta do leitor de SecOps.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próximo.
  9. Revise e clique em Enviar.

Criar feed de reputação de domínio

  1. Repita o processo de criação do feed.
  2. No campo Nome do feed, insira ET Pro IOC - Domain Reputation.
  3. Na lista Tipo de origem, selecione Amazon S3.
  4. Selecione Emerging Threats Pro como o Tipo de registro.
  5. Clique em Próxima.
  6. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Opções de exclusão da fonte: selecione de acordo com sua preferência
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do leitor do SecOps
    • Chave de acesso secreta: chave secreta do leitor de SecOps.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  7. Clique em Próximo.
  8. Revise e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
categoria Esse campo é usado na lógica do analisador, mas não é mapeado diretamente para a UDM. Ele determina o valor de event.ioc.categorization usando uma tabela de pesquisa.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos Mapeado diretamente do registro bruto.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds Mapeado diretamente do registro bruto.
dados Esse campo é analisado em vários campos da UDM com base no conteúdo dele.
first_seen event.idm.entity.metadata.interval.start_time Analisado como uma data e mapeado para o UDM.
first_seen event.ioc.active_timerange.start Analisado como uma data e mapeado para o UDM.
ip_or_domain event.idm.entity.entity.hostname Mapeado para o UDM se o padrão grok extrair um host do campo.
ip_or_domain event.idm.entity.entity.ip Mapeado para o UDM se o padrão grok não extrair um host do campo.
ip_or_domain event.ioc.domain_and_ports.domain Mapeado para o UDM se o padrão grok extrair um host do campo.
ip_or_domain event.ioc.ip_and_ports.ip_address Mapeado para o UDM se o padrão grok não extrair um host do campo.
last_seen event.idm.entity.metadata.interval.end_time Analisado como uma data e mapeado para o UDM.
last_seen event.ioc.active_timerange.end Analisado como uma data e mapeado para o UDM.
portas event.idm.entity.entity.labels.value Analisado, unido com delimitador de vírgula e mapeado para a UDM se houver várias portas.
portas event.idm.entity.entity.port Analisado e mapeado para a UDM se houver apenas uma porta.
portas event.ioc.domain_and_ports.ports Analisado e mapeado para o UDM se o padrão grok extrair um host do campo.
portas event.ioc.ip_and_ports.ports Analisado e mapeado para o UDM se o padrão grok não extrair um host do campo.
score event.ioc.confidence_score Mapeado diretamente do registro bruto.
event.idm.entity.entity.labels.key Defina como "ports" se houver várias portas.
event.idm.entity.metadata.entity_type Defina como "DOMAIN_NAME" se o padrão grok extrair um host do campo ip_or_domain. Caso contrário, defina como "IP_ADDRESS".
event.idm.entity.metadata.threat.category Definido como "SOFTWARE_MALICIOUS".
event.idm.entity.metadata.threat.category_details Derivado do campo category usando uma tabela de pesquisa.
event.idm.entity.metadata.threat.threat_name Defina como "ET Intelligence Rep List".
event.idm.entity.metadata.vendor_name Defina como "ET_PRO_IOC".
event.ioc.feed_name Defina como "ET Intelligence Rep List".
event.ioc.raw_severity Defina como "Malicious".
timestamp.nanos Copiado de collection_time.nanos.
timestamp.seconds Copiado de collection_time.seconds.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.