Coletar arquivos CSV de indicadores personalizados de comprometimento
Este documento explica como ingerir arquivos CSV de IOC personalizados no Google Security Operations usando o Google Cloud Storage. Em seguida, ele mapeia esses campos para a UDM, processando vários tipos de dados, como IPs, domínios e hashes, e enriquecendo a saída com detalhes de ameaças, informações de entidades e níveis de gravidade.
Antes de começar
Verifique se você atende os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
- Acesso a um ou mais URLs de feed de IOC CSV (HTTPS) ou a um endpoint interno que veicula CSV
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, csv-ioc-logs.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
csv-ioc-collector-sa. - Descrição da conta de serviço: insira
Service account for Cloud Run function to collect CSV IOC files.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: gravar registros em um bucket do GCS
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
csv-ioc-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
csv-ioc-trigger. - Não altere as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar arquivos CSV de IOC
A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar arquivos CSV de indicadores de comprometimento (IOCs, na sigla em inglês) de endpoints HTTPS e gravá-los no GCS.
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço csv-ioc-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (
csv-ioc-trigger). - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e abra Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione a conta de serviço (
csv-ioc-collector-sa).
- Conta de serviço: selecione a conta de serviço (
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
- Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo Descrição GCS_BUCKETcsv-ioc-logsNome do bucket do GCS GCS_PREFIXcsv-iocPrefixo para arquivos de registro IOC_URLShttps://ioc.example.com/feed.csv,https://another.example.org/iocs.csvURLs HTTPS separados por vírgula AUTH_HEADERAuthorization: Bearer <token>Cabeçalho de autenticação opcional TIMEOUT60Tempo limite da solicitação em segundos Na seção Variáveis e secrets, role a tela para baixo até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos).
- Tempo limite da solicitação: insira
Acesse a guia Configurações:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1.
- Clique em Concluído.
- Na seção Recursos:
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0. - Número máximo de instâncias: insira
100ou ajuste com base na carga esperada.
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço é criado, o editor de código inline é aberto automaticamente.
Adicionar código da função
- Insira main em Ponto de entrada da função.
No editor de código em linha, crie dois arquivos:
- Primeiro arquivo: main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch CSV IOC feeds over HTTPS and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'csv-ioc').strip('/') ioc_urls_str = os.environ.get('IOC_URLS', '') auth_header = os.environ.get('AUTH_HEADER', '') timeout = int(os.environ.get('TIMEOUT', '60')) ioc_urls = [u.strip() for u in ioc_urls_str.split(',') if u.strip()] if not bucket_name: print('Error: GCS_BUCKET environment variable is required') return if not ioc_urls: print('Error: IOC_URLS must contain at least one HTTPS URL') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) run_ts = int(time.time()) written = [] for i, url in enumerate(ioc_urls): print(f'Processing URL {i+1}/{len(ioc_urls)}: {url}') # Build request req_headers = {'Accept': 'text/csv, */*'} # Add authentication header if provided if auth_header: if ':' in auth_header: k, v = auth_header.split(':', 1) req_headers[k.strip()] = v.strip() else: req_headers['Authorization'] = auth_header.strip() # Fetch data with retries data = fetch_with_retries(url, req_headers, timeout) if data: # Write to GCS key = generate_blob_name(prefix, url, run_ts, i) blob = bucket.blob(key) blob.upload_from_string(data, content_type='text/csv') written.append({ 'url': url, 'gcs_key': key, 'bytes': len(data) }) print(f'Wrote {len(data)} bytes to gs://{bucket_name}/{key}') else: print(f'Warning: No data retrieved from {url}') print(f'Successfully processed {len(written)} URLs') print(json.dumps({'ok': True, 'written': written}, indent=2)) except Exception as e: print(f'Error processing CSV IOC feeds: {str(e)}') raise def fetch_with_retries(url, headers, timeout, max_retries=5): """Fetch data from URL with retry logic for 429/5xx errors.""" if not url.lower().startswith('https://'): raise ValueError('Only HTTPS URLs are allowed in IOC_URLS') attempt = 0 backoff = 1.0 while attempt < max_retries: try: response = http.request('GET', url, headers=headers, timeout=timeout) if response.status == 200: return response.data.decode('utf-8') elif response.status == 429 or (500 <= response.status < 600): print(f'Received status {response.status}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})') time.sleep(backoff) attempt += 1 backoff *= 2 else: print(f'Error: Received unexpected status {response.status} from {url}') return None except Exception as e: if attempt < max_retries - 1: print(f'Request failed: {str(e)}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})') time.sleep(backoff) attempt += 1 backoff *= 2 else: raise print(f'Max retries exceeded for {url}') return None def generate_blob_name(prefix, url, run_ts, idx): """Generate a unique blob name for the CSV file.""" # Create a short, filesystem-safe token for the URL safe_url = url.replace('://', '_').replace('/', '_').replace('?', '_').replace('&', '_')[:100] # Generate timestamp-based path timestamp_path = time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts)) return f"{prefix}/{timestamp_path}-url{idx:03d}-{safe_url}.csv"- Segundo arquivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler vai publicar mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome csv-ioc-collector-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecione o tópico do Pub/Sub ( csv-ioc-trigger).Corpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Opções de frequência de programação
Escolha a frequência com base no volume de registros e nos requisitos de latência:
Frequência Expressão Cron Caso de uso A cada 5 minutos */5 * * * *Alto volume e baixa latência A cada 15 minutos */15 * * * *Volume médio A cada hora 0 * * * *Padrão (recomendado) A cada 6 horas 0 */6 * * *Baixo volume, processamento em lote Diário 0 0 * * *Coleta de dados históricos
Testar a integração
- No console do Cloud Scheduler, encontre seu job (
csv-ioc-collector-hourly). - Clique em Executar à força para acionar o job manualmente.
- Aguarde alguns segundos.
- Acesse Cloud Run > Serviços.
- Clique no nome da função (
csv-ioc-collector). - Clique na guia Registros.
Verifique se a função foi executada com sucesso. Procure o seguinte:
Processing URL 1/X: https://... Wrote X bytes to gs://csv-ioc-logs/csv-ioc/YYYY/MM/DD/HHMMSS-url000-...csv Successfully processed X URLsAcesse Cloud Storage > Buckets.
Clique no nome do bucket (
csv-ioc-logs).Navegue até a pasta de prefixo (
csv-ioc/).Verifique se os novos arquivos
.csvforam criados com o carimbo de data/hora atual.
Se você encontrar erros nos registros:
- HTTP 401/403: verifique a variável de ambiente AUTH_HEADER
- HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
- Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.
- Somente URLs HTTPS são permitidos: verifique se IOC_URLS contém apenas URLs HTTPS.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
CSV Custom IOC). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione IOC personalizado em CSV como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket (
csv-ioc-logs). - Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir arquivos CSV de IOC personalizados
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
CSV Custom IOC). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione IOC personalizado em CSV como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://csv-ioc-logs/csv-ioc/Substitua:
csv-ioc-logs: o nome do bucket do GCS.csv-ioc: prefixo/caminho da pasta opcional em que os registros são armazenados.
Exemplos:
- Bucket raiz:
gs://csv-ioc-logs/ - Com prefixo:
gs://csv-ioc-logs/csv-ioc/ - Com subpasta:
gs://csv-ioc-logs/ioc-feeds/
- Bucket raiz:
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso: o namespace do recurso.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| asn | entity.metadata.threat.detection_fields.asn_label.value | Mapeado diretamente do campo "asn". |
| categoria | entity.metadata.threat.category_details | Mapeado diretamente do campo "category". |
| classificação | entity.metadata.threat.category_details | Adicionada a "classification - " e mapeada para o campo "entity.metadata.threat.category_details". |
| column2 | entity.entity.hostname | Mapeado para "entity.entity.hostname" se [category] corresponder a ". ?ip" ou ". ?proxy" e [not_ip] for verdadeiro. |
| column2 | entity.entity.ip | Unido a "entity.entity.ip" se [category] corresponder a ". ?ip" ou ". ?proxy" e [not_ip] for falso. |
| confiança | entity.metadata.threat.confidence_score | Convertido para ponto flutuante e mapeado para o campo "entity.metadata.threat.confidence_score". |
| país | entity.entity.location.country_or_region | Mapeado diretamente do campo "country". |
| date_first | entity.metadata.threat.first_discovered_time | Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.first_discovered_time". |
| date_last | entity.metadata.threat.last_updated_time | Analisado como ISO8601 e mapeado para o campo "entity.metadata.threat.last_updated_time". |
| detalhe | entity.metadata.threat.summary | Mapeado diretamente do campo "detail". |
| detail2 | entity.metadata.threat.description | Mapeado diretamente do campo "detail2". |
| domínio | entity.entity.hostname | Mapeado diretamente do campo "domain". |
| entity.entity.user.email_addresses | Unido ao campo "entity.entity.user.email_addresses". | |
| ID | entity.metadata.product_entity_id | Adicionado a "id - " e mapeado para o campo "entity.metadata.product_entity_id". |
| import_session_id | entity.metadata.threat.detection_fields.import_session_id_label.value | Mapeado diretamente do campo "import_session_id". |
| itype | entity.metadata.threat.detection_fields.itype_label.value | Mapeado diretamente do campo "itype". |
| lat | entity.entity.location.region_latitude | Convertido para ponto flutuante e mapeado para o campo "entity.entity.location.region_latitude". |
| lon | entity.entity.location.region_longitude | Convertido para ponto flutuante e mapeado para o campo "entity.entity.location.region_longitude". |
| maltype | entity.metadata.threat.detection_fields.maltype_label.value | Mapeado diretamente do campo "maltype". |
| md5 | entity.entity.file.md5 | Mapeado diretamente do campo "md5". |
| mídia | entity.metadata.threat.detection_fields.media_label.value | Mapeado diretamente do campo "media". |
| media_type | entity.metadata.threat.detection_fields.media_type_label.value | Mapeado diretamente do campo "media_type". |
| org | entity.metadata.threat.detection_fields.org_label.value | Mapeado diretamente do campo "org". |
| resource_uri | entity.entity.url | Mapeado para "entity.entity.url" se [itype] não corresponder a "(ip |
| resource_uri | entity.metadata.threat.url_back_to_product | Mapeado para "entity.metadata.threat.url_back_to_product" se [itype] corresponder a "(ip |
| score | entity.metadata.threat.confidence_details | Mapeado diretamente do campo "score". |
| gravidade, | entity.metadata.threat.severity | Convertido para maiúsculas e mapeado para o campo "entity.metadata.threat.severity" se corresponder a "LOW", "MEDIUM", "HIGH" ou "CRITICAL". |
| source | entity.metadata.threat.detection_fields.source_label.value | Mapeado diretamente do campo "source". |
| source_feed_id | entity.metadata.threat.detection_fields.source_feed_id_label.value | Mapeado diretamente do campo "source_feed_id". |
| srcip | entity.entity.ip | Unido a "entity.entity.ip" se [srcip] não estiver vazio e não for igual a [value]. |
| estado | entity.metadata.threat.detection_fields.state_label.value | Mapeado diretamente do campo "state". |
| trusted_circle_ids | entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Mapeado diretamente do campo "trusted_circle_ids". |
| update_id | entity.metadata.threat.detection_fields.update_id_label.value | Mapeado diretamente do campo "update_id". |
| valor | entity.entity.file.full_path | Mapeado para "entity.entity.file.full_path" se [category] corresponder a ".*?file". |
| valor | entity.entity.file.md5 | Mapeado para "entity.entity.file.md5" se [category] corresponder a ".*?md5" e [value] for uma string hexadecimal de 32 caracteres. |
| valor | entity.entity.file.sha1 | Mapeado para "entity.entity.file.sha1" se ([category] corresponder a ". ?md5" e [value] for uma string hexadecimal de 40 caracteres) ou ([category] corresponder a ". ?sha1" e [value] for uma string hexadecimal de 40 caracteres). |
| valor | entity.entity.file.sha256 | Mapeado para "entity.entity.file.sha256" se ([category] corresponder a ". ?md5" e [value] for uma string hexadecimal e [file_type] não for "md5") ou ([category] corresponder a ". ?sha256" e [value] for uma string hexadecimal). |
| valor | entity.entity.hostname | Mapeado para "entity.entity.hostname" se ([category] corresponder a ". ?domain") ou ([category] corresponder a ". ?ip" ou ".*?proxy" e [not_ip] for verdadeiro). |
| valor | entity.entity.url | Mapeado para "entity.entity.url" se ([category] corresponder a ".*?url") ou ([category] corresponder a "url" e [resource_uri] não estiver vazio). |
| N/A | entity.metadata.collected_timestamp | Preenchido com o carimbo de data/hora do evento. |
| N/A | entity.metadata.interval.end_time | Definido como um valor constante de 253402300799 segundos. |
| N/A | entity.metadata.interval.start_time | Preenchido com o carimbo de data/hora do evento. |
| N/A | entity.metadata.vendor_name | Definido como um valor constante de "IOC personalizado". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.