Mengumpulkan log Elasticsearch

Didukung di:

Dokumen ini menjelaskan cara menyerap log Elasticsearch ke Google Security Operations menggunakan Amazon S3. Parser mengubah log berformat JSON mentah menjadi model data terpadu (UDM). Fitur ini mengekstrak kolom dari struktur JSON bertingkat, memetakannya ke kolom UDM, dan memperkaya data dengan konteks yang relevan dengan keamanan seperti tingkat keparahan dan peran pengguna.

Sebelum memulai

  • Instance Google SecOps
  • Akses istimewa ke administrasi cluster Elasticsearch
  • Akses istimewa ke AWS (S3, IAM, EC2)
  • Instance EC2 atau host persisten untuk menjalankan Logstash

Mendapatkan prasyarat Elasticsearch

  1. Login ke cluster Elasticsearch Anda sebagai administrator.
  2. Pastikan langganan Elasticsearch Anda menyertakan Fitur keamanan (diperlukan untuk pencatatan audit).
  3. Catat nama dan versi cluster Elasticsearch Anda sebagai referensi.
  4. Identifikasi jalur tempat log audit akan ditulis (default: $ES_HOME/logs/<clustername>_audit.json).

Mengaktifkan logging audit Elasticsearch

  1. Di setiap node Elasticsearch, edit file konfigurasi elasticsearch.yml.
  2. Tambahkan setelan berikut:

    xpack.security.audit.enabled: true
    
  3. Lakukan rolling restart cluster untuk menerapkan perubahan:

    • Nonaktifkan alokasi shard: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • Hentikan dan mulai ulang setiap node satu per satu.
    • Aktifkan kembali alokasi shard: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. Pastikan log audit dibuat di <clustername>_audit.json di direktori log.

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, elastic-search-logs).
  3. Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: Tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
  12. Klik Selesai.
  13. Pilih tab Permissions.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung.
  17. Cari kebijakan AmazonS3FullAccess.
  18. Pilih kebijakan.
  19. Klik Berikutnya.
  20. Klik Add permissions.

Mengonfigurasi Logstash untuk mengirimkan log audit ke S3

  1. Instal Logstash di instance EC2 atau host persisten yang dapat mengakses file log audit Elasticsearch.
  2. Instal plugin output S3 jika belum ada:

    bin/logstash-plugin install logstash-output-s3
    
  3. Buat file konfigurasi Logstash (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. Mulai Logstash dengan konfigurasi:

    bin/logstash -f elastic-to-s3.conf
    

Opsional: Buat pengguna IAM hanya baca untuk Google SecOps

  1. Buka Konsol AWS > IAM > Pengguna > Tambahkan pengguna.
  2. Klik Add users.
  3. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan secops-reader.
    • Jenis akses: Pilih Kunci akses – Akses terprogram.
  4. Klik Buat pengguna.
  5. Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
  6. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. Tetapkan nama ke secops-reader-policy.

  8. Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.

  9. Buka Kredensial keamanan > Kunci akses > Buat kunci akses.

  10. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap log Elasticsearch

  1. Buka Setelan SIEM > Feed.
  2. Klik + Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, Elasticsearch Logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih Elastic Search sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://elastic-search-logs/logs/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Tabel Pemetaan UDM

Kolom log Pemetaan UDM Logika
Tingkat security_result.severity Logika memeriksa nilai kolom "Level" dan memetakannya ke tingkat keparahan UDM yang sesuai:
- "INFO", "ALL", "OFF", "TRACE", "DEBUG" dipetakan ke "INFORMATIONAL".
- "WARN" dipetakan ke "LOW".
- "ERROR" dipetakan ke "ERROR".
- "FATAL" dipetakan ke "CRITICAL".
message.@timestamp timestamp Stempel waktu diuraikan dari kolom "@timestamp" dalam kolom "message" log mentah, menggunakan format "yyyy-MM-ddTHH:mm:ss.SSS".
message.action security_result.action_details Nilai diambil dari kolom "action" dalam kolom "message" pada log mentah.
message.event.action security_result.summary Nilai diambil dari kolom "event.action" dalam kolom "message" log mentah.
message.event.type metadata.product_event_type Nilai diambil dari kolom "event.type" dalam kolom "message" log mentah.
message.host.ip target.ip Nilai diambil dari kolom "host.ip" dalam kolom "message" pada log mentah.
message.host.name target.hostname Nilai diambil dari kolom "host.name" dalam kolom "message" log mentah.
message.indices target.labels.value Nilai diambil dari kolom "indices" dalam kolom "message" pada log mentah.
message.mrId target.hostname Nilai diambil dari kolom "mrId" dalam kolom "message" pada log mentah.
message.node.id principal.asset.product_object_id Nilai diambil dari kolom "node.id" dalam kolom "message" pada log mentah.
message.node.name target.asset.hostname Nilai diambil dari kolom "node.name" dalam kolom "message" dari log mentah.
message.origin.address principal.ip Alamat IP diekstrak dari kolom "origin.address" dalam kolom "message" log mentah, dengan menghapus nomor port.
message.origin.type principal.resource.resource_subtype Nilai diambil dari kolom "origin.type" dalam kolom "message" pada log mentah.
message.properties.host_group principal.hostname Nilai diambil dari kolom "properties.host_group" dalam kolom "message" log mentah.
message.properties.host_group target.group.group_display_name Nilai diambil dari kolom "properties.host_group" dalam kolom "message" log mentah.
message.request.id target.resource.product_object_id Nilai diambil dari kolom "request.id" dalam kolom "message" log mentah.
message.request.name target.resource.name Nilai diambil dari kolom "request.name" dalam kolom "message" pada log mentah.
message.user.name principal.user.userid Nilai diambil dari kolom "user.name" dalam kolom "message" log mentah.
message.user.realm principal.user.attribute.permissions.name Nilai diambil dari kolom "user.realm" dalam kolom "message" pada log mentah.
message.user.roles about.user.attribute.roles.name Nilai diambil dari kolom "user.roles" dalam kolom "message" pada log mentah.
metadata.event_type Nilai yang di-hardcode: "USER_RESOURCE_ACCESS"
metadata.log_type Nilai yang di-hardcode: "ELASTIC_SEARCH"
metadata.product_name Nilai yang di-hardcode: "ELASTICSEARCH"
metadata.vendor_name Nilai yang dikodekan secara permanen: "ELASTIC"
principal.port Nomor port diekstrak dari kolom "origin.address" dalam kolom "message" log mentah.
target.labels.key Nilai yang di-hardcode: "Indice"

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.