Mengumpulkan log Apache Hadoop

Didukung di:

Dokumen ini menjelaskan cara menyerap log Apache Hadoop ke Google Security Operations menggunakan Bindplane. Parser terlebih dahulu mengekstrak kolom dari log Hadoop mentah menggunakan pola Grok berdasarkan format log Hadoop umum. Kemudian, kolom yang diekstrak dipetakan ke kolom yang sesuai dalam skema Model Data Terpadu (UDM), melakukan konversi jenis data, dan memperkaya data dengan konteks tambahan.

Sebelum memulai

Pastikan Anda memiliki prasyarat berikut:

  • Instance Google SecOps
  • Host Windows 2016 atau yang lebih baru atau Linux dengan systemd
  • Jika beroperasi dari balik proxy, pastikan port firewall terbuka sesuai dengan persyaratan agen Bindplane
  • Akses istimewa ke file konfigurasi cluster Apache Hadoop

Mendapatkan file autentikasi penyerapan Google SecOps

  1. Login ke konsol Google SecOps.
  2. Buka Setelan SIEM > Agen Pengumpulan.
  3. Download File Autentikasi Penyerapan. Simpan file dengan aman di sistem tempat BindPlane akan diinstal.

Mendapatkan ID pelanggan Google SecOps

  1. Login ke konsol Google SecOps.
  2. Buka Setelan SIEM > Profil.
  3. Salin dan simpan ID Pelanggan dari bagian Detail Organisasi.

Menginstal agen Bindplane

Instal agen Bindplane di sistem operasi Windows atau Linux Anda sesuai dengan petunjuk berikut.

Penginstalan Windows

  1. Buka Command Prompt atau PowerShell sebagai administrator.
  2. Jalankan perintah berikut:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Penginstalan Linux

  1. Buka terminal dengan hak istimewa root atau sudo.
  2. Jalankan perintah berikut:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Referensi penginstalan tambahan

Mengonfigurasi agen BindPlane untuk menyerap Syslog dan mengirimkannya ke Google SecOps

  1. Akses file konfigurasi:

    1. Cari file config.yaml. Biasanya, file ini berada di direktori /etc/bindplane-agent/ di Linux atau di direktori penginstalan di Windows.
    2. Buka file menggunakan editor teks (misalnya, nano, vi, atau Notepad).
  2. Edit file config.yaml sebagai berikut:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • Ganti port dan alamat IP sesuai kebutuhan di infrastruktur Anda.
    • Ganti <CUSTOMER_ID> dengan ID pelanggan yang sebenarnya.
    • Ganti /path/to/ingestion-authentication-file.json dengan jalur tempat file autentikasi disimpan di bagian Mendapatkan file autentikasi penyerapan Google SecOps.

Mulai ulang agen Bindplane untuk menerapkan perubahan

  • Untuk memulai ulang agen Bindplane di Linux, jalankan perintah berikut:

    sudo systemctl restart bindplane-agent
    
  • Untuk memulai ulang agen Bindplane di Windows, Anda dapat menggunakan konsol Services atau memasukkan perintah berikut:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Mengonfigurasi penerusan Syslog di Apache Hadoop

Apache Hadoop menggunakan Log4j untuk logging. Konfigurasi Syslog appender yang sesuai berdasarkan versi Log4j Anda sehingga daemon Hadoop (NameNode, DataNode, ResourceManager, NodeManager, dll.) meneruskan log langsung ke penerima syslog Anda (host Bindplane). Log4j dikonfigurasi melalui file (tanpa UI web).

Opsi 1: Konfigurasi Log4j 1.x

  1. Temukan file log4j.properties (biasanya di $HADOOP_CONF_DIR/log4j.properties).
  2. Tambahkan konfigurasi SyslogAppender berikut ke file:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Ganti <BINDPLANE_HOST_IP> dengan alamat IP host Bindplane Anda.

  4. Simpan file.

  5. Mulai ulang daemon Hadoop untuk menerapkan perubahan konfigurasi.

Opsi 2: Konfigurasi Log4j 2.x

  1. Temukan file log4j2.xml (biasanya di $HADOOP_CONF_DIR/log4j2.xml).
  2. Tambahkan konfigurasi appender Syslog berikut ke file:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Ganti <BINDPLANE_HOST_IP> dengan alamat IP host Bindplane Anda.
  3. Simpan file.

  4. Mulai ulang daemon Hadoop untuk menerapkan perubahan konfigurasi.

Tabel Pemetaan UDM

