Mengintegrasikan Apache Kafka dengan Google SecOps

Dokumen ini menjelaskan cara mengintegrasikan Apache Kafka dengan Google Security Operations (Google SecOps).

Kasus penggunaan

Integrasi Apache Kafka dapat menangani kasus penggunaan berikut:

  • Penyerapan log keamanan real-time: Secara otomatis menyerap dan memproses peristiwa keamanan dari topik Kafka ke Google SecOps. Hal ini memungkinkan pengelolaan log terpusat dan analisis real-time untuk menghasilkan pemberitahuan berdasarkan data streaming.

  • Otomatisasi berbasis peristiwa: Memicu playbook otomatis di Google SecOps berdasarkan peristiwa keamanan atau pesan tertentu yang di-streaming dari topik Kafka. Hal ini mempercepat respons terhadap peristiwa penting seperti login pengguna dari lokasi yang tidak biasa.

  • Pengayaan threat intelligence: Tarik feed threat intelligence kustom dari topik Kafka untuk memperkaya pemberitahuan dan kasus yang ada. Hal ini memberikan konteks terbaru tentang Indikator Kompromi (IOC) kepada analis dan meningkatkan akurasi analisis ancaman.

Sebelum memulai

Sebelum mengonfigurasi integrasi Apache Kafka di Google SecOps, selesaikan prasyarat berikut:

  • Server Apache Kafka: Pastikan Anda memiliki akses ke server Apache Kafka yang sedang berjalan dengan broker dan topik Kafka yang diperlukan telah dikonfigurasi.
  • Image Docker agen jarak jauh: Saat membuat agen jarak jauh, Anda harus menggunakan image berbasis Debian. Gunakan gambar berikut untuk memastikan kompatibilitas:

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

Parameter integrasi

Integrasi Apache Kafka memerlukan parameter berikut:

Parameter Deskripsi
Kafka brokers

Wajib.

Daftar broker Kafka yang dipisahkan koma untuk dihubungkan, dalam format hostname:port.

Use TLS for connection

Opsional.

Jika dipilih, integrasi akan menggunakan enkripsi TLS untuk autentikasi.

Parameter ini memerlukan sertifikat Certificate Authority (CA).

Tidak diaktifkan secara default.

Use SASL PLAIN with TLS for connection

Opsional.

Jika dipilih, integrasi akan menggunakan mekanisme nama pengguna dan sandi SASL PLAIN untuk autentikasi.

Opsi ini hanya didukung dengan enkripsi TLS, dan memerlukan nama pengguna dan sandi SASL serta sertifikat CA.

Tidak diaktifkan secara default.

CA certificate of Kafka server

Opsional.

Sertifikat CA yang digunakan untuk memverifikasi identitas server Kafka.

Parameter ini diperlukan jika SASL diaktifkan.

Client certificate

Opsional.

Sertifikat klien untuk autentikasi TLS bersama dengan server Kafka.

Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan.

Client certificate key

Opsional.

Kunci pribadi yang sesuai dengan sertifikat klien, yang digunakan untuk autentikasi TLS bersama.

Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan.

Client certificate key password

Opsional.

Sandi yang digunakan untuk mendekripsi kunci pribadi sertifikat klien.

Parameter ini diperlukan jika TLS bersama (mTLS) diaktifkan.

SASL PLAIN Username

Opsional.

Nama pengguna untuk autentikasi SASL PLAIN dengan broker Kafka.

Parameter ini diperlukan jika SASL diaktifkan.

SASL PLAIN Password

Opsional.

Sandi untuk autentikasi SASL PLAIN dengan broker Kafka.

Parameter ini diperlukan jika SASL diaktifkan.

Untuk mengetahui petunjuk tentang cara mengonfigurasi integrasi di Google SecOps, lihat Mengonfigurasi integrasi.

