Mengumpulkan log Apache Hadoop
Dokumen ini menjelaskan cara menyerap log Apache Hadoop ke Google Security Operations menggunakan Bindplane. Parser terlebih dahulu mengekstrak kolom dari log Hadoop mentah menggunakan pola Grok berdasarkan format log Hadoop umum. Kemudian, kolom yang diekstrak dipetakan ke kolom yang sesuai dalam skema Model Data Terpadu (UDM), melakukan konversi jenis data, dan memperkaya data dengan konteks tambahan.
Sebelum memulai
Pastikan Anda memiliki prasyarat berikut:
- Instance Google SecOps
- Host Windows 2016 atau yang lebih baru atau Linux dengan
systemd - Jika beroperasi dari balik proxy, pastikan port firewall terbuka sesuai dengan persyaratan agen Bindplane
- Akses istimewa ke file konfigurasi cluster Apache Hadoop
Mendapatkan file autentikasi penyerapan Google SecOps
- Login ke konsol Google SecOps.
- Buka Setelan SIEM > Agen Pengumpulan.
- Download File Autentikasi Penyerapan. Simpan file dengan aman di sistem tempat BindPlane akan diinstal.
Mendapatkan ID pelanggan Google SecOps
- Login ke konsol Google SecOps.
- Buka Setelan SIEM > Profil.
- Salin dan simpan ID Pelanggan dari bagian Detail Organisasi.
Menginstal agen Bindplane
Instal agen Bindplane di sistem operasi Windows atau Linux Anda sesuai dengan petunjuk berikut.
Penginstalan Windows
- Buka Command Prompt atau PowerShell sebagai administrator.
Jalankan perintah berikut:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Penginstalan Linux
- Buka terminal dengan hak istimewa root atau sudo.
Jalankan perintah berikut:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Referensi penginstalan tambahan
- Untuk opsi penginstalan tambahan, lihat panduan penginstalan ini.
Mengonfigurasi agen BindPlane untuk menyerap Syslog dan mengirimkannya ke Google SecOps
Akses file konfigurasi:
- Cari file
config.yaml. Biasanya, file ini berada di direktori/etc/bindplane-agent/di Linux atau di direktori penginstalan di Windows. - Buka file menggunakan editor teks (misalnya,
nano,vi, atau Notepad).
- Cari file
Edit file
config.yamlsebagai berikut:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Ganti port dan alamat IP sesuai kebutuhan di infrastruktur Anda.
- Ganti
<CUSTOMER_ID>dengan ID pelanggan yang sebenarnya. - Ganti
/path/to/ingestion-authentication-file.jsondengan jalur tempat file autentikasi disimpan di bagian Mendapatkan file autentikasi penyerapan Google SecOps.
Mulai ulang agen Bindplane untuk menerapkan perubahan
Untuk memulai ulang agen Bindplane di Linux, jalankan perintah berikut:
sudo systemctl restart bindplane-agentUntuk memulai ulang agen Bindplane di Windows, Anda dapat menggunakan konsol Services atau memasukkan perintah berikut:
net stop BindPlaneAgent && net start BindPlaneAgent
Mengonfigurasi penerusan Syslog di Apache Hadoop
Apache Hadoop menggunakan Log4j untuk logging. Konfigurasi Syslog appender yang sesuai berdasarkan versi Log4j Anda sehingga daemon Hadoop (NameNode, DataNode, ResourceManager, NodeManager, dll.) meneruskan log langsung ke penerima syslog Anda (host Bindplane). Log4j dikonfigurasi melalui file (tanpa UI web).
Opsi 1: Konfigurasi Log4j 1.x
- Temukan file log4j.properties (biasanya di
$HADOOP_CONF_DIR/log4j.properties). Tambahkan konfigurasi SyslogAppender berikut ke file:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGGanti
<BINDPLANE_HOST_IP>dengan alamat IP host Bindplane Anda.Simpan file.
Mulai ulang daemon Hadoop untuk menerapkan perubahan konfigurasi.
Opsi 2: Konfigurasi Log4j 2.x
- Temukan file log4j2.xml (biasanya di
$HADOOP_CONF_DIR/log4j2.xml). Tambahkan konfigurasi appender Syslog berikut ke file:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Ganti
<BINDPLANE_HOST_IP>dengan alamat IP host Bindplane Anda.
- Ganti
Simpan file.
Mulai ulang daemon Hadoop untuk menerapkan perubahan konfigurasi.
Tabel Pemetaan UDM
| Kolom Log | Pemetaan UDM | Logika |
|---|---|---|
| diizinkan | security_result.action | Jika "false", tindakan adalah "BLOCK". Jika "true", tindakan adalah "IZINKAN". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Diekstrak dari kolom "ugi" menggunakan pola grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Tanda kurung dan "auth:" dihapus. |
| call | additional.fields.key = "Call#", additional.fields.value.string_value | Dipetakan secara langsung. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Dipetakan secara langsung. |
| cliIP | principal.ip | Dipetakan hanya jika kolom "json_data" ada dan berhasil diuraikan sebagai JSON. |
| cmd | principal.process.command_line | Dipetakan secara langsung. |
| cluster_name | target.hostname | Digunakan sebagai nama host target jika ada. |
| hari | metadata.event_timestamp.seconds | Digunakan dengan bulan, tahun, jam, menit, dan detik untuk membuat event_timestamp. |
| deskripsi | metadata.description | Dipetakan secara langsung. |
| pengemudi | additional.fields.key = "driver", additional.fields.value.string_value | Dipetakan secara langsung. |
| dst | target.ip OR target.hostname OR target.file.full_path | Jika berhasil diuraikan sebagai IP, dipetakan ke IP target. Jika nilai dimulai dengan "/user", dipetakan ke jalur file target. Jika tidak, dipetakan ke nama host target. |
| dstport | target.port | Dipetakan dan dikonversi langsung ke bilangan bulat. |
| penegak | security_result.rule_name | Dipetakan secara langsung. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Dipetakan dan dikonversi langsung ke string. |
| fname | src.file.full_path | Dipetakan secara langsung. |
| jam | metadata.event_timestamp.seconds | Digunakan dengan bulan, hari, tahun, menit, dan detik untuk membuat event_timestamp. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Dipetakan secara langsung. |
| ip | principal.ip | Dipetakan ke IP utama setelah menghapus karakter "/" di awal. |
| json_data | Diuraikan sebagai JSON. Kolom yang diekstrak dipetakan ke kolom UDM yang sesuai. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Dipetakan secara langsung. |
| pesan | Digunakan untuk mengekstrak berbagai kolom menggunakan pola grok. | |
| metode | network.http.method | Dipetakan secara langsung. |
| menit | metadata.event_timestamp.seconds | Digunakan dengan bulan, hari, tahun, jam, dan detik untuk membuat event_timestamp. |
| bulan | metadata.event_timestamp.seconds | Digunakan dengan hari, tahun, jam, menit, dan detik untuk membuat event_timestamp. |
| pengamat | observer.hostname OR observer.ip | Jika berhasil diuraikan sebagai IP, dipetakan ke IP pengamat. Jika tidak, dipetakan ke nama host pengamat. |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | Dipetakan secara langsung. |
| kebijakan | security_result.rule_id | Dipetakan dan dikonversi langsung ke string. |
| produk | metadata.product_name | Dipetakan secara langsung. |
| product_event | metadata.product_event_type | Dipetakan secara langsung. Jika "rename", kolom "dst" dipetakan ke "target.file.full_path". |
| proto | network.application_protocol | Dipetakan dan dikonversi langsung menjadi huruf besar jika bukan "webhdfs". |
| alasan | security_result.summary | Dipetakan secara langsung. |
| repo | additional.fields.key = "repo", additional.fields.value.string_value | Dipetakan secara langsung. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Dipetakan secara langsung. |
| hasil | additional.fields.key = "result", additional.fields.value.string_value | Dipetakan dan dikonversi langsung ke string. |
| Coba lagi | additional.fields.key = "Retry#", additional.fields.value.string_value | Dipetakan secara langsung. |
| detik | metadata.event_timestamp.seconds | Digunakan dengan bulan, hari, tahun, jam, dan menit untuk membuat event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Dipetakan dan dikonversi langsung ke string. |
| tingkat keseriusan, | security_result.severity | Dipetakan ke tingkat keparahan yang berbeda berdasarkan nilai: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL". |
| shost | principal.hostname | Digunakan sebagai nama host utama jika berbeda dari "src". |
| src | principal.ip OR principal.hostname OR observer.ip | Jika berhasil diuraikan sebagai IP, dipetakan ke IP akun utama dan pengamat. Jika tidak, dipetakan ke nama host utama. |
| srcport | principal.port | Dipetakan dan dikonversi langsung ke bilangan bulat. |
| ringkasan | security_result.summary | Dipetakan secara langsung. |
| suser | principal.user.userid | Dipetakan secara langsung. |
| tags | additional.fields.key = "tags", additional.fields.value.string_value | Dipetakan secara langsung. |
| thread | additional.fields.key = "thread", additional.fields.value.string_value | Dipetakan secara langsung. |
| tips | target.ip | Dipetakan secara langsung. |
| ugi | target.hostname | Digunakan sebagai nama host target jika kolom "log_data" tidak berisi "·". |
| url | target.url | Dipetakan secara langsung. |
| vendor | metadata.vendor_name | Dipetakan secara langsung. |
| versi | metadata.product_version | Dipetakan secara langsung. |
| tahun | metadata.event_timestamp.seconds | Digunakan dengan bulan, hari, jam, menit, dan detik untuk membuat event_timestamp. |
| T/A | metadata.event_type | Disetel ke "NETWORK_CONNECTION" secara default. Diubah menjadi "STATUS_UPDATE" jika tidak ada target yang diidentifikasi. |
| T/A | metadata.log_type | Tetapkan ke "HADOOP". |
| T/A | security_result.alert_state | Disetel ke "ALERTING" jika tingkat keparahan adalah "HIGH" atau "CRITICAL". |
| T/A | is_alert | Setel ke "true" jika tingkat keparahan adalah "TINGGI" atau "KRITIS". |
| T/A | is_significant | Setel ke "true" jika tingkat keparahan adalah "TINGGI" atau "KRITIS". |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.