Recolha registos do Elasticsearch

Compatível com:

Este documento explica como carregar registos do Elasticsearch para o Google Security Operations através do Amazon S3. O analisador transforma os registos formatados JSON não processados num modelo de dados unificado (UDM). Extrai campos de estruturas JSON aninhadas, mapeia-os para campos UDM e enriquece os dados com contexto relevante para a segurança, como níveis de gravidade e funções de utilizador.

Antes de começar

  • Uma instância do Google SecOps
  • Acesso privilegiado à administração do cluster do Elasticsearch
  • Acesso privilegiado à AWS (S3, IAM, EC2)
  • Instância do EC2 ou anfitrião persistente para executar o Logstash

Obtenha os pré-requisitos do Elasticsearch

  1. Inicie sessão no seu cluster do Elasticsearch como administrador.
  2. Verifique se a sua subscrição do Elasticsearch inclui Funcionalidades de segurança (obrigatórias para o registo de auditoria).
  3. Tome nota do nome e da versão do cluster do Elasticsearch para referência.
  4. Identifique o caminho onde os registos de auditoria vão ser escritos (predefinição: $ES_HOME/logs/<clustername>_audit.json).

Ative o registo de auditoria do Elasticsearch

  1. Em cada nó do Elasticsearch, edite o ficheiro de configuração elasticsearch.yml.
  2. Adicione a seguinte definição:

    xpack.security.audit.enabled: true
    
  3. Faça um reinício contínuo do cluster para aplicar as alterações:

    • Desative a atribuição de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Pare e reinicie cada nó um de cada vez.
    • Reative a atribuição de fragmentos: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Verifique se os registos de auditoria estão a ser gerados em <clustername>_audit.json no diretório de registos.

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este guia do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, elastic-search-logs).
  3. Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise a política AmazonS3FullAccess.
  18. Selecione a política.
  19. Clicar em Seguinte.
  20. Clique em Adicionar autorizações.

Configure o Logstash para enviar registos de auditoria para o S3

  1. Instale o Logstash numa instância do EC2 ou num anfitrião persistente que possa aceder aos ficheiros de registo de auditoria do Elasticsearch.
  2. Instale o plug-in de saída do S3, se ainda não estiver presente:

    bin/logstash-plugin install logstash-output-s3
    
  3. Crie um ficheiro de configuração do Logstash (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Inicie o Logstash com a configuração:

    bin/logstash -f elastic-to-s3.conf
    

Opcional: crie um utilizador do IAM só de leitura para o Google SecOps

  1. Aceda a AWS Console > IAM > Users > Add users.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: introduza secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos do Elasticsearch

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Elasticsearch Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Elastic Search como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://elastic-search-logs/logs/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento do UDM Lógica
Nível security_result.severity A lógica verifica o valor do campo "Nível" e mapeia-o para o nível de gravidade correspondente do UDM:
: "INFO", "ALL", "OFF", "TRACE", "DEBUG" são mapeados para "INFORMATIONAL".
: "WARN" está mapeado para "LOW".
: "ERROR" está mapeado para "ERROR".
: "FATAL" está mapeado para "CRITICAL".
message.@timestamp timestamp A data/hora é analisada a partir do campo "@timestamp" no campo "message" do registo não processado, usando o formato "aaaa-MM-ddTHH:mm:ss.SSS".
message.action security_result.action_details O valor é retirado do campo "action" no campo "message" do registo não processado.
message.event.action security_result.summary O valor é retirado do campo "event.action" no campo "message" do registo não processado.
message.event.type metadata.product_event_type O valor é retirado do campo "event.type" no campo "message" do registo não processado.
message.host.ip target.ip O valor é retirado do campo "host.ip" no campo "message" do registo não processado.
message.host.name target.hostname O valor é retirado do campo "host.name" no campo "message" do registo não processado.
message.indices target.labels.value O valor é retirado do campo "indices" no campo "message" do registo não processado.
message.mrId target.hostname O valor é retirado do campo "mrId" no campo "message" do registo não processado.
message.node.id principal.asset.product_object_id O valor é retirado do campo "node.id" no campo "message" do registo não processado.
message.node.name target.asset.hostname O valor é retirado do campo "node.name" no campo "message" do registo não processado.
message.origin.address principal.ip O endereço IP é extraído do campo "origin.address" no campo "message" do registo não processado, removendo o número da porta.
message.origin.type principal.resource.resource_subtype O valor é retirado do campo "origin.type" no campo "message" do registo não processado.
message.properties.host_group principal.hostname O valor é retirado do campo "properties.host_group" no campo "message" do registo não processado.
message.properties.host_group target.group.group_display_name O valor é retirado do campo "properties.host_group" no campo "message" do registo não processado.
message.request.id target.resource.product_object_id O valor é retirado do campo "request.id" no campo "message" do registo não processado.
message.request.name target.resource.name O valor é retirado do campo "request.name" no campo "message" do registo não processado.
message.user.name principal.user.userid O valor é retirado do campo "user.name" no campo "message" do registo não processado.
message.user.realm principal.user.attribute.permissions.name O valor é retirado do campo "user.realm" no campo "message" do registo não processado.
message.user.roles about.user.attribute.roles.name O valor é retirado do campo "user.roles" no campo "message" do registo não processado.
metadata.event_type Valor codificado: "USER_RESOURCE_ACCESS"
metadata.log_type Valor codificado: "ELASTIC_SEARCH"
metadata.product_name Valor codificado: "ELASTICSEARCH"
metadata.vendor_name Valor codificado: "ELASTIC"
principal.port O número da porta é extraído do campo "origin.address" no campo "message" do registo não processado.
target.labels.key Valor codificado: "Indice"

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.