Anda dapat melakukan perubahan di tahap selanjutnya, jika diperlukan. Setelah mengonfigurasi instance integrasi, Anda dapat menggunakannya dalam playbook. Untuk mengetahui informasi selengkapnya tentang cara mengonfigurasi dan mendukung beberapa instance, lihat Mendukung beberapa instance.

Tindakan

Untuk mengetahui informasi selengkapnya tentang tindakan, lihat Merespons tindakan tertunda dari Ruang Kerja Anda dan Melakukan tindakan manual.

Ping

Gunakan tindakan Ping untuk menguji konektivitas ke Apache Kafka.

Tindakan ini tidak berjalan di entity Google SecOps.

Input tindakan

Tidak ada.

Output tindakan

Tindakan Ping memberikan output berikut:

Jenis output tindakan Ketersediaan
Lampiran repositori kasus Tidak tersedia
Link repositori kasus Tidak tersedia
Tabel repositori kasus Tidak tersedia
Tabel pengayaan Tidak tersedia
Hasil JSON Tidak tersedia
Pesan output Tersedia
Hasil skrip. Tersedia
Pesan output

Tindakan Ping dapat menampilkan pesan output berikut:

Pesan output Deskripsi pesan

Successfully connected to the Apache Kafka server with the provided connection parameters!

Tindakan berhasil.
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

Tindakan gagal.

Periksa koneksi ke server, parameter input, atau kredensial.

Hasil skrip

Tabel berikut mencantumkan nilai untuk output hasil skrip saat menggunakan tindakan Ping:

Nama hasil skrip Nilai
is_success True atau False

Konektor

Untuk mempelajari lebih lanjut cara mengonfigurasi konektor di Google SecOps, lihat Menyerap data Anda (konektor).

Apache Kafka - Messages Connector

Gunakan Apache Kafka - Messages Connector untuk mengambil pesan dari Apache Kafka.

Konektor mengambil pesan dari topik Kafka yang ditentukan dan dapat memprosesnya dengan berbagai cara berdasarkan format pesan. Jika pesan adalah objek JSON yang valid, konektor akan mengekstrak kolom tertentu untuk pembuatan dan pemetaan pemberitahuan. Jika pesan adalah string biasa, pesan akan diproses sebagai data peristiwa mentah.

Konektor menangani pemetaan tingkat keparahan, pembuatan template nama pemberitahuan, dan pembuatan ID unik berdasarkan parameter yang Anda berikan.

Pemetaan tingkat keparahan JSON

Untuk memetakan tingkat keparahan pemberitahuan, Anda perlu menentukan kolom yang digunakan oleh Apache Kafka - Messages Connector untuk mendapatkan nilai tingkat keparahan dalam parameter Severity Mapping JSON. Respons konektor dapat berisi jenis nilai, seperti integer, float, dan string.

Apache Kafka - Messages Connector membaca nilai integer dan float serta memetakannya sesuai dengan setelan Google SecOps. Tabel berikut menunjukkan pemetaan nilai integer ke tingkat keparahan di Google SecOps:

Nilai bilangan bulat Tingkat keparahan yang dipetakan
100 Critical
Dari 80 hingga 100 High
Dari 60 hingga 80 Medium
Dari 40 hingga 60 Low
Kurang dari 40 Informational

Jika respons berisi nilai string, Pub/Sub – Messages Connector memerlukan konfigurasi tambahan.

Awalnya, nilai default muncul sebagai berikut:

{
    "Default": 60
}

Jika nilai yang diperlukan untuk pemetaan berada di kunci JSON event_severity, nilai dapat berupa sebagai berikut:

  • "Malicious"
  • "Benign"
  • "Unknown"

Untuk mengurai nilai kunci JSON event_severity dan memastikan objek JSON memiliki format yang benar, konfigurasikan parameter Severity Mapping JSON sebagai berikut:

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

Nilai "Default" wajib diisi.

Jika ada beberapa kecocokan untuk objek JSON yang sama, Apache Kafka - Messages Connector akan memprioritaskan kunci objek JSON pertama.

