Coletar registros do Elasticsearch
Este documento explica como ingerir registros do Elasticsearch no Google Security Operations usando o Amazon S3. O analisador transforma registros brutos formatados em JSON em um modelo de dados unificado (UDM). Ele extrai campos de estruturas JSON aninhadas, mapeia para campos do UDM e enriquece os dados com contexto relevante para a segurança, como níveis de gravidade e papéis de usuário.
Antes de começar
- Uma instância do Google SecOps
- Acesso privilegiado à administração do cluster do Elasticsearch
- Acesso privilegiado à AWS (S3, IAM, EC2)
- Instância do EC2 ou host persistente para executar o Logstash
Conferir os pré-requisitos do Elasticsearch
- Faça login no seu cluster do Elasticsearch como administrador.
- Verifique se sua assinatura do Elasticsearch inclui os recursos de segurança (necessários para o registro de auditoria).
- Anote o nome e a versão do cluster do Elasticsearch para referência.
- Identifique o caminho em que os registros de auditoria serão gravados (padrão:
$ES_HOME/logs/<clustername>_audit.json
).
Ativar o registro de auditoria do Elasticsearch
- Em cada nó do Elasticsearch, edite o arquivo de configuração elasticsearch.yml.
Adicione a seguinte configuração:
xpack.security.audit.enabled: true
Faça uma reinicialização gradual do cluster para aplicar as mudanças:
- Desativar a alocação de fragmentos:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Pare e reinicie cada nó, um de cada vez.
- Reative a alocação de fragmentos:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Desativar a alocação de fragmentos:
Verifique se os registros de auditoria estão sendo gerados em
<clustername>_audit.json
no diretório de registros.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
elastic-search-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar o Logstash para enviar registros de auditoria ao S3
- Instale o Logstash em uma instância do EC2 ou em um host permanente que possa acessar os arquivos de registro de auditoria do Elasticsearch.
Instale o plug-in de saída do S3, se ainda não estiver presente:
bin/logstash-plugin install logstash-output-s3
Crie um arquivo de configuração do Logstash (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Inicie o Logstash com a configuração:
bin/logstash -f elastic-to-s3.conf
Opcional: criar um usuário do IAM com acesso somente leitura para o Google SecOps
- Acesse Console da AWS > IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário: insira
secops-reader
. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Usuário: insira
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros do Elasticsearch
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Elasticsearch Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Pesquisa elástica como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://elastic-search-logs/logs/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
Nível | security_result.severity | A lógica verifica o valor do campo "Nível" e o mapeia para o nível de gravidade correspondente da UDM: - "INFO", "ALL", "OFF", "TRACE", "DEBUG" são mapeados para "INFORMATIONAL". - "WARN" é mapeado como "LOW". : "ERROR" é mapeado para "ERROR". : "FATAL" é mapeado como "CRITICAL". |
message.@timestamp | timestamp | O carimbo de data/hora é analisado do campo "@timestamp" no campo "message" do registro bruto, usando o formato "aaaa-MM-ddTHH:mm:ss.SSS". |
message.action | security_result.action_details | O valor é extraído do campo "action" dentro do campo "message" do registro bruto. |
message.event.action | security_result.summary | O valor é extraído do campo "event.action" no campo "message" do registro bruto. |
message.event.type | metadata.product_event_type | O valor é extraído do campo "event.type" no campo "message" do registro bruto. |
message.host.ip | target.ip | O valor é extraído do campo "host.ip" dentro do campo "message" do registro bruto. |
message.host.name | target.hostname | O valor é extraído do campo "host.name" dentro do campo "message" do registro bruto. |
message.indices | target.labels.value | O valor é extraído do campo "indices" dentro do campo "message" do registro bruto. |
message.mrId | target.hostname | O valor é extraído do campo "mrId" dentro do campo "message" do registro bruto. |
message.node.id | principal.asset.product_object_id | O valor é extraído do campo "node.id" dentro do campo "message" do registro bruto. |
message.node.name | target.asset.hostname | O valor é extraído do campo "node.name" dentro do campo "message" do registro bruto. |
message.origin.address | principal.ip | O endereço IP é extraído do campo "origin.address" dentro do campo "message" do registro bruto, removendo o número da porta. |
message.origin.type | principal.resource.resource_subtype | O valor é extraído do campo "origin.type" dentro do campo "message" do registro bruto. |
message.properties.host_group | principal.hostname | O valor é extraído do campo "properties.host_group" dentro do campo "message" do registro bruto. |
message.properties.host_group | target.group.group_display_name | O valor é extraído do campo "properties.host_group" dentro do campo "message" do registro bruto. |
message.request.id | target.resource.product_object_id | O valor é extraído do campo "request.id" dentro do campo "message" do registro bruto. |
message.request.name | target.resource.name | O valor é extraído do campo "request.name" dentro do campo "message" do registro bruto. |
message.user.name | principal.user.userid | O valor é extraído do campo "user.name" dentro do campo "message" do registro bruto. |
message.user.realm | principal.user.attribute.permissions.name | O valor é extraído do campo "user.realm" dentro do campo "message" do registro bruto. |
message.user.roles | about.user.attribute.roles.name | O valor é extraído do campo "user.roles" dentro do campo "message" do registro bruto. |
metadata.event_type | Valor codificado: "USER_RESOURCE_ACCESS" | |
metadata.log_type | Valor codificado: "ELASTIC_SEARCH" | |
metadata.product_name | Valor fixado no código: "ELASTICSEARCH" | |
metadata.vendor_name | Valor codificado: "ELASTIC" | |
principal.port | O número da porta é extraído do campo "origin.address" no campo "message" do registro bruto. | |
target.labels.key | Valor codificado: "Indice" |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.