Mengintegrasikan Apache Kafka dengan Google SecOps
Dokumen ini menjelaskan cara mengintegrasikan Apache Kafka dengan Google Security Operations (Google SecOps).
Kasus penggunaan
Integrasi Apache Kafka dapat menangani kasus penggunaan berikut:
Penyerapan log keamanan real-time: Secara otomatis menyerap dan memproses peristiwa keamanan dari topik Kafka ke Google SecOps. Hal ini memungkinkan pengelolaan log terpusat dan analisis real-time untuk menghasilkan pemberitahuan berdasarkan data streaming.
Otomatisasi berbasis peristiwa: Memicu playbook otomatis di Google SecOps berdasarkan peristiwa keamanan atau pesan tertentu yang di-streaming dari topik Kafka. Hal ini mempercepat respons terhadap peristiwa penting seperti login pengguna dari lokasi yang tidak biasa.
Pengayaan threat intelligence: Tarik feed threat intelligence kustom dari topik Kafka untuk memperkaya pemberitahuan dan kasus yang ada. Hal ini memberikan konteks terbaru tentang Indikator Kompromi (IOC) kepada analis dan meningkatkan akurasi analisis ancaman.
Sebelum memulai
Sebelum mengonfigurasi integrasi Apache Kafka di Google SecOps, selesaikan prasyarat berikut:
- Server Apache Kafka: Pastikan Anda memiliki akses ke server Apache Kafka yang sedang berjalan dengan broker dan topik Kafka yang diperlukan telah dikonfigurasi.
Image Docker agen jarak jauh: Saat membuat agen jarak jauh, Anda harus menggunakan image berbasis Debian. Gunakan gambar berikut untuk memastikan kompatibilitas:
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
Parameter integrasi
Integrasi Apache Kafka memerlukan parameter berikut:
| Parameter | Deskripsi |
|---|---|
Kafka brokers |
Wajib. Daftar broker Kafka yang dipisahkan koma untuk dihubungkan, dalam format
|
Use TLS for connection |
Opsional. Jika dipilih, integrasi akan menggunakan enkripsi TLS untuk autentikasi. Parameter ini memerlukan sertifikat Certificate Authority (CA). Tidak diaktifkan secara default. |
Use SASL PLAIN with TLS for connection |
Opsional. Jika dipilih, integrasi akan menggunakan mekanisme nama pengguna dan sandi SASL PLAIN untuk autentikasi. Opsi ini hanya didukung dengan enkripsi TLS, dan memerlukan nama pengguna dan sandi SASL serta sertifikat CA. Tidak diaktifkan secara default. |
CA certificate of Kafka server |
Opsional. Sertifikat CA yang digunakan untuk memverifikasi identitas server Kafka. Parameter ini diperlukan jika SASL diaktifkan. |
Client certificate |
Opsional. Sertifikat klien untuk autentikasi TLS bersama dengan server Kafka. Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan. |
Client certificate key |
Opsional. Kunci pribadi yang sesuai dengan sertifikat klien, yang digunakan untuk autentikasi TLS bersama. Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan. |
Client certificate key password |
Opsional. Sandi yang digunakan untuk mendekripsi kunci pribadi sertifikat klien. Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan. |
SASL PLAIN Username |
Opsional. Nama pengguna untuk autentikasi SASL PLAIN dengan broker Kafka. Parameter ini diperlukan jika SASL diaktifkan. |
SASL PLAIN Password |
Opsional. Sandi untuk autentikasi SASL PLAIN dengan broker Kafka. Parameter ini diperlukan jika SASL diaktifkan. |
Untuk mengetahui petunjuk tentang cara mengonfigurasi integrasi di Google SecOps, lihat Mengonfigurasi integrasi.
Anda dapat melakukan perubahan di tahap selanjutnya, jika diperlukan. Setelah mengonfigurasi instance integrasi, Anda dapat menggunakannya dalam playbook. Untuk mengetahui informasi selengkapnya tentang cara mengonfigurasi dan mendukung beberapa instance, lihat Mendukung beberapa instance.
Tindakan
Untuk mengetahui informasi selengkapnya tentang tindakan, lihat Merespons tindakan tertunda dari Ruang Kerja Anda dan Melakukan tindakan manual.
Ping
Gunakan tindakan Ping untuk menguji konektivitas ke Apache Kafka.
Tindakan ini tidak berjalan di entity Google SecOps.
Input tindakan
Tidak ada.
Output tindakan
Tindakan Ping memberikan output berikut:
| Jenis output tindakan | Ketersediaan |
|---|---|
| Lampiran repositori kasus | Tidak tersedia |
| Link repositori kasus | Tidak tersedia |
| Tabel repositori kasus | Tidak tersedia |
| Tabel pengayaan | Tidak tersedia |
| Hasil JSON | Tidak tersedia |
| Pesan output | Tersedia |
| Hasil skrip. | Tersedia |
Pesan output
Tindakan Ping dapat menampilkan pesan output berikut:
| Pesan output | Deskripsi pesan |
|---|---|
|
Tindakan berhasil. |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
Tindakan gagal. Periksa koneksi ke server, parameter input, atau kredensial. |
Hasil skrip
Tabel berikut mencantumkan nilai untuk output hasil skrip saat menggunakan tindakan Ping:
| Nama hasil skrip | Nilai |
|---|---|
is_success |
True atau False |
Konektor
Untuk mempelajari lebih lanjut cara mengonfigurasi konektor di Google SecOps, lihat Menyerap data Anda (konektor).
Apache Kafka - Messages Connector
Gunakan Apache Kafka - Messages Connector untuk mengambil pesan dari Apache Kafka.
Konektor mengambil pesan dari topik Kafka yang ditentukan dan dapat memprosesnya dengan berbagai cara berdasarkan format pesan. Jika pesan adalah objek JSON yang valid, konektor akan mengekstrak kolom tertentu untuk pembuatan dan pemetaan pemberitahuan. Jika pesan adalah string biasa, pesan akan diproses sebagai data peristiwa mentah.
Konektor menangani pemetaan tingkat keparahan, pembuatan template nama pemberitahuan, dan pembuatan ID unik berdasarkan parameter yang Anda berikan.
Pemetaan tingkat keparahan JSON
Untuk memetakan tingkat keparahan pemberitahuan, Anda perlu menentukan kolom yang digunakan oleh
Apache Kafka - Messages Connector untuk
mendapatkan nilai tingkat keparahan dalam parameter Severity Mapping JSON. Respons
konektor dapat berisi jenis nilai, seperti integer, float, dan string.
Apache Kafka - Messages Connector membaca nilai integer dan float
serta memetakannya sesuai dengan setelan Google SecOps. Tabel berikut menunjukkan pemetaan nilai integer ke tingkat keparahan di Google SecOps:
| Nilai bilangan bulat | Tingkat keparahan yang dipetakan |
|---|---|
100 |
Critical |
Dari 80 hingga 100 |
High |
Dari 60 hingga 80 |
Medium |
Dari 40 hingga 60 |
Low |
Kurang dari 40 |
Informational |
Jika respons berisi nilai string, Pub/Sub – Messages Connector memerlukan konfigurasi tambahan.
Awalnya, nilai default muncul sebagai berikut:
{
"Default": 60
}
Jika nilai yang diperlukan untuk pemetaan berada di kunci JSON event_severity, nilai dapat berupa sebagai berikut:
"Malicious""Benign""Unknown"
Untuk mengurai nilai kunci JSON event_severity dan memastikan objek JSON memiliki format yang benar, konfigurasikan parameter Severity Mapping JSON sebagai berikut:
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
Nilai "Default" wajib diisi.
Jika ada beberapa kecocokan untuk objek JSON yang sama, Apache Kafka - Messages Connector akan memprioritaskan kunci objek JSON pertama.
Untuk menggunakan kolom yang berisi nilai integer atau float, konfigurasikan kunci dan string kosong dalam parameter Severity Mapping JSON:
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
Input konektor
Apache Kafka - Messages Connector memerlukan parameter berikut:
| Parameter | Deskripsi |
|---|---|
Product Field Name |
Wajib. Nama kolom tempat nama produk disimpan. Nama produk terutama memengaruhi pemetaan. Untuk menyederhanakan dan meningkatkan proses pemetaan untuk konektor, nilai default di-resolve ke nilai penggantian yang dirujuk dari kode. Input yang tidak valid untuk parameter ini akan diselesaikan ke nilai penggantian secara default. Nilai defaultnya adalah |
Event Field Name |
Wajib. Nama kolom yang menentukan nama peristiwa (subjenis). Nilai defaultnya adalah |
Environment Field Name |
Opsional. Nama kolom tempat nama lingkungan disimpan. Jika kolom environment tidak ada, konektor akan menggunakan nilai default. Nilai defaultnya adalah |
Environment Regex Pattern |
Opsional. Pola ekspresi reguler untuk dijalankan pada nilai yang ditemukan di kolom
Gunakan nilai default Jika pola ekspresi reguler adalah null atau kosong, atau nilai lingkungan adalah null, hasil lingkungan akhir adalah lingkungan default. |
Script Timeout (Seconds) |
Wajib. Batas waktu, dalam detik, untuk proses Python yang menjalankan skrip saat ini. Nilai defaultnya adalah |
Kafka brokers |
Wajib. Daftar broker Kafka yang dipisahkan koma untuk dihubungkan, dalam format
|
Use TLS for connection |
Opsional. Jika dipilih, integrasi akan menggunakan enkripsi TLS untuk autentikasi. Parameter ini memerlukan sertifikat CA. Tidak diaktifkan secara default. |
Use SASL PLAIN with TLS for connection |
Opsional. Jika dipilih, integrasi akan menggunakan mekanisme nama pengguna dan sandi SASL PLAIN untuk autentikasi. Opsi ini mengharuskan nama pengguna dan sandi SASL diberikan. Fitur ini hanya didukung dengan enkripsi TLS, yang memerlukan sertifikat CA. Tidak diaktifkan secara default. |
CA certificate of Kafka server |
Opsional. Sertifikat CA yang digunakan untuk memverifikasi identitas server Kafka. |
Client certificate |
Opsional. Sertifikat klien untuk autentikasi TLS bersama dengan server Kafka. |
Client certificate key |
Opsional. Kunci pribadi yang sesuai dengan sertifikat klien, yang digunakan untuk autentikasi TLS bersama. |
Client certificate key password |
Opsional. Sandi yang digunakan untuk mendekripsi kunci pribadi sertifikat klien. |
SASL PLAIN Username |
Opsional. Nama pengguna untuk autentikasi SASL PLAIN dengan broker Kafka. |
SASL PLAIN Password |
Opsional. Sandi untuk autentikasi SASL PLAIN dengan broker Kafka. |
Topic |
Wajib. Topik Kafka tempat insiden diambil. |
Consumer Group ID |
Opsional. ID grup konsumen yang digunakan saat mengambil insiden. Jika tidak ada nilai yang diberikan, ID unik akan dibuat. |
Partitions |
Opsional. Daftar partisi CSV tempat pesan akan diambil. |
Initial Offset |
Opsional. Tempat konektor mulai mengambil pesan dari partisi Kafka. Anda dapat menentukan bilangan bulat positif untuk memulai pada offset tertentu, atau menggunakan
nilai |
Poll Timeout |
Opsional. Waktu tunggu polling untuk menggunakan pesan dari Kafka, dalam detik. |
Case Name Template |
Opsional. Template untuk menentukan nama kasus kustom. Konektor menambahkan kunci
Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama. Contoh: |
Alert Name Template |
Wajib. Template untuk menentukan nama pemberitahuan. Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama. Contoh: Jika nilai tidak diberikan atau template tidak valid, konektor akan menggunakan nama pemberitahuan default. |
Rule Generator Template |
Wajib. Template untuk menentukan generator aturan. Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama. Contoh: Jika nilai tidak diberikan atau template tidak valid, konektor akan menggunakan nama generator aturan default. |
Timestamp Field |
Wajib. Nama kolom dalam pesan Kafka yang berisi stempel waktu pemberitahuan Google SecOps. Jika stempel waktu tidak dalam format epoch Unix, formatnya harus ditentukan
dalam parameter |
Timestamp Format |
Opsional. Format stempel waktu pesan, diperlukan untuk stempel waktu
epoch non-Unix. Gunakan kode format Jika stempel waktu tidak dalam format epoch Unix dan parameter ini tidak dikonfigurasi, konektor akan gagal. |
Severity Mapping JSON |
Wajib. Objek JSON yang digunakan oleh konektor untuk mengekstrak dan memetakan tingkat keparahan dari pesan ke skala prioritas Google SecOps. Nilai defaultnya adalah |
Unique ID Field |
Opsional. Nama kolom yang akan digunakan sebagai ID pesan unik. Jika tidak ada nilai yang diberikan, konektor akan membuat dan menggunakan hash SHA-256 dari konten pesan sebagai ID pesan. |
Max Messages To Fetch |
Wajib. Jumlah maksimum pesan yang diproses konektor untuk setiap iterasi. Nilai defaultnya adalah |
Disable Overflow |
Opsional. Jika dipilih, konektor akan mengabaikan mekanisme peluapan Google SecOps. Diaktifkan secara default. |
Verify SSL |
Wajib. Jika dipilih, integrasi akan memvalidasi sertifikat SSL saat terhubung ke server Apache Kafka. Diaktifkan secara default. |
Proxy Server Address |
Opsional. Alamat server proxy yang akan digunakan. |
Proxy Username |
Opsional. Nama pengguna proxy untuk melakukan autentikasi. |
Proxy Password |
Opsional. Sandi proxy untuk mengautentikasi. |
Aturan konektor
Konektor mendukung proxy.
Notifikasi konektor
Tabel berikut menjelaskan pemetaan kolom pesan Apache Kafka ke kolom pemberitahuan Google SecOps:
| Kolom Pemberitahuan Siemplify | Kolom Pesan Apache Kafka |
|---|---|
SourceSystemName |
Diisi oleh framework. |
TicketId |
Nilai kolom ID unik atau hash SHA-256 pesan. |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
Nilai yang dihasilkan oleh Alert Name Template. |
Reason |
T/A |
Description |
T/A |
DeviceVendor |
Hardcode: Apache Kafka |
DeviceProduct |
Nilai penggantian: Message |
Priority |
Dipetakan dari parameter Severity Mapping JSON. |
RuleGenerator |
Nilai yang dihasilkan oleh Rule Generator Template. |
SourceGroupingIdentifier |
T/A |
StartTime |
Dikonversi dari Timestamp Field. |
EndTime |
Dikonversi dari Timestamp Field. |
Siemplify Alert - Extensions |
T/A |
Siemplify Alert - Attachments |
T/A |
Peristiwa konektor
Contoh peristiwa konektor adalah sebagai berikut:
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.