Untuk menggunakan kolom yang berisi nilai integer atau float, konfigurasikan kunci dan string kosong dalam parameter Severity Mapping JSON:

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

Input konektor

Apache Kafka - Messages Connector memerlukan parameter berikut:

Parameter Deskripsi
Product Field Name

Wajib.

Nama kolom tempat nama produk disimpan.

Nama produk terutama memengaruhi pemetaan. Untuk menyederhanakan dan meningkatkan proses pemetaan untuk konektor, nilai default di-resolve ke nilai penggantian yang dirujuk dari kode. Input yang tidak valid untuk parameter ini akan diselesaikan ke nilai penggantian secara default.

Nilai defaultnya adalah Product Name.

Event Field Name

Wajib.

Nama kolom yang menentukan nama peristiwa (subjenis).

Nilai defaultnya adalah event_type.

Environment Field Name

Opsional.

Nama kolom tempat nama lingkungan disimpan.

Jika kolom environment tidak ada, konektor akan menggunakan nilai default.

Nilai defaultnya adalah "".

Environment Regex Pattern

Opsional.

Pola ekspresi reguler untuk dijalankan pada nilai yang ditemukan di kolom Environment Field Name. Dengan parameter ini, Anda dapat memanipulasi kolom lingkungan menggunakan logika ekspresi reguler.

Gunakan nilai default .* untuk mengambil nilai Environment Field Name mentah yang diperlukan.

Jika pola ekspresi reguler adalah null atau kosong, atau nilai lingkungan adalah null, hasil lingkungan akhir adalah lingkungan default.

Script Timeout (Seconds)

Wajib.

Batas waktu, dalam detik, untuk proses Python yang menjalankan skrip saat ini.

Nilai defaultnya adalah 180.

Kafka brokers

Wajib.

Daftar broker Kafka yang dipisahkan koma untuk dihubungkan, dalam format hostname:port.

Use TLS for connection

Opsional.

Jika dipilih, integrasi akan menggunakan enkripsi TLS untuk autentikasi.

Parameter ini memerlukan sertifikat CA.

Tidak diaktifkan secara default.

Use SASL PLAIN with TLS for connection

Opsional.

Jika dipilih, integrasi akan menggunakan mekanisme nama pengguna dan sandi SASL PLAIN untuk autentikasi.

Opsi ini mengharuskan nama pengguna dan sandi SASL diberikan. Fitur ini hanya didukung dengan enkripsi TLS, yang memerlukan sertifikat CA.

Tidak diaktifkan secara default.

CA certificate of Kafka server

Opsional.

Sertifikat CA yang digunakan untuk memverifikasi identitas server Kafka.

Client certificate

Opsional.

Sertifikat klien untuk autentikasi TLS bersama dengan server Kafka.

Client certificate key

Opsional.

Kunci pribadi yang sesuai dengan sertifikat klien, yang digunakan untuk autentikasi TLS bersama.

Client certificate key password

Opsional.

Sandi yang digunakan untuk mendekripsi kunci pribadi sertifikat klien.

SASL PLAIN Username

Opsional.

Nama pengguna untuk autentikasi SASL PLAIN dengan broker Kafka.

SASL PLAIN Password

Opsional.

Sandi untuk autentikasi SASL PLAIN dengan broker Kafka.

Topic

Wajib.

Topik Kafka tempat insiden diambil.

Consumer Group ID

Opsional.

ID grup konsumen yang digunakan saat mengambil insiden.

Jika tidak ada nilai yang diberikan, ID unik akan dibuat.

Partitions

Opsional.

Daftar partisi CSV tempat pesan akan diambil.

Initial Offset

Opsional.

Tempat konektor mulai mengambil pesan dari partisi Kafka.

Anda dapat menentukan bilangan bulat positif untuk memulai pada offset tertentu, atau menggunakan nilai earliest atau latest untuk mulai mengambil dari awal atau akhir partisi.

