Mengumpulkan log Indikator Jejak Digital

Didukung di:

Dokumen ini menjelaskan cara menyerap log Indikator Digital Shadows ke Google Security Operations menggunakan Amazon S3.

Digital Shadows Indicators (kini menjadi bagian dari ReliaQuest GreyMatter DRP) adalah platform perlindungan risiko digital yang terus memantau dan mendeteksi ancaman eksternal, eksposur data, dan peniruan identitas brand di seluruh open web, deep web, dark web, dan media sosial. Layanan ini menyediakan intelijen ancaman, pemberitahuan insiden, dan indikator penyusupan (IOC) untuk membantu organisasi mengidentifikasi dan memitigasi risiko digital.

Sebelum memulai

  • Instance Google SecOps
  • Akses istimewa ke portal Indikator Digital Shadows
  • Akses istimewa ke AWS (S3, IAM)
  • Langganan aktif ke Indikator Digital Shadows dengan akses API diaktifkan

Mengumpulkan kredensial Digital Shadows Indicators API

  1. Login ke Digital Shadows Indicators Portal di https://portal-digitalshadows.com.
  2. Buka Setelan > Kredensial API.
  3. Jika Anda belum memiliki kunci API, klik Create New API Client atau Generate API Key.
  4. Salin dan simpan detail berikut di tempat yang aman:

    • Kunci API: Kunci API 6 karakter Anda
    • Rahasia API: Rahasia API 32 karakter Anda
    • ID Akun: ID akun Anda (ditampilkan di portal atau diberikan oleh perwakilan Digital Shadows Anda)
    • URL Dasar API: https://api.searchlight.app/v1 atau https://portal-digitalshadows.com/api/v1 (bergantung pada region tenant Anda)

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, digital-shadows-logs).
  3. Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: Tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download file .csv untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
  12. Klik Done.
  13. Pilih tab Izin.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung.
  17. Cari kebijakan AmazonS3FullAccess.
  18. Pilih kebijakan.
  19. Klik Berikutnya.
  20. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Di konsol AWS, buka tab IAM > Policies > Create policy > JSON.
  2. Salin dan tempel kebijakan di bawah.
  3. Policy JSON (ganti digital-shadows-logs jika Anda memasukkan nama bucket yang berbeda):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. Klik Berikutnya > Buat kebijakan.

  5. Buka IAM > Roles > Create role > AWS service > Lambda.

  6. Lampirkan kebijakan yang baru dibuat.

  7. Beri nama peran DigitalShadowsLambdaRole, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di AWS Console, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama DigitalShadowsCollector
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi DigitalShadowsLambdaRole
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode di bawah (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. Buka Configuration > Environment variables > Edit > Add new environment variable.

  6. Masukkan variabel lingkungan yang disediakan di bawah, lalu ganti dengan nilai Anda.

    Variabel lingkungan

    Kunci Nilai contoh
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123 (kunci API 6 karakter Anda)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > DigitalShadowsCollector).

  8. Pilih tab Configuration

  9. Di panel General configuration, klik Edit.

  10. Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour)
    • Target: Fungsi Lambda Anda DigitalShadowsCollector
    • Nama: DigitalShadowsCollector-1h
  3. Klik Buat jadwal.

Mengonfigurasi feed di Google SecOps untuk menyerap log Indikator Digital Shadows

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Di halaman berikutnya, klik Konfigurasi satu feed.
  4. Masukkan nama unik untuk Nama feed.
  5. Pilih Amazon S3 V2 sebagai Jenis sumber.
  6. Pilih Digital Shadows Indicators sebagai Log type.
  7. Klik Berikutnya, lalu klik Kirim.
  8. Tentukan nilai untuk kolom berikut:

    • URI S3: s3://digital-shadows-logs/digital-shadows/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda
    • Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir (defaultnya adalah 180 hari)
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3
    • Namespace aset: Namespace aset
    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini
  9. Klik Berikutnya, lalu klik Kirim.

Tabel pemetaan UDM

Kolom Log Pemetaan UDM Logika
nilai entity.entity.file.md5 Tetapkan jika type == "MD5"
nilai entity.entity.file.sha1 Tetapkan jika type == "SHA1"
nilai entity.entity.file.sha256 Tetapkan jika type == "SHA256"
nilai entity.entity.hostname Tetapkan jika type == "HOST"
nilai entity.entity.ip Nilai disalin secara langsung jika jenis == "IP"
nilai entity.entity.url Tetapkan jika type == "URL"
nilai entity.entity.user.email_addresses Nilai disalin langsung jika jenis == "EMAIL"
jenis entity.metadata.entity_type Disetel ke "DOMAIN_NAME" jika type == "HOST", "IP_ADDRESS" jika type == "IP", "URL" jika type == "URL", "USER" jika type == "EMAIL", "FILE" jika type in ["SHA1","SHA256","MD5"], atau "UNKNOWN_ENTITYTYPE"
lastUpdated entity.metadata.interval.start_time Dikonversi dari ISO8601 ke stempel waktu jika tidak kosong
id entity.metadata.product_entity_id Nilai disalin langsung jika tidak kosong
attributionTag.id, attributionTag.name, attributionTag.type entity.metadata.threat.about.labels Digabungkan dengan objek {key: tag field name, value: tag value} jika tidak kosong
sourceType entity.metadata.threat.category_details Nilai disalin secara langsung
entity.metadata.threat.threat_feed_name Setel ke "Indikator"
id entity.metadata.threat.threat_id Nilai disalin langsung jika tidak kosong
sourceIdentifier entity.metadata.threat.url_back_to_product Nilai disalin secara langsung
entity.metadata.product_name Setel ke "Indikator"
entity.metadata.vendor_name Ditetapkan ke "Jejak Digital"

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.