Log audit ServiceNow
Dokumen ini menjelaskan cara menyerap log audit ServiceNow ke Google Security Operations menggunakan beberapa metode.
Opsi A: GCS dengan fungsi Cloud Run
Metode ini menggunakan fungsi Cloud Run untuk secara berkala membuat kueri ServiceNow REST API untuk mendapatkan log audit dan menyimpannya di bucket GCS. Kemudian, Google Security Operations mengumpulkan log dari bucket GCS.
Sebelum memulai
Pastikan Anda memiliki prasyarat berikut:
- Instance Google SecOps
- Akses istimewa ke tenant atau API ServiceNow dengan peran yang sesuai (biasanya
adminatau pengguna dengan akses baca ke tabel sys_audit) - Project GCP dengan Cloud Storage API diaktifkan
- Izin untuk membuat dan mengelola bucket GCS
- Izin untuk mengelola kebijakan IAM di bucket GCS
- Izin untuk membuat layanan Cloud Run, topik Pub/Sub, dan tugas Cloud Scheduler
Kumpulkan prasyarat ServiceNow (ID, kunci API, ID organisasi, token)
- Login ke Konsol Admin ServiceNow.
- Buka System Security > Users and Groups > Users.
- Buat pengguna baru atau pilih pengguna yang sudah ada dengan izin yang sesuai untuk mengakses log audit.
Salin dan simpan detail berikut di lokasi yang aman:
- Username
- Password
- URL Instance (misalnya,
https://instance.service-now.com)
Mengonfigurasi ACL untuk pengguna non-admin
Jika ingin menggunakan akun pengguna non-admin, Anda harus membuat Daftar Kontrol Akses (ACL) kustom untuk memberikan akses baca ke tabel sys_audit:
- Login ke Konsol Admin ServiceNow sebagai administrator.
- Buka System Security > Access Control (ACL).
- Klik Baru.
- Berikan detail konfigurasi berikut:
- Jenis: Pilih kumpulan data.
- Operation: Pilih read.
- Nama: Masukkan
sys_audit. - Deskripsi: Masukkan
Allow read access to sys_audit table for Chronicle integration.
- Di kolom Requires role, tambahkan peran yang ditetapkan ke pengguna integrasi Anda (misalnya,
chronicle_reader). - Klik Kirim.
- Verifikasi bahwa ACL aktif dan pengguna dapat membuat kueri tabel sys_audit.
Membuat bucket Google Cloud Storage
- Buka Google Cloud Console.
- Pilih project Anda atau buat project baru.
- Di menu navigasi, buka Cloud Storage > Buckets.
- Klik Create bucket.
Berikan detail konfigurasi berikut:
Setelan Nilai Beri nama bucket Anda Masukkan nama yang unik secara global (misalnya, servicenow-audit-logs)Location type Pilih berdasarkan kebutuhan Anda (Region, Dual-region, Multi-region) Lokasi Pilih lokasi (misalnya, us-central1)Kelas penyimpanan Standar (direkomendasikan untuk log yang sering diakses) Access control Seragam (direkomendasikan) Alat perlindungan Opsional: Aktifkan pembuatan versi objek atau kebijakan retensi Klik Buat.
Buat akun layanan untuk Cloud Run Function
Fungsi Cloud Run memerlukan akun layanan dengan izin untuk menulis ke bucket GCS dan dipanggil oleh Pub/Sub.
Membuat akun layanan
- Di GCP Console, buka IAM & Admin > Service Accounts.
- Klik Create Service Account.
- Berikan detail konfigurasi berikut:
- Nama akun layanan: Masukkan
servicenow-audit-collector-sa. - Deskripsi akun layanan: Masukkan
Service account for Cloud Run function to collect ServiceNow audit logs.
- Nama akun layanan: Masukkan
- Klik Create and Continue.
- Di bagian Berikan akun layanan ini akses ke project, tambahkan peran berikut:
- Klik Pilih peran.
- Telusuri dan pilih Storage Object Admin.
- Klik + Add another role.
- Telusuri dan pilih Cloud Run Invoker.
- Klik + Add another role.
- Telusuri dan pilih Cloud Functions Invoker.
- Klik Lanjutkan.
- Klik Selesai.
Peran ini diperlukan untuk:
- Storage Object Admin: Menulis log ke bucket GCS dan mengelola file status
- Cloud Run Invoker: Mengizinkan Pub/Sub memanggil fungsi
- Cloud Functions Invoker: Mengizinkan pemanggilan fungsi
Memberikan izin IAM pada bucket GCS
Beri akun layanan izin tulis di bucket GCS:
- Buka Cloud Storage > Buckets.
- Klik nama bucket Anda.
- Buka tab Izin.
- Klik Grant access.
- Berikan detail konfigurasi berikut:
- Tambahkan prinsipal: Masukkan email akun layanan (misalnya,
servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Tetapkan peran: Pilih Storage Object Admin.
- Tambahkan prinsipal: Masukkan email akun layanan (misalnya,
- Klik Simpan.
Membuat topik Pub/Sub
Buat topik Pub/Sub yang akan dipublikasikan oleh Cloud Scheduler dan akan dilanggan oleh fungsi Cloud Run.
- Di GCP Console, buka Pub/Sub > Topics.
- Klik Create topic.
- Berikan detail konfigurasi berikut:
- ID Topik: Masukkan
servicenow-audit-trigger. - Biarkan setelan lainnya tetap default.
- ID Topik: Masukkan
- Klik Buat.
Membuat fungsi Cloud Run untuk mengumpulkan log
Fungsi Cloud Run dipicu oleh pesan Pub/Sub dari Cloud Scheduler untuk mengambil log dari ServiceNow API dan menuliskannya ke GCS.
- Di GCP Console, buka Cloud Run.
- Klik Create service.
- Pilih Function (gunakan editor inline untuk membuat fungsi).
Di bagian Konfigurasi, berikan detail konfigurasi berikut:
Setelan Nilai Nama layanan servicenow-audit-collectorWilayah Pilih region yang cocok dengan bucket GCS Anda (misalnya, us-central1)Runtime Pilih Python 3.12 atau yang lebih baru Di bagian Pemicu (opsional):
- Klik + Tambahkan pemicu.
- Pilih Cloud Pub/Sub.
- Di Select a Cloud Pub/Sub topic, pilih topik Pub/Sub (
servicenow-audit-trigger). - Klik Simpan.
Di bagian Authentication:
- Pilih Wajibkan autentikasi.
- Periksa Identity and Access Management (IAM).
Scroll ke bawah dan luaskan Containers, Networking, Security.
Buka tab Security:
- Akun layanan: Pilih akun layanan (
servicenow-audit-collector-sa).
- Akun layanan: Pilih akun layanan (
Buka tab Containers:
- Klik Variables & Secrets.
- Klik + Tambahkan variabel untuk setiap variabel lingkungan:
Nama Variabel Nilai Contoh Deskripsi GCS_BUCKETservicenow-audit-logsNama bucket GCS GCS_PREFIXaudit-logsAwalan untuk file log STATE_KEYaudit-logs/state.jsonJalur file status API_BASE_URLhttps://instance.service-now.comURL instance ServiceNow API_USERNAMEyour-usernameNama pengguna ServiceNow API_PASSWORDyour-passwordSandi ServiceNow PAGE_SIZE1000Data per halaman MAX_PAGES1000Jumlah maksimum halaman yang akan diambil Di bagian Variables & Secrets, scroll ke bawah ke Requests:
- Waktu tunggu permintaan: Masukkan
600detik (10 menit).
- Waktu tunggu permintaan: Masukkan
Buka tab Setelan:
- Di bagian Materi:
- Memori: Pilih 512 MiB atau yang lebih tinggi.
- CPU: Pilih 1.
- Di bagian Materi:
Di bagian Penskalaan revisi:
- Jumlah minimum instance: Masukkan
0. - Jumlah maksimum instance: Masukkan
100(atau sesuaikan berdasarkan perkiraan beban).
- Jumlah minimum instance: Masukkan
Klik Buat.
Tunggu hingga layanan dibuat (1-2 menit).
Setelah layanan dibuat, editor kode inline akan terbuka secara otomatis.
Menambahkan kode fungsi
- Masukkan main di kolom Entry point.
Di editor kode inline, buat dua file:
File pertama: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs') STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json') API_BASE = os.environ.get('API_BASE_URL') USERNAME = os.environ.get('API_USERNAME') PASSWORD = os.environ.get('API_PASSWORD') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000')) def parse_datetime(value: str) -> datetime: """Parse ServiceNow datetime string to datetime object.""" # ServiceNow format: YYYY-MM-DD HH:MM:SS try: return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) except ValueError: # Try ISO format as fallback if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=24) print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}") # Fetch logs records, newest_event_time = fetch_logs( api_base=API_BASE, username=USERNAME, password=PASSWORD, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_pages=MAX_PAGES, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int): """ Fetch logs from ServiceNow sys_audit table with pagination and rate limiting. Args: api_base: ServiceNow instance URL username: ServiceNow username password: ServiceNow password start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_pages: Maximum total pages to fetch Returns: Tuple of (records list, newest_event_time string) """ # Clean up base URL base_url = api_base.rstrip('/') endpoint = f"{base_url}/api/now/table/sys_audit" # Encode credentials using UTF-8 auth_string = f"{username}:{password}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 offset = 0 # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS) start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S') while True: page_num += 1 if len(records) >= page_size * max_pages: print(f"Reached max_pages limit ({max_pages})") break # Build query parameters # Use >= operator for sys_created_on field (on or after) params = [] params.append(f"sysparm_query=sys_created_on>={start_time_str}") params.append(f"sysparm_display_value=true") params.append(f"sysparm_limit={page_size}") params.append(f"sysparm_offset={offset}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('result', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_time = event.get('sys_created_on') if event_time: if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results if len(page_results) < page_size: print(f"Reached last page (size={len(page_results)} < limit={page_size})") break # Move to next page offset += page_size # Small delay to avoid rate limiting time.sleep(0.1) except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time ```
File kedua: requirements.txt:
``` functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0 ```
Klik Deploy untuk menyimpan dan men-deploy fungsi.
Tunggu hingga deployment selesai (2-3 menit).
Buat tugas Cloud Scheduler
Cloud Scheduler memublikasikan pesan ke topik Pub/Sub secara berkala, sehingga memicu fungsi Cloud Run.
- Di GCP Console, buka Cloud Scheduler.
- Klik Create Job.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama servicenow-audit-collector-hourlyWilayah Pilih region yang sama dengan fungsi Cloud Run Frekuensi 0 * * * *(setiap jam, tepat pada waktunya)Zona waktu Pilih zona waktu (UTC direkomendasikan) Jenis target Pub/Sub Topik Pilih topik Pub/Sub ( servicenow-audit-trigger)Isi pesan {}(objek JSON kosong)Klik Buat.
Opsi frekuensi jadwal
Pilih frekuensi berdasarkan volume log dan persyaratan latensi:
Frekuensi Ekspresi Cron Kasus Penggunaan Setiap 5 menit */5 * * * *Volume tinggi, latensi rendah Setiap 15 menit */15 * * * *Volume sedang Setiap jam 0 * * * *Standar (direkomendasikan) Setiap 6 jam 0 */6 * * *Volume rendah, pemrosesan batch Harian 0 0 * * *Pengumpulan data historis
Menguji integrasi
- Di konsol Cloud Scheduler, temukan tugas Anda.
- Klik Force run untuk memicu tugas secara manual.
- Tunggu beberapa detik.
- Buka Cloud Run > Services.
- Klik nama fungsi Anda (
servicenow-audit-collector). - Klik tab Logs.
Pastikan fungsi berhasil dieksekusi. Cari hal berikut:
Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS Page 1: Retrieved X events Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsBuka Cloud Storage > Buckets.
Klik nama bucket Anda.
Buka folder awalan (
audit-logs/).Pastikan file
.ndjsonbaru dibuat dengan stempel waktu saat ini.
Jika Anda melihat error dalam log:
- HTTP 401: Periksa kredensial API di variabel lingkungan
- HTTP 403: Verifikasi bahwa akun memiliki izin yang diperlukan (peran admin atau ACL kustom untuk sys_audit)
- HTTP 429: Pembatasan kecepatan - fungsi akan otomatis mencoba lagi dengan penundaan
- Variabel lingkungan tidak ada: Periksa apakah semua variabel yang diperlukan telah ditetapkan
Mengambil akun layanan Google SecOps
Google SecOps menggunakan akun layanan unik untuk membaca data dari bucket GCS Anda. Anda harus memberi akun layanan ini akses ke bucket Anda.
Dapatkan email akun layanan
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Klik Konfigurasi satu feed.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
ServiceNow Audit logs). - Pilih Google Cloud Storage V2 sebagai Source type.
- Pilih ServiceNow Audit sebagai Log type.
Klik Get Service Account. Email akun layanan yang unik akan ditampilkan, misalnya:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comSalin alamat email ini untuk digunakan di langkah berikutnya.
Memberikan izin IAM ke akun layanan Google SecOps
Akun layanan Google SecOps memerlukan peran Storage Object Viewer di bucket GCS Anda.
- Buka Cloud Storage > Buckets.
- Klik nama bucket Anda.
- Buka tab Izin.
- Klik Grant access.
- Berikan detail konfigurasi berikut:
- Add principals: Tempel email akun layanan Google SecOps.
- Tetapkan peran: Pilih Storage Object Viewer.
Klik Simpan.
Mengonfigurasi feed di Google SecOps untuk menyerap log Audit ServiceNow
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Klik Konfigurasi satu feed.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
ServiceNow Audit logs). - Pilih Google Cloud Storage V2 sebagai Source type.
- Pilih ServiceNow Audit sebagai Log type.
- Klik Berikutnya.
Tentukan nilai untuk parameter input berikut:
URL bucket penyimpanan: Masukkan URI bucket GCS dengan jalur awalan:
gs://servicenow-audit-logs/audit-logs/Ganti:
servicenow-audit-logs: Nama bucket GCS Anda.audit-logs: Jalur folder/awalan tempat log disimpan.
Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda:
- Jangan pernah: Tidak pernah menghapus file apa pun setelah transfer (direkomendasikan untuk pengujian).
- Hapus file yang ditransfer: Menghapus file setelah transfer berhasil.
Hapus file yang ditransfer dan direktori kosong: Menghapus file dan direktori kosong setelah transfer berhasil.
Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir. Defaultnya adalah 180 hari.
Namespace aset: Namespace aset.
Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
Klik Berikutnya.
Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Opsi B: Agen Bindplane dengan syslog
Metode ini menggunakan agen Bindplane untuk mengumpulkan log Audit ServiceNow dan meneruskannya ke Google Security Operations. Karena ServiceNow tidak mendukung syslog secara native untuk log audit, kita akan menggunakan skrip untuk membuat kueri ServiceNow REST API dan meneruskan log ke agen Bindplane melalui syslog.
Sebelum memulai
Pastikan Anda memiliki prasyarat berikut:
- Instance Google SecOps
- Windows Server 2016 atau yang lebih baru, atau host Linux dengan
systemd - Konektivitas jaringan antara agen Bindplane dan ServiceNow
- Jika beroperasi dari balik proxy, pastikan port firewall terbuka sesuai dengan persyaratan agen Bindplane
- Akses istimewa ke konsol atau appliance pengelolaan ServiceNow dengan peran yang sesuai (biasanya
adminatau pengguna dengan akses baca ke tabel sys_audit)
Mendapatkan file autentikasi penyerapan Google SecOps
- Login ke konsol Google SecOps.
- Buka Setelan SIEM > Agen Pengumpulan.
- Klik Download untuk mendownload file autentikasi penyerapan.
Simpan file dengan aman di sistem tempat BindPlane akan diinstal.
Mendapatkan ID pelanggan Google SecOps
- Login ke konsol Google SecOps.
- Buka Setelan SIEM > Profil.
Salin dan simpan ID Pelanggan dari bagian Detail Organisasi.
Menginstal agen BindPlane
Instal agen Bindplane di sistem operasi Windows atau Linux Anda sesuai dengan petunjuk berikut.
Penginstalan Windows
- Buka Command Prompt atau PowerShell sebagai administrator.
Jalankan perintah berikut:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quietTunggu hingga penginstalan selesai.
Verifikasi penginstalan dengan menjalankan:
sc query observiq-otel-collector
Layanan akan ditampilkan sebagai RUNNING.
Penginstalan Linux
- Buka terminal dengan hak istimewa root atau sudo.
Jalankan perintah berikut:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.shTunggu hingga penginstalan selesai.
Verifikasi penginstalan dengan menjalankan:
sudo systemctl status observiq-otel-collector
Layanan akan ditampilkan sebagai aktif (berjalan).
Referensi penginstalan tambahan
Untuk opsi penginstalan dan pemecahan masalah tambahan, lihat Panduan penginstalan agen BindPlane.
Mengonfigurasi agen BindPlane untuk menyerap syslog dan mengirimkannya ke Google SecOps
Cari file konfigurasi
Linux:
bash
sudo nano /etc/bindplane-agent/config.yaml
Windows:
cmd
notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
Edit file konfigurasi
Ganti seluruh konten config.yaml dengan konfigurasi berikut:
```yaml
receivers:
udplog:
listen_address: "0.0.0.0:514"
exporters:
chronicle/servicenow_audit:
compression: gzip
creds_file_path: '/path/to/ingestion-authentication-file.json'
customer_id: '<YOUR_CUSTOMER_ID>'
endpoint: <CUSTOMER_REGION_ENDPOINT>
log_type: 'SERVICENOW_AUDIT'
raw_log_field: body
ingestion_labels:
service: servicenow
service:
pipelines:
logs/servicenow_to_chronicle:
receivers:
- udplog
exporters:
- chronicle/servicenow_audit
```
Parameter konfigurasi
Ganti placeholder berikut:
listen_address: Alamat IP dan port yang akan diproses. Gunakan0.0.0.0:514untuk memantau semua antarmuka di port 514.creds_file_path: Jalur lengkap ke file autentikasi penyerapan:- Linux:
/etc/bindplane-agent/ingestion-auth.json - Windows:
C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
- Linux:
<YOUR_CUSTOMER_ID>: ID Pelanggan dari langkah sebelumnya.<CUSTOMER_REGION_ENDPOINT>: URL endpoint regional:- Amerika Serikat:
malachiteingestion-pa.googleapis.com - Eropa:
europe-malachiteingestion-pa.googleapis.com - Asia:
asia-southeast1-malachiteingestion-pa.googleapis.com - Lihat Endpoint Regional untuk mengetahui daftar lengkapnya.
- Amerika Serikat:
Simpan file konfigurasi
Setelah mengedit, simpan file:
* Linux: Tekan Ctrl+O, lalu Enter, lalu Ctrl+X
* Windows: Klik File > Simpan
Mulai ulang agen Bindplane untuk menerapkan perubahan
Untuk memulai ulang agen Bindplane di Linux, jalankan perintah berikut:
sudo systemctl restart observiq-otel-collectorPastikan layanan sedang berjalan:
sudo systemctl status observiq-otel-collectorPeriksa log untuk mengetahui error:
sudo journalctl -u observiq-otel-collector -f
Untuk memulai ulang agen Bindplane di Windows, pilih salah satu opsi berikut:
Menggunakan Command Prompt atau PowerShell sebagai administrator:
net stop observiq-otel-collector && net start observiq-otel-collectorMenggunakan konsol Layanan:
- Tekan
Win+R, ketikservices.msc, lalu tekan Enter. - Temukan observIQ OpenTelemetry Collector.
Klik kanan, lalu pilih Mulai Ulang.
Pastikan layanan sedang berjalan:
sc query observiq-otel-collectorPeriksa log untuk mengetahui error:
type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
Membuat skrip untuk meneruskan log Audit ServiceNow ke syslog
Karena ServiceNow tidak mendukung syslog secara native untuk log audit, kita akan membuat skrip yang mengkueri ServiceNow REST API dan meneruskan log ke syslog. Skrip ini dapat dijadwalkan untuk dijalankan secara berkala.
Contoh Skrip Python (Linux)
Buat file bernama
servicenow_audit_to_syslog.pydengan konten berikut:import urllib3 import json import datetime import base64 import socket import time import os # ServiceNow API details BASE_URL = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL USERNAME = 'admin' # Replace with your ServiceNow username PASSWORD = 'password' # Replace with your ServiceNow password # Syslog details SYSLOG_SERVER = '127.0.0.1' # Replace with your Bindplane agent IP SYSLOG_PORT = 514 # Replace with your Bindplane agent port # State file to keep track of last run STATE_FILE = '/tmp/servicenow_audit_last_run.txt' # Pagination settings PAGE_SIZE = 1000 MAX_PAGES = 1000 def get_last_run_timestamp(): try: with open(STATE_FILE, 'r') as f: return f.read().strip() except: return '1970-01-01 00:00:00' def update_state_file(timestamp): with open(STATE_FILE, 'w') as f: f.write(timestamp) def send_to_syslog(message): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT)) sock.close() def get_audit_logs(last_run_timestamp): """ Query ServiceNow sys_audit table with proper pagination. Uses sys_created_on field for timestamp filtering. """ # Encode credentials using UTF-8 auth_string = f"{USERNAME}:{PASSWORD}" auth_bytes = auth_string.encode('utf-8') auth_encoded = base64.b64encode(auth_bytes).decode('utf-8') # Setup HTTP client http = urllib3.PoolManager() headers = { 'Authorization': f'Basic {auth_encoded}', 'Accept': 'application/json' } results = [] offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if 'T' in last_run_timestamp: last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0] for page in range(MAX_PAGES): # Build query with pagination # Use >= operator for sys_created_on field (on or after) query_params = ( f"sysparm_query=sys_created_on>={last_run_timestamp}" f"&sysparm_display_value=true" f"&sysparm_limit={PAGE_SIZE}" f"&sysparm_offset={offset}" ) url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}" try: response = http.request('GET', url, headers=headers) if response.status == 200: data = json.loads(response.data.decode('utf-8')) chunk = data.get('result', []) results.extend(chunk) # Stop if we got fewer records than PAGE_SIZE (last page) if len(chunk) < PAGE_SIZE: break # Move to next page offset += PAGE_SIZE else: print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}") break except Exception as e: print(f"Exception querying ServiceNow API: {str(e)}") break return results def main(): # Get last run timestamp last_run_timestamp = get_last_run_timestamp() # Current timestamp for this run current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Query ServiceNow API for audit logs audit_logs = get_audit_logs(last_run_timestamp) if audit_logs: # Send each log to syslog for log in audit_logs: # Format the log as JSON log_json = json.dumps(log) # Send to syslog send_to_syslog(log_json) # Sleep briefly to avoid flooding time.sleep(0.01) # Update state file update_state_file(current_timestamp) print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog") else: print("No new audit logs to forward") if __name__ == "__main__": main()
Menyiapkan Eksekusi Terjadwal (Linux)
Setel agar skrip dapat dieksekusi:
chmod +x servicenow_audit_to_syslog.pyBuat cron job untuk menjalankan skrip setiap jam:
crontab -eTambahkan baris berikut:
0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
Contoh Skrip PowerShell (Windows)
Buat file bernama
ServiceNow-Audit-To-Syslog.ps1dengan konten berikut:# ServiceNow API details $BaseUrl = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL $Username = 'admin' # Replace with your ServiceNow username $Password = 'password' # Replace with your ServiceNow password # Syslog details $SyslogServer = '127.0.0.1' # Replace with your Bindplane agent IP $SyslogPort = 514 # Replace with your Bindplane agent port # State file to keep track of last run $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt" # Pagination settings $PageSize = 1000 $MaxPages = 1000 function Get-LastRunTimestamp { try { if (Test-Path $StateFile) { return Get-Content $StateFile } else { return '1970-01-01 00:00:00' } } catch { return '1970-01-01 00:00:00' } } function Update-StateFile { param([string]$Timestamp) Set-Content -Path $StateFile -Value $Timestamp } function Send-ToSyslog { param([string]$Message) $UdpClient = New-Object System.Net.Sockets.UdpClient $UdpClient.Connect($SyslogServer, $SyslogPort) $Encoding = [System.Text.Encoding]::ASCII $Bytes = $Encoding.GetBytes($Message) $UdpClient.Send($Bytes, $Bytes.Length) $UdpClient.Close() } function Get-AuditLogs { param([string]$LastRunTimestamp) # Create auth header using UTF-8 encoding $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}")) $Headers = @{ Authorization = "Basic ${Auth}" Accept = 'application/json' } $Results = @() $Offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if ($LastRunTimestamp -match 'T') { $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' ' $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', '' } for ($page = 0; $page -lt $MaxPages; $page++) { # Build query with pagination # Use >= operator for sys_created_on field (on or after) $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}" $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}" try { $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get $Chunk = $Response.result $Results += $Chunk # Stop if we got fewer records than PageSize (last page) if ($Chunk.Count -lt $PageSize) { break } # Move to next page $Offset += $PageSize } catch { Write-Error "Error querying ServiceNow API: $_" break } } return $Results } # Main execution $LastRunTimestamp = Get-LastRunTimestamp $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss') $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp if ($AuditLogs -and $AuditLogs.Count -gt 0) { # Send each log to syslog foreach ($Log in $AuditLogs) { # Format the log as JSON $LogJson = $Log | ConvertTo-Json -Compress # Send to syslog Send-ToSyslog -Message $LogJson # Sleep briefly to avoid flooding Start-Sleep -Milliseconds 10 } # Update state file Update-StateFile -Timestamp $CurrentTimestamp Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog" } else { Write-Output "No new audit logs to forward" }
Menyiapkan Eksekusi Terjadwal (Windows)
- Buka Task Scheduler.
- Klik Create Task.
- Berikan konfigurasi berikut:
- Nama:
ServiceNowAuditToSyslog - Opsi keamanan: Jalankan apakah pengguna login atau tidak
- Nama:
- Buka tab Pemicu.
- Klik New dan tetapkan agar berjalan setiap jam.
- Buka tab Tindakan.
- Klik Baru dan tetapkan:
- Tindakan: Mulai program
- Program/skrip:
powershell.exe - Argumen:
-ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
- Klik OK untuk menyimpan tugas.
Perlu bantuan lebih lanjut? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.