Poll Timeout

Opsional.

Waktu tunggu polling untuk menggunakan pesan dari Kafka, dalam detik.

Case Name Template

Opsional.

Template untuk menentukan nama kasus kustom. Konektor menambahkan kunci custom_case_name ke acara.

Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama.

Contoh: Phishing - EVENT_MAILBOX.

Alert Name Template

Wajib.

Template untuk menentukan nama pemberitahuan.

Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama.

Contoh: Phishing - EVENT_MAILBOX.

Jika nilai tidak diberikan atau template tidak valid, konektor akan menggunakan nama pemberitahuan default.

Rule Generator Template

Wajib.

Template untuk menentukan generator aturan.

Anda dapat menggunakan placeholder dalam format FIELD_NAME, yang diisi dari nilai string peristiwa pertama.

Contoh: Phishing - EVENT_MAILBOX.

Jika nilai tidak diberikan atau template tidak valid, konektor akan menggunakan nama generator aturan default.

Timestamp Field

Wajib.

Nama kolom dalam pesan Kafka yang berisi stempel waktu pemberitahuan Google SecOps.

Jika stempel waktu tidak dalam format epoch Unix, formatnya harus ditentukan dalam parameter Timestamp Format.

Timestamp Format

Opsional.

Format stempel waktu pesan, diperlukan untuk stempel waktu epoch non-Unix. Gunakan kode format strftime Python standar.

Jika stempel waktu tidak dalam format epoch Unix dan parameter ini tidak dikonfigurasi, konektor akan gagal.

Severity Mapping JSON

Wajib.

Objek JSON yang digunakan oleh konektor untuk mengekstrak dan memetakan tingkat keparahan dari pesan ke skala prioritas Google SecOps.

Nilai defaultnya adalah {"Default": "60"}.

Unique ID Field

Opsional.

Nama kolom yang akan digunakan sebagai ID pesan unik.

Jika tidak ada nilai yang diberikan, konektor akan membuat dan menggunakan hash SHA-256 dari konten pesan sebagai ID pesan.

Max Messages To Fetch

Wajib.

Jumlah maksimum pesan yang diproses konektor untuk setiap iterasi.

Nilai defaultnya adalah 100.

Disable Overflow

Opsional.

Jika dipilih, konektor akan mengabaikan mekanisme peluapan Google SecOps.

Diaktifkan secara default.

Verify SSL

Wajib.

Jika dipilih, integrasi akan memvalidasi sertifikat SSL saat terhubung ke server Apache Kafka.

Diaktifkan secara default.

Proxy Server Address

Opsional.

Alamat server proxy yang akan digunakan.

Proxy Username

Opsional.

Nama pengguna proxy untuk melakukan autentikasi.

Proxy Password

Opsional.

Sandi proxy untuk mengautentikasi.

Aturan konektor

Konektor mendukung proxy.

Notifikasi konektor

Tabel berikut menjelaskan pemetaan kolom pesan Apache Kafka ke kolom pemberitahuan Google SecOps:

Kolom Pemberitahuan Siemplify Kolom Pesan Apache Kafka
SourceSystemName Diisi oleh framework.
TicketId Nilai kolom ID unik atau hash SHA-256 pesan.
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Nilai yang dihasilkan oleh Alert Name Template.
Reason T/A
Description T/A
DeviceVendor Hardcode: Apache Kafka
DeviceProduct Nilai penggantian: Message
Priority Dipetakan dari parameter Severity Mapping JSON.
RuleGenerator Nilai yang dihasilkan oleh Rule Generator Template.
SourceGroupingIdentifier T/A
StartTime Dikonversi dari Timestamp Field.
EndTime Dikonversi dari Timestamp Field.
Siemplify Alert - Extensions T/A
Siemplify Alert - Attachments T/A

Peristiwa konektor

Contoh peristiwa konektor adalah sebagai berikut:

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.