Mengumpulkan log Digital Shadows SearchLight
Dokumen ini menjelaskan cara menyerap log Digital Shadows SearchLight ke
Google Security Operations menggunakan Amazon S3. Parser mengekstrak data peristiwa keamanan dari log JSON. Plugin ini menginisialisasi kolom Model Data Terpadu (UDM), mengurai payload JSON, memetakan kolom yang relevan ke skema UDM, mengekstrak entitas seperti email dan nama host menggunakan pola grok, serta membuat objek security_result
dan metadata
dalam peristiwa UDM.
Sebelum memulai
Pastikan Anda memenuhi prasyarat berikut:
- Instance Google SecOps.
- Akses istimewa ke tenant Digital Shadows SearchLight.
- Akses istimewa ke AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Mengumpulkan prasyarat Digital Shadows SearchLight (ID, kunci API, ID org, token)
- Login ke Digital Shadows SearchLight Portal.
- Buka Setelan > Kredensial API.
- Buat klien API atau pasangan kunci baru.
- Salin dan simpan detail berikut di lokasi yang aman:
- Kunci API
- Rahasia API
- ID Akun
- URL Dasar API:
https://api.searchlight.app/v1
atauhttps://portal-digitalshadows.com/api/v1
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
digital-shadows-logs
). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download file .CSV untuk menyimpan Kunci Akses dan Kunci Akses Rahasia untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Permissions.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies.
- Klik Buat kebijakan > tab JSON.
- Salin dan tempel kebijakan berikut.
Policy JSON (ganti
digital-shadows-logs
jika Anda memasukkan nama bucket yang berbeda):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows-searchlight/state.json" } ] }
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
digital-shadows-lambda-role
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama digital-shadows-collector
Runtime Python 3.13 Arsitektur x86_64 Peran eksekusi digital-shadows-lambda-role
Setelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode berikut (
digital-shadows-collector.py
).import json import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode import boto3 import urllib3 logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(s3, bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj["Body"].read().decode("utf-8")) ts = state.get("last_timestamp") if ts: return ts except s3.exceptions.NoSuchKey: pass except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(s3, bucket, key, ts: str) -> None: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}).encode("utf-8"), ContentType="application/json", ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): # Required s3_bucket = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] # Optional / defaults s3_prefix = os.environ.get("S3_PREFIX", "digital-shadows-searchlight/") state_key = os.environ.get("STATE_KEY", "digital-shadows-searchlight/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) s3 = boto3.client("s3") last_ts = _load_state(s3, s3_bucket, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] # Incidents (time filter often 'published-after' or 'updated-since' depending on tenancy) incidents = _collect(api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) # Intelligence incidents (alerts) intel_incidents = _collect(api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after") for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) # Indicators (IOCs) indicators = _collect(api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after") for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: # Choose newest timestamp seen in this batch newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{s3_prefix}digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records).encode("utf-8") s3.put_object( Bucket=s3_bucket, Key=key, Body=body, ContentType="application/x-ndjson", ) _save_state(s3, s3_bucket, state_key, newest) msg = f"Wrote {len(records)} records to s3://{s3_bucket}/{key}" else: msg = "No new records" logger.info(msg) return {"statusCode": 200, "body": msg}
Buka Configuration > Environment variables.
Klik Edit > Tambahkan variabel lingkungan baru.
Masukkan variabel lingkungan yang diberikan dalam tabel berikut, dengan mengganti nilai contoh dengan nilai Anda.
Variabel lingkungan
Kunci Nilai contoh S3_BUCKET
digital-shadows-logs
S3_PREFIX
digital-shadows-searchlight/
STATE_KEY
digital-shadows-searchlight/state.json
DS_API_KEY
<your-6-character-api-key>
DS_API_SECRET
<your-32-character-api-secret>
API_BASE
https://api.searchlight.app/v1
(atauhttps://portal-digitalshadows.com/api/v1
)DS_ACCOUNT_ID
<your-account-id>
(wajib untuk sebagian besar penyewa)PAGE_SIZE
100
MAX_PAGES
10
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel Konfigurasi umum, klik Edit.
Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
). - Target: fungsi Lambda Anda
digital-shadows-collector
. - Name:
digital-shadows-collector-1h
.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
(Opsional) Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader
. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::digital-shadows-logs" } ] }
Nama =
secops-reader-policy
.Klik Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buat kunci akses untuk
secops-reader
: Kredensial keamanan > Kunci akses.Klik Create access key.
Download
.CSV
. (Anda akan menempelkan nilai ini ke feed).
Mengonfigurasi feed di Google SecOps untuk menyerap log Digital Shadows SearchLight
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Digital Shadows SearchLight logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Digital Shadows SearchLight sebagai Log type.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://digital-shadows-logs/digital-shadows-searchlight/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.