Mengumpulkan log Elasticsearch
Dokumen ini menjelaskan cara menyerap log Elasticsearch ke Google Security Operations menggunakan Amazon S3. Parser mengubah log berformat JSON mentah menjadi model data terpadu (UDM). Fitur ini mengekstrak kolom dari struktur JSON bertingkat, memetakannya ke kolom UDM, dan memperkaya data dengan konteks yang relevan dengan keamanan seperti tingkat keparahan dan peran pengguna.
Sebelum memulai
- Instance Google SecOps
- Akses istimewa ke administrasi cluster Elasticsearch
- Akses istimewa ke AWS (S3, IAM, EC2)
- Instance EC2 atau host persisten untuk menjalankan Logstash
Mendapatkan prasyarat Elasticsearch
- Login ke cluster Elasticsearch Anda sebagai administrator.
- Pastikan langganan Elasticsearch Anda menyertakan Fitur keamanan (diperlukan untuk pencatatan audit).
- Catat nama dan versi cluster Elasticsearch Anda sebagai referensi.
- Identifikasi jalur tempat log audit akan ditulis (default:
$ES_HOME/logs/<clustername>_audit.json
).
Mengaktifkan logging audit Elasticsearch
- Di setiap node Elasticsearch, edit file konfigurasi elasticsearch.yml.
Tambahkan setelan berikut:
xpack.security.audit.enabled: true
Lakukan rolling restart cluster untuk menerapkan perubahan:
- Nonaktifkan alokasi shard:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Hentikan dan mulai ulang setiap node satu per satu.
- Aktifkan kembali alokasi shard:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Nonaktifkan alokasi shard:
Pastikan log audit dibuat di
<clustername>_audit.json
di direktori log.
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
elastic-search-logs
). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Permissions.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi Logstash untuk mengirimkan log audit ke S3
- Instal Logstash di instance EC2 atau host persisten yang dapat mengakses file log audit Elasticsearch.
Instal plugin output S3 jika belum ada:
bin/logstash-plugin install logstash-output-s3
Buat file konfigurasi Logstash (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Mulai Logstash dengan konfigurasi:
bin/logstash -f elastic-to-s3.conf
Opsional: Buat pengguna IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna > Tambahkan pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader
. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Tetapkan nama ke
secops-reader-policy
.Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Mengonfigurasi feed di Google SecOps untuk menyerap log Elasticsearch
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Elasticsearch Logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Elastic Search sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://elastic-search-logs/logs/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel Pemetaan UDM
Kolom log | Pemetaan UDM | Logika |
---|---|---|
Tingkat | security_result.severity | Logika memeriksa nilai kolom "Level" dan memetakannya ke tingkat keparahan UDM yang sesuai: - "INFO", "ALL", "OFF", "TRACE", "DEBUG" dipetakan ke "INFORMATIONAL". - "WARN" dipetakan ke "LOW". - "ERROR" dipetakan ke "ERROR". - "FATAL" dipetakan ke "CRITICAL". |
message.@timestamp | timestamp | Stempel waktu diuraikan dari kolom "@timestamp" dalam kolom "message" log mentah, menggunakan format "yyyy-MM-ddTHH:mm:ss.SSS". |
message.action | security_result.action_details | Nilai diambil dari kolom "action" dalam kolom "message" pada log mentah. |
message.event.action | security_result.summary | Nilai diambil dari kolom "event.action" dalam kolom "message" log mentah. |
message.event.type | metadata.product_event_type | Nilai diambil dari kolom "event.type" dalam kolom "message" log mentah. |
message.host.ip | target.ip | Nilai diambil dari kolom "host.ip" dalam kolom "message" pada log mentah. |
message.host.name | target.hostname | Nilai diambil dari kolom "host.name" dalam kolom "message" log mentah. |
message.indices | target.labels.value | Nilai diambil dari kolom "indices" dalam kolom "message" pada log mentah. |
message.mrId | target.hostname | Nilai diambil dari kolom "mrId" dalam kolom "message" pada log mentah. |
message.node.id | principal.asset.product_object_id | Nilai diambil dari kolom "node.id" dalam kolom "message" pada log mentah. |
message.node.name | target.asset.hostname | Nilai diambil dari kolom "node.name" dalam kolom "message" dari log mentah. |
message.origin.address | principal.ip | Alamat IP diekstrak dari kolom "origin.address" dalam kolom "message" log mentah, dengan menghapus nomor port. |
message.origin.type | principal.resource.resource_subtype | Nilai diambil dari kolom "origin.type" dalam kolom "message" pada log mentah. |
message.properties.host_group | principal.hostname | Nilai diambil dari kolom "properties.host_group" dalam kolom "message" log mentah. |
message.properties.host_group | target.group.group_display_name | Nilai diambil dari kolom "properties.host_group" dalam kolom "message" log mentah. |
message.request.id | target.resource.product_object_id | Nilai diambil dari kolom "request.id" dalam kolom "message" log mentah. |
message.request.name | target.resource.name | Nilai diambil dari kolom "request.name" dalam kolom "message" pada log mentah. |
message.user.name | principal.user.userid | Nilai diambil dari kolom "user.name" dalam kolom "message" log mentah. |
message.user.realm | principal.user.attribute.permissions.name | Nilai diambil dari kolom "user.realm" dalam kolom "message" pada log mentah. |
message.user.roles | about.user.attribute.roles.name | Nilai diambil dari kolom "user.roles" dalam kolom "message" pada log mentah. |
metadata.event_type | Nilai yang di-hardcode: "USER_RESOURCE_ACCESS" | |
metadata.log_type | Nilai yang di-hardcode: "ELASTIC_SEARCH" | |
metadata.product_name | Nilai yang di-hardcode: "ELASTICSEARCH" | |
metadata.vendor_name | Nilai yang dikodekan secara permanen: "ELASTIC" | |
principal.port | Nomor port diekstrak dari kolom "origin.address" dalam kolom "message" log mentah. | |
target.labels.key | Nilai yang di-hardcode: "Indice" |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.