Digital Shadows Indicators ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Digital Shadows Indicators ログを Google Security Operations に取り込む方法について説明します。

Digital Shadows Indicators(現在は ReliaQuest GreyMatter DRP の一部)は、オープンウェブ、ディープウェブ、ダークウェブ、ソーシャル メディア全体で外部の脅威、データ漏洩、ブランドのなりすましを継続的にモニタリングして検出するデジタル リスク保護プラットフォームです。脅威インテリジェンス、インシデント アラート、セキュリティ侵害インジケーター(IOC)を提供し、組織がデジタル リスクを特定して軽減できるようにします。

始める前に

  • Google SecOps インスタンス
  • Digital Shadows Indicators ポータルへの特権アクセス
  • AWS(S3、IAM)への特権アクセス
  • API アクセスが有効になっている Digital Shadows Indicators の有効なサブスクリプション

Digital Shadows Indicators API の認証情報を収集する

  1. https://portal-digitalshadows.comDigital Shadows Indicators Portal にログインします。
  2. [設定] > [API 認証情報] に移動します。
  3. 既存の API キーがない場合は、[Create New API Client] または [Generate API Key] をクリックします。
  4. 次の詳細をコピーして安全な場所に保存してください。

    • API キー: 6 文字の API キー
    • API Secret: 32 文字の API シークレット
    • アカウント ID: アカウント ID(ポータルに表示されるか、Digital Shadows の担当者から提供されます)
    • API ベース URL: https://api.searchlight.app/v1 または https://portal-digitalshadows.com/api/v1(テナントのリージョンによって異なります)

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: digital-shadows-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成した [User] を選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] で [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [.csv ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM> ポリシー> ポリシーを作成> JSON] タブに移動します。
  2. 以下のポリシーをコピーして貼り付けます。
  3. ポリシー JSON(別のバケット名を入力した場合は digital-shadows-logs を置き換えます):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/*"
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
            }
        ]
    }
    
  4. [次へ] > [ポリシーを作成] をクリックします。

  5. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  6. 新しく作成したポリシーを関連付けます。

  7. ロールに「DigitalShadowsLambdaRole」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 DigitalShadowsCollector
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール DigitalShadowsLambdaRole
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して、以下のコード(DigitalShadowsCollector.py)を貼り付けます。

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')
    
    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"
    
    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()
    
    def _save_state(bucket, key, ts: str) -> None:
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json"
        )
    
    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")
    
    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items
    
    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))
    
        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")
    
            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }
    
            records = []
    
            incidents = _collect(
                api_base, headers, "incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)
    
            intel_incidents = _collect(
                api_base, headers, "intel-incidents", last_ts, account_id,
                page_size, max_pages, time_param="published-after"
            )
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)
    
            indicators = _collect(
                api_base, headers, "indicators", last_ts, account_id,
                page_size, max_pages, time_param="lastUpdated-after"
            )
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)
    
            if records:
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson"
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"
    
            logger.info(msg)
            return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})}
    
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
    
  5. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  6. 次の環境変数を入力し、実際の値に置き換えます。

    環境変数

    キー 値の例
    S3_BUCKET digital-shadows-logs
    S3_PREFIX digital-shadows/
    STATE_KEY digital-shadows/state.json
    DS_API_KEY ABC123(6 文字の API キー)
    DS_API_SECRET your-32-character-api-secret
    API_BASE https://api.searchlight.app/v1
    DS_ACCOUNT_ID your-account-id
    PAGE_SIZE 100
    MAX_PAGES 10
  7. 関数が作成されたら、そのページにとどまるか、[Lambda] > [Functions] > [DigitalShadowsCollector] を開きます。

  8. [CONFIGURATION] タブを選択します。

  9. [全般設定] パネルで、[編集] をクリックします。

  10. [タイムアウト] を [5 分(300 秒)] に変更し、[保存] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour
    • ターゲット: Lambda 関数 DigitalShadowsCollector
    • 名前: DigitalShadowsCollector-1h
  3. [スケジュールを作成] をクリックします。

Digital Shadows Indicators のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. 次のページで [単一フィードを設定] をクリックします。
  4. [Feed name] に一意の名前を入力します。
  5. [ソースタイプ] として [Amazon S3 V2] を選択します。
  6. [Log type] として [Digital Shadows Indicators] を選択します。
  7. [次へ] をクリックしてから、[送信] をクリックします。
  8. 次のフィールドに値を指定します。

    • S3 URI: s3://digital-shadows-logs/digital-shadows/
    • Source deletion option: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 過去の日数内に変更されたファイルを含めます(デフォルトは 180 日)。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル
  9. [次へ] をクリックしてから、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
entity.entity.file.md5 type == "MD5" の場合に設定します。
entity.entity.file.sha1 type == "SHA1" の場合に設定
entity.entity.file.sha256 type == "SHA256" の場合に設定
entity.entity.hostname type == "HOST" の場合に設定
entity.entity.ip 型が「IP」の場合は値が直接コピーされます
entity.entity.url type == "URL" の場合に設定
entity.entity.user.email_addresses 型が「EMAIL」の場合は値が直接コピーされます
type entity.metadata.entity_type type == 「HOST」の場合は「DOMAIN_NAME」、type == 「IP」の場合は「IP_ADDRESS」、type == 「URL」の場合は「URL」、type == 「EMAIL」の場合は「USER」、type が ["SHA1","SHA256","MD5"] の場合は「FILE」、それ以外の場合は「UNKNOWN_ENTITYTYPE」に設定します。
lastUpdated entity.metadata.interval.start_time 空でない場合は ISO8601 からタイムスタンプに変換されます
id entity.metadata.product_entity_id 空でない場合、値が直接コピーされます
attributionTag.id、attributionTag.name、attributionTag.type entity.metadata.threat.about.labels 空でない場合は、オブジェクト {key: タグ フィールド名, value: タグ値} と統合されます
sourceType entity.metadata.threat.category_details 値を直接コピーしました
entity.metadata.threat.threat_feed_name [インジケーター] に設定
id entity.metadata.threat.threat_id 空でない場合、値が直接コピーされます
sourceIdentifier entity.metadata.threat.url_back_to_product 値を直接コピーしました
entity.metadata.product_name [インジケーター] に設定
entity.metadata.vendor_name 「デジタル シャドウ」に設定

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。