Coletar registros de IOCs do Proofpoint Emerging Threats Pro
Este documento explica como ingerir registros de IOC do Proofpoint Emerging Threats Pro no Google Security Operations usando o Amazon S3. A Emerging Threats Intelligence publica listas de reputação por hora para IPs e domínios em formato CSV com dados de inteligência contra ameaças, incluindo categorias, pontuações e informações temporais. O código do analisador processa dados de inteligência de ameaças ET_PRO formatados em CSV. Ele extrai endereços IP, domínios, categorias, pontuações e outras informações relevantes, mapeando-os para um formato de IOC padronizado e para o esquema UDM do Chronicle para análise e uso posteriores no Google SecOps.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps com permissões para criar feeds
- Assinatura da Proofpoint ET Intelligence com acesso a listas de reputação.
- Chave da API ET Intelligence em https://etadmin.proofpoint.com/api-access
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Reúna os pré-requisitos do Emerging Threats Pro
- Faça login no portal de administração da ET Intelligence em https://etadmin.proofpoint.com.
- Acesse Acesso à API.
- Copie e salve sua chave de API.
- Entre em contato com seu representante da Proofpoint para receber:
- URL da lista detalhada de reputação de IP
- URL da lista detalhada de reputação do domínio
A ET Intelligence fornece arquivos CSV separados para listas de reputação de IP e domínio, atualizadas a cada hora. Use o formato "detalhado", que inclui estas colunas:
* Lista de domínios: Domain Name, Category, Score, First Seen, Last Seen, Ports
* Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports
Configurar o bucket do AWS S3 e o IAM
Criar bucket do S3
- Abra o console do Amazon S3.
- Clique em Criar bucket.
- Nome do bucket: insira
et-pro-ioc-bucket(ou o nome de sua preferência). - Região: selecione a região de sua preferência.
- Clique em Criar bucket.
Criar um usuário do IAM para o Google SecOps
- Abra o console do IAM.
- Clique em Usuários > Criar usuário.
- Nome de usuário: insira
secops-reader. - Clique em Próximo.
- Selecione Anexar políticas diretamente.
- Clique em Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Nomeie a política como
SecOpsReaderPolicy.Clique em Criar política.
Volte à criação de usuários e selecione a política recém-criada.
Clique em Próxima > Criar usuário.
Acesse a guia Credenciais de segurança.
Clique em Criar chave de acesso.
Selecione Serviço de terceiros.
Clique em Criar chave de acesso.
Faça o download e salve as credenciais.
Configurar o papel do IAM para o Lambda
- No console da AWS, acesse IAM > Papéis > Criar papel.
- Selecione Serviço da AWS > Lambda.
- Clique em Próxima.
- Clique em Criar política.
Selecione a guia JSON e insira o seguinte:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Nomeie a política como
EtProIocLambdaPolicy.Clique em Criar política.
Volte à criação de papéis e anexe a política.
Nomeie a função como
EtProIocLambdaRole.Clique em Criar papel.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
- Nome da função:
et-pro-ioc-fetcher - Tempo de execução: Python 3.13
- Arquitetura: x86_64
- Função de execução: use a função
EtProIocLambdaRole
- Nome da função:
Depois da criação, acesse a guia Código e substitua por:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Acesse Configuração > Configuração geral.
Clique em Editar.
Defina o Tempo limite como 5 minutos.
Clique em Salvar.
Configure as variáveis de ambiente
- Acesse Configuração > Variáveis de ambiente.
- Clique em Editar > Adicionar variável de ambiente.
Adicione as seguintes variáveis:
Chave Valor S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Clique em Salvar.
Entre em contato com seu representante da Proofpoint para saber os URLs exatos da sua assinatura. Os URLs de formato detalhado geralmente seguem este padrão:
* Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Lista de domínios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Agendamentos > Criar agendamento.
- Nome da programação:
et-pro-ioc-hourly - Padrão de programação: programação com base em taxa
- Expressão de taxa: 1 hora
- Clique em Próximo.
- Destino: função Lambda
- Função:
et-pro-ioc-fetcher - Clique em Próxima nas etapas restantes.
- Clique em Criar programação.
Configurar feeds no Google SecOps
É necessário criar dois feeds separados: um para reputação de IP e outro para reputação de domínio.
Criar um feed de reputação de IP
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo.
- No campo Nome do feed, insira
ET Pro IOC - IP Reputation. - Na lista Tipo de origem, selecione Amazon S3.
- Selecione Emerging Threats Pro como o Tipo de registro.
- Clique em Próximo.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Opções de exclusão da fonte: selecione de acordo com sua preferência
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do leitor do SecOps
- Chave de acesso secreta: chave secreta do leitor de SecOps.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próximo.
- Revise e clique em Enviar.
Criar feed de reputação de domínio
- Repita o processo de criação do feed.
- No campo Nome do feed, insira
ET Pro IOC - Domain Reputation. - Na lista Tipo de origem, selecione Amazon S3.
- Selecione Emerging Threats Pro como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Opções de exclusão da fonte: selecione de acordo com sua preferência
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do leitor do SecOps
- Chave de acesso secreta: chave secreta do leitor de SecOps.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próximo.
- Revise e clique em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| categoria | Esse campo é usado na lógica do analisador, mas não é mapeado diretamente para a UDM. Ele determina o valor de event.ioc.categorization usando uma tabela de pesquisa. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Mapeado diretamente do registro bruto. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Mapeado diretamente do registro bruto. |
| dados | Esse campo é analisado em vários campos da UDM com base no conteúdo dele. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Analisado como uma data e mapeado para o UDM. |
| first_seen | event.ioc.active_timerange.start | Analisado como uma data e mapeado para o UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Mapeado para o UDM se o padrão grok extrair um host do campo. |
| ip_or_domain | event.idm.entity.entity.ip | Mapeado para o UDM se o padrão grok não extrair um host do campo. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mapeado para o UDM se o padrão grok extrair um host do campo. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mapeado para o UDM se o padrão grok não extrair um host do campo. |
| last_seen | event.idm.entity.metadata.interval.end_time | Analisado como uma data e mapeado para o UDM. |
| last_seen | event.ioc.active_timerange.end | Analisado como uma data e mapeado para o UDM. |
| portas | event.idm.entity.entity.labels.value | Analisado, unido com delimitador de vírgula e mapeado para a UDM se houver várias portas. |
| portas | event.idm.entity.entity.port | Analisado e mapeado para a UDM se houver apenas uma porta. |
| portas | event.ioc.domain_and_ports.ports | Analisado e mapeado para o UDM se o padrão grok extrair um host do campo. |
| portas | event.ioc.ip_and_ports.ports | Analisado e mapeado para o UDM se o padrão grok não extrair um host do campo. |
| score | event.ioc.confidence_score | Mapeado diretamente do registro bruto. |
| event.idm.entity.entity.labels.key | Defina como "ports" se houver várias portas. | |
| event.idm.entity.metadata.entity_type | Defina como "DOMAIN_NAME" se o padrão grok extrair um host do campo ip_or_domain. Caso contrário, defina como "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Definido como "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Derivado do campo category usando uma tabela de pesquisa. |
|
| event.idm.entity.metadata.threat.threat_name | Defina como "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Defina como "ET_PRO_IOC". | |
| event.ioc.feed_name | Defina como "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Defina como "Malicious". | |
| timestamp.nanos | Copiado de collection_time.nanos. |
|
| timestamp.seconds | Copiado de collection_time.seconds. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.