Kolom Log Pemetaan UDM Logika
diizinkan security_result.action Jika "false", tindakan adalah "BLOCK". Jika "true", tindakan adalah "IZINKAN".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Diekstrak dari kolom "ugi" menggunakan pola grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Tanda kurung dan "auth:" dihapus.
call additional.fields.key = "Call#", additional.fields.value.string_value Dipetakan secara langsung.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Dipetakan secara langsung.
cliIP principal.ip Dipetakan hanya jika kolom "json_data" ada dan berhasil diuraikan sebagai JSON.
cmd principal.process.command_line Dipetakan secara langsung.
cluster_name target.hostname Digunakan sebagai nama host target jika ada.
hari metadata.event_timestamp.seconds Digunakan dengan bulan, tahun, jam, menit, dan detik untuk membuat event_timestamp.
deskripsi metadata.description Dipetakan secara langsung.
pengemudi additional.fields.key = "driver", additional.fields.value.string_value Dipetakan secara langsung.
dst target.ip OR target.hostname OR target.file.full_path Jika berhasil diuraikan sebagai IP, dipetakan ke IP target. Jika nilai dimulai dengan "/user", dipetakan ke jalur file target. Jika tidak, dipetakan ke nama host target.
dstport target.port Dipetakan dan dikonversi langsung ke bilangan bulat.
penegak security_result.rule_name Dipetakan secara langsung.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Dipetakan dan dikonversi langsung ke string.
fname src.file.full_path Dipetakan secara langsung.
jam metadata.event_timestamp.seconds Digunakan dengan bulan, hari, tahun, menit, dan detik untuk membuat event_timestamp.
id additional.fields.key = "id", additional.fields.value.string_value Dipetakan secara langsung.
ip principal.ip Dipetakan ke IP utama setelah menghapus karakter "/" di awal.
json_data Diuraikan sebagai JSON. Kolom yang diekstrak dipetakan ke kolom UDM yang sesuai.
logType additional.fields.key = "logType", additional.fields.value.string_value Dipetakan secara langsung.
pesan Digunakan untuk mengekstrak berbagai kolom menggunakan pola grok.
metode network.http.method Dipetakan secara langsung.
menit metadata.event_timestamp.seconds Digunakan dengan bulan, hari, tahun, jam, dan detik untuk membuat event_timestamp.
bulan metadata.event_timestamp.seconds Digunakan dengan hari, tahun, jam, menit, dan detik untuk membuat event_timestamp.
pengamat observer.hostname OR observer.ip Jika berhasil diuraikan sebagai IP, dipetakan ke IP pengamat. Jika tidak, dipetakan ke nama host pengamat.
perm additional.fields.key = "perm", additional.fields.value.string_value Dipetakan secara langsung.
kebijakan security_result.rule_id Dipetakan dan dikonversi langsung ke string.
produk metadata.product_name Dipetakan secara langsung.
product_event metadata.product_event_type Dipetakan secara langsung. Jika "rename", kolom "dst" dipetakan ke "target.file.full_path".
proto network.application_protocol Dipetakan dan dikonversi langsung menjadi huruf besar jika bukan "webhdfs".
alasan security_result.summary Dipetakan secara langsung.
repo additional.fields.key = "repo", additional.fields.value.string_value Dipetakan secara langsung.
resType additional.fields.key = "resType", additional.fields.value.string_value Dipetakan secara langsung.
hasil additional.fields.key = "result", additional.fields.value.string_value Dipetakan dan dikonversi langsung ke string.
Coba lagi additional.fields.key = "Retry#", additional.fields.value.string_value Dipetakan secara langsung.
detik metadata.event_timestamp.seconds Digunakan dengan bulan, hari, tahun, jam, dan menit untuk membuat event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Dipetakan dan dikonversi langsung ke string.
tingkat keseriusan, security_result.severity Dipetakan ke tingkat keparahan yang berbeda berdasarkan nilai: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL".
shost principal.hostname Digunakan sebagai nama host utama jika berbeda dari "src".
src principal.ip OR principal.hostname OR observer.ip Jika berhasil diuraikan sebagai IP, dipetakan ke IP akun utama dan pengamat. Jika tidak, dipetakan ke nama host utama.
srcport principal.port Dipetakan dan dikonversi langsung ke bilangan bulat.
ringkasan security_result.summary Dipetakan secara langsung.
suser principal.user.userid Dipetakan secara langsung.
tags additional.fields.key = "tags", additional.fields.value.string_value Dipetakan secara langsung.
thread additional.fields.key = "thread", additional.fields.value.string_value Dipetakan secara langsung.
tips target.ip Dipetakan secara langsung.
ugi target.hostname Digunakan sebagai nama host target jika kolom "log_data" tidak berisi "·".
url target.url Dipetakan secara langsung.
vendor metadata.vendor_name Dipetakan secara langsung.
versi metadata.product_version Dipetakan secara langsung.
tahun metadata.event_timestamp.seconds Digunakan dengan bulan, hari, jam, menit, dan detik untuk membuat event_timestamp.
T/A metadata.event_type Disetel ke "NETWORK_CONNECTION" secara default. Diubah menjadi "STATUS_UPDATE" jika tidak ada target yang diidentifikasi.
T/A metadata.log_type Tetapkan ke "HADOOP".
T/A security_result.alert_state Disetel ke "ALERTING" jika tingkat keparahan adalah "HIGH" atau "CRITICAL".
T/A is_alert Setel ke "true" jika tingkat keparahan adalah "TINGGI" atau "KRITIS".
T/A is_significant Setel ke "true" jika tingkat keparahan adalah "TINGGI" atau "KRITIS".

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.