Recolha registos de IOCs do Proofpoint Emerging Threats Pro
Este documento explica como carregar registos de IOCs do Proofpoint Emerging Threats Pro para o Google Security Operations através do Amazon S3. A Emerging Threats Intelligence publica listas de reputação por hora para IPs e domínios no formato CSV com dados de informações sobre ameaças, incluindo categorias, classificações e informações temporais. O código do analisador processa dados de informações sobre ameaças ET_PRO formatados em CSV. Extrai endereços IP, domínios, categorias, classificações e outras informações relevantes, mapeando-os para um formato de IOC padronizado e para o esquema UDM do Chronicle para análise e utilização adicionais no Google SecOps.
Antes de começar
Certifique-se de que cumpre os seguintes pré-requisitos:
- Uma instância do Google SecOps com autorizações para criar feeds
- Subscrição da Proofpoint ET Intelligence com acesso a listas de reputação
- Chave da API ET Intelligence a partir de https://etadmin.proofpoint.com/api-access
- Acesso privilegiado à AWS (S3, IAM, Lambda e EventBridge)
Recolha os pré-requisitos do Emerging Threats Pro
- Inicie sessão no ET Intelligence Admin Portal em https://etadmin.proofpoint.com
- Aceda a Acesso à API
- Copie e guarde a sua chave da API
- Contacte o seu representante da Proofpoint para obter:
- URL da lista de reputação de IPs detalhada
- URL da lista de reputação do domínio detalhada
A ET Intelligence fornece ficheiros CSV separados para listas de reputação de IP e de domínio, atualizados de hora a hora. Use o formato "detalhado", que inclui estas colunas:
* Lista de domínios: Domain Name, Category, Score, First Seen, Last Seen, Ports
* Lista de IPs: IP Address, Category, Score, First Seen, Last Seen, Ports
Configure o contentor do AWS S3 e o IAM
Crie um contentor do S3
- Abra a consola do Amazon S3
- Clique em Criar contentor
- Nome do contentor: introduza
et-pro-ioc-bucket(ou o nome preferido) - Região: selecione a sua região preferida
- Clique em Criar contentor
Crie um utilizador do IAM para o Google SecOps
- Abra a consola do IAM
- Clique em Utilizadores > Criar utilizador
- Nome de utilizador: introduza
secops-reader - Clique em Seguinte
- Selecione Anexar políticas diretamente
- Clique em Criar política
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Atribua um nome à política
SecOpsReaderPolicy.Clique em Criar política.
Regresse à criação de utilizadores e selecione a política criada recentemente.
Clique em Seguinte > Criar utilizador.
Aceda ao separador Credenciais de segurança.
Clique em Criar chave de acesso.
Selecione Serviço de terceiros.
Clique em Criar chave de acesso.
Transfira e guarde as credenciais.
Configure a função IAM para o Lambda
- Na consola do AWS, aceda a IAM > Funções > Criar função.
- Selecione Serviço AWS > Lambda.
- Clicar em Seguinte.
- Clique em Criar política.
Selecione o separador JSON e introduza o seguinte:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Atribua um nome à política
EtProIocLambdaPolicy.Clique em Criar política.
Regresse à criação de funções e anexe a política.
Atribua um nome à função
EtProIocLambdaRole.Clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
- Nome da função:
et-pro-ioc-fetcher - Tempo de execução: Python 3.13
- Arquitetura: x86_64
- Função de execução: usar função existente
EtProIocLambdaRole
- Nome da função:
Após a criação, aceda ao separador Código e substitua-o pelo seguinte:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Aceda a Configuração > Configuração geral.
Clique em Edit.
Defina o Limite de tempo para 5 minutos.
Clique em Guardar.
Configure variáveis de ambiente
- Aceda a Configuração > Variáveis de ambiente.
- Clique em Editar > Adicionar variável de ambiente.
Adicione as seguintes variáveis:
Chave Valor S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Clique em Guardar.
Contacte o seu representante da Proofpoint para saber os URLs exatos da sua subscrição. Normalmente, os URLs de formato detalhado seguem este padrão:
* Lista de IPs: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Lista de domínios: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Crie uma programação do EventBridge
- Aceda a Amazon EventBridge > Schedules > Create schedule
- Nome da programação:
et-pro-ioc-hourly - Padrão de horário: horário baseado na taxa
- Taxa de expressão: 1 hora
- Clique em Seguinte
- Destino: função Lambda
- Função:
et-pro-ioc-fetcher - Clique em Seguinte nos restantes passos
- Clique em Criar horário
Configure feeds no Google SecOps
Tem de criar dois feeds separados: um para a reputação de IP e outro para a reputação de domínio.
Crie um feed de reputação de IP
- Aceda a Definições do SIEM > Feeds
- Clique em Adicionar novo
- No campo Nome do feed, introduza
ET Pro IOC - IP Reputation - Na lista Tipo de origem, selecione Amazon S3
- Selecione Emerging Threats Pro como o Tipo de registo
- Clique em Seguinte
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Opções de eliminação de origens: selecione de acordo com a sua preferência
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso de leitura do SecOps
- Chave de acesso secreta: chave secreta do leitor do SecOps
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clique em Seguinte
- Reveja e clique em Enviar
Crie um feed de reputação do domínio
- Repita o processo de criação do feed.
- No campo Nome do feed, introduza
ET Pro IOC - Domain Reputation. - Na lista Tipo de origem, selecione Amazon S3.
- Selecione Emerging Threats Pro como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Opções de eliminação de origens: selecione de acordo com a sua preferência
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso de leitura do SecOps
- Chave de acesso secreta: chave secreta do leitor do SecOps
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clique em Seguinte
- Reveja e clique em Enviar
Tabela de mapeamento da UDM
| Campo de registo | Mapeamento do UDM | Lógica |
|---|---|---|
| categoria | Este campo é usado na lógica do analisador, mas não é mapeado diretamente para o UDM. Determina o valor de event.ioc.categorization através de uma tabela de pesquisa. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Mapeado diretamente a partir do registo não processado. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Mapeado diretamente a partir do registo não processado. |
| dados | Este campo é analisado em vários campos da UDM com base no respetivo conteúdo. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Analisado como uma data e mapeado para o MDU. |
| first_seen | event.ioc.active_timerange.start | Analisado como uma data e mapeado para o MDU. |
| ip_or_domain | event.idm.entity.entity.hostname | Mapeado para o UDM se o padrão grok extrair um anfitrião do campo. |
| ip_or_domain | event.idm.entity.entity.ip | Mapeado para o UDM se o padrão grok não extrair um anfitrião do campo. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mapeado para o UDM se o padrão grok extrair um anfitrião do campo. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mapeado para o UDM se o padrão grok não extrair um anfitrião do campo. |
| last_seen | event.idm.entity.metadata.interval.end_time | Analisado como uma data e mapeado para o MDU. |
| last_seen | event.ioc.active_timerange.end | Analisado como uma data e mapeado para o MDU. |
| portas | event.idm.entity.entity.labels.value | Analisado, unido com um delimitador de vírgula e mapeado para o UDM se existirem várias portas. |
| portas | event.idm.entity.entity.port | Analisa e mapeia para o UDM se existir apenas uma porta. |
| portas | event.ioc.domain_and_ports.ports | Analisa e mapeia para o UDM se o padrão grok extrair um anfitrião do campo. |
| portas | event.ioc.ip_and_ports.ports | Analisa e mapeia para o UDM se o padrão grok não extrair um anfitrião do campo. |
| pontuação | event.ioc.confidence_score | Mapeado diretamente a partir do registo não processado. |
| event.idm.entity.entity.labels.key | Definido como "ports" se existirem várias portas. | |
| event.idm.entity.metadata.entity_type | Defina como "DOMAIN_NAME" se o padrão grok extrair um anfitrião do campo ip_or_domain. Caso contrário, defina como "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Definido como "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Derivado do campo category através de uma tabela de pesquisa. |
|
| event.idm.entity.metadata.threat.threat_name | Definir como "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Definido como "ET_PRO_IOC". | |
| event.ioc.feed_name | Definir como "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Definido como "Malicioso". | |
| timestamp.nanos | Copiado de collection_time.nanos. |
|
| timestamp.seconds | Copiado de collection_time.seconds. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.