Digital Shadows Indicators ログを収集する
このドキュメントでは、Amazon S3 を使用して Digital Shadows Indicators ログを Google Security Operations に取り込む方法について説明します。
Digital Shadows Indicators(現在は ReliaQuest GreyMatter DRP の一部)は、オープンウェブ、ディープウェブ、ダークウェブ、ソーシャル メディア全体で外部の脅威、データ漏洩、ブランドのなりすましを継続的にモニタリングして検出するデジタル リスク保護プラットフォームです。脅威インテリジェンス、インシデント アラート、セキュリティ侵害インジケーター(IOC)を提供し、組織がデジタル リスクを特定して軽減できるようにします。
始める前に
- Google SecOps インスタンス
- Digital Shadows Indicators ポータルへの特権アクセス
- AWS(S3、IAM)への特権アクセス
- API アクセスが有効になっている Digital Shadows Indicators の有効なサブスクリプション
Digital Shadows Indicators API の認証情報を収集する
https://portal-digitalshadows.comで Digital Shadows Indicators Portal にログインします。- [設定] > [API 認証情報] に移動します。
- 既存の API キーがない場合は、[Create New API Client] または [Generate API Key] をクリックします。
次の詳細をコピーして安全な場所に保存してください。
- API キー: 6 文字の API キー
- API Secret: 32 文字の API シークレット
- アカウント ID: アカウント ID(ポータルに表示されるか、Digital Shadows の担当者から提供されます)
- API ベース URL:
https://api.searchlight.app/v1またはhttps://portal-digitalshadows.com/api/v1(テナントのリージョンによって異なります)
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
digital-shadows-logs)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成した [User] を選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [.csv ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM> ポリシー> ポリシーを作成> JSON] タブに移動します。
- 以下のポリシーをコピーして貼り付けます。
ポリシー JSON(別のバケット名を入力した場合は
digital-shadows-logsを置き換えます):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
DigitalShadowsLambdaRole」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 DigitalShadowsCollectorランタイム Python 3.13 アーキテクチャ x86_64 実行ロール DigitalShadowsLambdaRole関数を作成したら、[コード] タブを開き、スタブを削除して、以下のコード(DigitalShadowsCollector.py)を貼り付けます。
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raise[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
環境変数
キー 値の例 S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(6 文字の API キー)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10関数が作成されたら、そのページにとどまるか、[Lambda] > [Functions] > [DigitalShadowsCollector] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[タイムアウト] を [5 分(300 秒)] に変更し、[保存] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour) - ターゲット: Lambda 関数
DigitalShadowsCollector - 名前:
DigitalShadowsCollector-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
Digital Shadows Indicators のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- 次のページで [単一フィードを設定] をクリックします。
- [Feed name] に一意の名前を入力します。
- [ソースタイプ] として [Amazon S3 V2] を選択します。
- [Log type] として [Digital Shadows Indicators] を選択します。
- [次へ] をクリックしてから、[送信] をクリックします。
次のフィールドに値を指定します。
- S3 URI:
s3://digital-shadows-logs/digital-shadows/ - Source deletion option: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 過去の日数内に変更されたファイルを含めます(デフォルトは 180 日)。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー
- アセットの名前空間: アセットの名前空間
- Ingestion labels: このフィードのイベントに適用されるラベル
- S3 URI:
[次へ] をクリックしてから、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| 値 | entity.entity.file.md5 | type == "MD5" の場合に設定します。 |
| 値 | entity.entity.file.sha1 | type == "SHA1" の場合に設定 |
| 値 | entity.entity.file.sha256 | type == "SHA256" の場合に設定 |
| 値 | entity.entity.hostname | type == "HOST" の場合に設定 |
| 値 | entity.entity.ip | 型が「IP」の場合は値が直接コピーされます |
| 値 | entity.entity.url | type == "URL" の場合に設定 |
| 値 | entity.entity.user.email_addresses | 型が「EMAIL」の場合は値が直接コピーされます |
| type | entity.metadata.entity_type | type == 「HOST」の場合は「DOMAIN_NAME」、type == 「IP」の場合は「IP_ADDRESS」、type == 「URL」の場合は「URL」、type == 「EMAIL」の場合は「USER」、type が ["SHA1","SHA256","MD5"] の場合は「FILE」、それ以外の場合は「UNKNOWN_ENTITYTYPE」に設定します。 |
| lastUpdated | entity.metadata.interval.start_time | 空でない場合は ISO8601 からタイムスタンプに変換されます |
| id | entity.metadata.product_entity_id | 空でない場合、値が直接コピーされます |
| attributionTag.id、attributionTag.name、attributionTag.type | entity.metadata.threat.about.labels | 空でない場合は、オブジェクト {key: タグ フィールド名, value: タグ値} と統合されます |
| sourceType | entity.metadata.threat.category_details | 値を直接コピーしました |
| entity.metadata.threat.threat_feed_name | [インジケーター] に設定 | |
| id | entity.metadata.threat.threat_id | 空でない場合、値が直接コピーされます |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | 値を直接コピーしました |
| entity.metadata.product_name | [インジケーター] に設定 | |
| entity.metadata.vendor_name | 「デジタル シャドウ」に設定 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。