Recolha registos do Elasticsearch
Este documento explica como carregar registos do Elasticsearch para o Google Security Operations através do Amazon S3. O analisador transforma os registos formatados JSON não processados num modelo de dados unificado (UDM). Extrai campos de estruturas JSON aninhadas, mapeia-os para campos UDM e enriquece os dados com contexto relevante para a segurança, como níveis de gravidade e funções de utilizador.
Antes de começar
- Uma instância do Google SecOps
- Acesso privilegiado à administração do cluster do Elasticsearch
- Acesso privilegiado à AWS (S3, IAM, EC2)
- Instância do EC2 ou anfitrião persistente para executar o Logstash
Obtenha os pré-requisitos do Elasticsearch
- Inicie sessão no seu cluster do Elasticsearch como administrador.
- Verifique se a sua subscrição do Elasticsearch inclui Funcionalidades de segurança (obrigatórias para o registo de auditoria).
- Tome nota do nome e da versão do cluster do Elasticsearch para referência.
- Identifique o caminho onde os registos de auditoria vão ser escritos (predefinição:
$ES_HOME/logs/<clustername>_audit.json
).
Ative o registo de auditoria do Elasticsearch
- Em cada nó do Elasticsearch, edite o ficheiro de configuração elasticsearch.yml.
Adicione a seguinte definição:
xpack.security.audit.enabled: true
Faça um reinício contínuo do cluster para aplicar as alterações:
- Desative a atribuição de fragmentos:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Pare e reinicie cada nó um de cada vez.
- Reative a atribuição de fragmentos:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Desative a atribuição de fragmentos:
Verifique se os registos de auditoria estão a ser gerados em
<clustername>_audit.json
no diretório de registos.
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este guia do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
elastic-search-logs
). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure o Logstash para enviar registos de auditoria para o S3
- Instale o Logstash numa instância do EC2 ou num anfitrião persistente que possa aceder aos ficheiros de registo de auditoria do Elasticsearch.
Instale o plug-in de saída do S3, se ainda não estiver presente:
bin/logstash-plugin install logstash-output-s3
Crie um ficheiro de configuração do Logstash (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Inicie o Logstash com a configuração:
bin/logstash -f elastic-to-s3.conf
Opcional: crie um utilizador do IAM só de leitura para o Google SecOps
- Aceda a AWS Console > IAM > Users > Add users.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza
secops-reader
. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Utilizador: introduza
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do Elasticsearch
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Elasticsearch Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Elastic Search como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://elastic-search-logs/logs/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento do UDM | Lógica |
---|---|---|
Nível | security_result.severity | A lógica verifica o valor do campo "Nível" e mapeia-o para o nível de gravidade correspondente do UDM: : "INFO", "ALL", "OFF", "TRACE", "DEBUG" são mapeados para "INFORMATIONAL". : "WARN" está mapeado para "LOW". : "ERROR" está mapeado para "ERROR". : "FATAL" está mapeado para "CRITICAL". |
message.@timestamp | timestamp | A data/hora é analisada a partir do campo "@timestamp" no campo "message" do registo não processado, usando o formato "aaaa-MM-ddTHH:mm:ss.SSS". |
message.action | security_result.action_details | O valor é retirado do campo "action" no campo "message" do registo não processado. |
message.event.action | security_result.summary | O valor é retirado do campo "event.action" no campo "message" do registo não processado. |
message.event.type | metadata.product_event_type | O valor é retirado do campo "event.type" no campo "message" do registo não processado. |
message.host.ip | target.ip | O valor é retirado do campo "host.ip" no campo "message" do registo não processado. |
message.host.name | target.hostname | O valor é retirado do campo "host.name" no campo "message" do registo não processado. |
message.indices | target.labels.value | O valor é retirado do campo "indices" no campo "message" do registo não processado. |
message.mrId | target.hostname | O valor é retirado do campo "mrId" no campo "message" do registo não processado. |
message.node.id | principal.asset.product_object_id | O valor é retirado do campo "node.id" no campo "message" do registo não processado. |
message.node.name | target.asset.hostname | O valor é retirado do campo "node.name" no campo "message" do registo não processado. |
message.origin.address | principal.ip | O endereço IP é extraído do campo "origin.address" no campo "message" do registo não processado, removendo o número da porta. |
message.origin.type | principal.resource.resource_subtype | O valor é retirado do campo "origin.type" no campo "message" do registo não processado. |
message.properties.host_group | principal.hostname | O valor é retirado do campo "properties.host_group" no campo "message" do registo não processado. |
message.properties.host_group | target.group.group_display_name | O valor é retirado do campo "properties.host_group" no campo "message" do registo não processado. |
message.request.id | target.resource.product_object_id | O valor é retirado do campo "request.id" no campo "message" do registo não processado. |
message.request.name | target.resource.name | O valor é retirado do campo "request.name" no campo "message" do registo não processado. |
message.user.name | principal.user.userid | O valor é retirado do campo "user.name" no campo "message" do registo não processado. |
message.user.realm | principal.user.attribute.permissions.name | O valor é retirado do campo "user.realm" no campo "message" do registo não processado. |
message.user.roles | about.user.attribute.roles.name | O valor é retirado do campo "user.roles" no campo "message" do registo não processado. |
metadata.event_type | Valor codificado: "USER_RESOURCE_ACCESS" | |
metadata.log_type | Valor codificado: "ELASTIC_SEARCH" | |
metadata.product_name | Valor codificado: "ELASTICSEARCH" | |
metadata.vendor_name | Valor codificado: "ELASTIC" | |
principal.port | O número da porta é extraído do campo "origin.address" no campo "message" do registo não processado. | |
target.labels.key | Valor codificado: "Indice" |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.