Mengumpulkan log Indikator Jejak Digital
Dokumen ini menjelaskan cara menyerap log Indikator Digital Shadows ke Google Security Operations menggunakan Amazon S3.
Digital Shadows Indicators (kini menjadi bagian dari ReliaQuest GreyMatter DRP) adalah platform perlindungan risiko digital yang terus memantau dan mendeteksi ancaman eksternal, eksposur data, dan peniruan identitas brand di seluruh open web, deep web, dark web, dan media sosial. Layanan ini menyediakan intelijen ancaman, pemberitahuan insiden, dan indikator penyusupan (IOC) untuk membantu organisasi mengidentifikasi dan memitigasi risiko digital.
Sebelum memulai
- Instance Google SecOps
- Akses istimewa ke portal Indikator Digital Shadows
- Akses istimewa ke AWS (S3, IAM)
- Langganan aktif ke Indikator Digital Shadows dengan akses API diaktifkan
Mengumpulkan kredensial Digital Shadows Indicators API
- Login ke Digital Shadows Indicators Portal di
https://portal-digitalshadows.com. - Buka Setelan > Kredensial API.
- Jika Anda belum memiliki kunci API, klik Create New API Client atau Generate API Key.
Salin dan simpan detail berikut di tempat yang aman:
- Kunci API: Kunci API 6 karakter Anda
- Rahasia API: Rahasia API 32 karakter Anda
- ID Akun: ID akun Anda (ditampilkan di portal atau diberikan oleh perwakilan Digital Shadows Anda)
- URL Dasar API:
https://api.searchlight.app/v1atauhttps://portal-digitalshadows.com/api/v1(bergantung pada region tenant Anda)
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
digital-shadows-logs). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download file .csv untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
- Klik Done.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka tab IAM > Policies > Create policy > JSON.
- Salin dan tempel kebijakan di bawah.
Policy JSON (ganti
digital-shadows-logsjika Anda memasukkan nama bucket yang berbeda):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
DigitalShadowsLambdaRole, lalu klik Buat peran.
Buat fungsi Lambda
- Di AWS Console, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama DigitalShadowsCollectorRuntime Python 3.13 Arsitektur x86_64 Peran eksekusi DigitalShadowsLambdaRoleSetelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode di bawah (DigitalShadowsCollector.py).
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raiseBuka Configuration > Environment variables > Edit > Add new environment variable.
Masukkan variabel lingkungan yang disediakan di bawah, lalu ganti dengan nilai Anda.
Variabel lingkungan
Kunci Nilai contoh S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(kunci API 6 karakter Anda)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > DigitalShadowsCollector).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour) - Target: Fungsi Lambda Anda
DigitalShadowsCollector - Nama:
DigitalShadowsCollector-1h
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Mengonfigurasi feed di Google SecOps untuk menyerap log Indikator Digital Shadows
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Di halaman berikutnya, klik Konfigurasi satu feed.
- Masukkan nama unik untuk Nama feed.
- Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Digital Shadows Indicators sebagai Log type.
- Klik Berikutnya, lalu klik Kirim.
Tentukan nilai untuk kolom berikut:
- URI S3:
s3://digital-shadows-logs/digital-shadows/ - Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda
- Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir (defaultnya adalah 180 hari)
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3
- Namespace aset: Namespace aset
- Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini
- URI S3:
Klik Berikutnya, lalu klik Kirim.
Tabel pemetaan UDM
| Kolom Log | Pemetaan UDM | Logika |
|---|---|---|
| nilai | entity.entity.file.md5 | Tetapkan jika type == "MD5" |
| nilai | entity.entity.file.sha1 | Tetapkan jika type == "SHA1" |
| nilai | entity.entity.file.sha256 | Tetapkan jika type == "SHA256" |
| nilai | entity.entity.hostname | Tetapkan jika type == "HOST" |
| nilai | entity.entity.ip | Nilai disalin secara langsung jika jenis == "IP" |
| nilai | entity.entity.url | Tetapkan jika type == "URL" |
| nilai | entity.entity.user.email_addresses | Nilai disalin langsung jika jenis == "EMAIL" |
| jenis | entity.metadata.entity_type | Disetel ke "DOMAIN_NAME" jika type == "HOST", "IP_ADDRESS" jika type == "IP", "URL" jika type == "URL", "USER" jika type == "EMAIL", "FILE" jika type in ["SHA1","SHA256","MD5"], atau "UNKNOWN_ENTITYTYPE" |
| lastUpdated | entity.metadata.interval.start_time | Dikonversi dari ISO8601 ke stempel waktu jika tidak kosong |
| id | entity.metadata.product_entity_id | Nilai disalin langsung jika tidak kosong |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Digabungkan dengan objek {key: tag field name, value: tag value} jika tidak kosong |
| sourceType | entity.metadata.threat.category_details | Nilai disalin secara langsung |
| entity.metadata.threat.threat_feed_name | Setel ke "Indikator" | |
| id | entity.metadata.threat.threat_id | Nilai disalin langsung jika tidak kosong |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | Nilai disalin secara langsung |
| entity.metadata.product_name | Setel ke "Indikator" | |
| entity.metadata.vendor_name | Ditetapkan ke "Jejak Digital" |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.