Coletar registros do Elasticsearch

Compatível com:

Este documento explica como ingerir registros do Elasticsearch no Google Security Operations usando o Amazon S3. O analisador transforma registros brutos formatados em JSON em um modelo de dados unificado (UDM). Ele extrai campos de estruturas JSON aninhadas, mapeia para campos do UDM e enriquece os dados com contexto relevante para a segurança, como níveis de gravidade e papéis de usuário.

Antes de começar

  • Uma instância do Google SecOps
  • Acesso privilegiado à administração do cluster do Elasticsearch
  • Acesso privilegiado à AWS (S3, IAM, EC2)
  • Instância do EC2 ou host persistente para executar o Logstash

Conferir os pré-requisitos do Elasticsearch

  1. Faça login no seu cluster do Elasticsearch como administrador.
  2. Verifique se sua assinatura do Elasticsearch inclui os recursos de segurança (necessários para o registro de auditoria).
  3. Anote o nome e a versão do cluster do Elasticsearch para referência.
  4. Identifique o caminho em que os registros de auditoria serão gravados (padrão: $ES_HOME/logs/<clustername>_audit.json).

Ativar o registro de auditoria do Elasticsearch

  1. Em cada nó do Elasticsearch, edite o arquivo de configuração elasticsearch.yml.
  2. Adicione a seguinte configuração:

    xpack.security.audit.enabled: true
    
  3. Faça uma reinicialização gradual do cluster para aplicar as mudanças:

    • Desativar a alocação de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Pare e reinicie cada nó, um de cada vez.
    • Reative a alocação de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Verifique se os registros de auditoria estão sendo gerados em <clustername>_audit.json no diretório de registros.

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, elastic-search-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clique em Próxima.
  20. Clique em Adicionar permissões

Configurar o Logstash para enviar registros de auditoria ao S3

  1. Instale o Logstash em uma instância do EC2 ou em um host permanente que possa acessar os arquivos de registro de auditoria do Elasticsearch.
  2. Instale o plug-in de saída do S3, se ainda não estiver presente:

    bin/logstash-plugin install logstash-output-s3
    
  3. Crie um arquivo de configuração do Logstash (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Inicie o Logstash com a configuração:

    bin/logstash -f elastic-to-s3.conf
    

Opcional: criar um usuário do IAM com acesso somente leitura para o Google SecOps

  1. Acesse Console da AWS > IAM > Usuários > Adicionar usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: insira secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  9. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Faça o download do CSV (esses valores são inseridos no feed).

Configurar um feed no Google SecOps para ingerir registros do Elasticsearch

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, Elasticsearch Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Pesquisa elástica como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://elastic-search-logs/logs/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
Nível security_result.severity A lógica verifica o valor do campo "Nível" e o mapeia para o nível de gravidade correspondente da UDM:
- "INFO", "ALL", "OFF", "TRACE", "DEBUG" são mapeados para "INFORMATIONAL".
- "WARN" é mapeado como "LOW".
: "ERROR" é mapeado para "ERROR".
: "FATAL" é mapeado como "CRITICAL".
message.@timestamp timestamp O carimbo de data/hora é analisado do campo "@timestamp" no campo "message" do registro bruto, usando o formato "aaaa-MM-ddTHH:mm:ss.SSS".
message.action security_result.action_details O valor é extraído do campo "action" dentro do campo "message" do registro bruto.
message.event.action security_result.summary O valor é extraído do campo "event.action" no campo "message" do registro bruto.
message.event.type metadata.product_event_type O valor é extraído do campo "event.type" no campo "message" do registro bruto.
message.host.ip target.ip O valor é extraído do campo "host.ip" dentro do campo "message" do registro bruto.
message.host.name target.hostname O valor é extraído do campo "host.name" dentro do campo "message" do registro bruto.
message.indices target.labels.value O valor é extraído do campo "indices" dentro do campo "message" do registro bruto.
message.mrId target.hostname O valor é extraído do campo "mrId" dentro do campo "message" do registro bruto.
message.node.id principal.asset.product_object_id O valor é extraído do campo "node.id" dentro do campo "message" do registro bruto.
message.node.name target.asset.hostname O valor é extraído do campo "node.name" dentro do campo "message" do registro bruto.
message.origin.address principal.ip O endereço IP é extraído do campo "origin.address" dentro do campo "message" do registro bruto, removendo o número da porta.
message.origin.type principal.resource.resource_subtype O valor é extraído do campo "origin.type" dentro do campo "message" do registro bruto.
message.properties.host_group principal.hostname O valor é extraído do campo "properties.host_group" dentro do campo "message" do registro bruto.
message.properties.host_group target.group.group_display_name O valor é extraído do campo "properties.host_group" dentro do campo "message" do registro bruto.
message.request.id target.resource.product_object_id O valor é extraído do campo "request.id" dentro do campo "message" do registro bruto.
message.request.name target.resource.name O valor é extraído do campo "request.name" dentro do campo "message" do registro bruto.
message.user.name principal.user.userid O valor é extraído do campo "user.name" dentro do campo "message" do registro bruto.
message.user.realm principal.user.attribute.permissions.name O valor é extraído do campo "user.realm" dentro do campo "message" do registro bruto.
message.user.roles about.user.attribute.roles.name O valor é extraído do campo "user.roles" dentro do campo "message" do registro bruto.
metadata.event_type Valor codificado: "USER_RESOURCE_ACCESS"
metadata.log_type Valor codificado: "ELASTIC_SEARCH"
metadata.product_name Valor fixado no código: "ELASTICSEARCH"
metadata.vendor_name Valor codificado: "ELASTIC"
principal.port O número da porta é extraído do campo "origin.address" no campo "message" do registro bruto.
target.labels.key Valor codificado: "Indice"

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.