收集 Digital Shadows Indicators 記錄
本文說明如何使用 Amazon S3,將 Digital Shadows Indicators 記錄檔擷取至 Google Security Operations。
Digital Shadows Indicators (現為 ReliaQuest GreyMatter DRP 的一部分) 是一項數位風險防護平台,可持續監控並偵測公開網路、深網、暗網和社群媒體中的外部威脅、資料外洩和品牌冒用行為。提供威脅情報、事件快訊和入侵指標 (IOC),協助機構識別及降低數位風險。
事前準備
- Google SecOps 執行個體
- Digital Shadows Indicators 入口網站的特殊存取權
- AWS (S3、IAM) 的特殊存取權
- 已啟用 API 存取權的 Digital Shadows Indicators 訂閱方案
收集 Digital Shadows Indicators API 憑證
- 前往
https://portal-digitalshadows.com,登入 Digital Shadows Indicators Portal。 - 依序前往「設定」>「API 憑證」。
- 如果沒有現有 API 金鑰,請按一下「Create New API Client」(建立新的 API 用戶端) 或「Generate API Key」(產生 API 金鑰)。
複製下列詳細資料並存放在安全的位置:
- API 金鑰:6 個字元的 API 金鑰
- API 密鑰:32 個字元的 API 密鑰
- 帳戶 ID:您的帳戶 ID (顯示在入口網站中,或由 Digital Shadows 代表提供)
- API 基準網址:
https://api.searchlight.app/v1或https://portal-digitalshadows.com/api/v1(視您的租戶區域而定)
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南的指示,建立 Amazon S3 bucket:建立 bucket。
- 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如
digital-shadows-logs)。 - 按照這份使用者指南的指示建立「使用者」:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「Download .csv file」(下載 .csv 檔案),儲存「Access Key」(存取金鑰) 和「Secret Access Key」(私密存取金鑰),以供日後參考。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
digital-shadows-logs):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
DigitalShadowsLambdaRole,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 DigitalShadowsCollector執行階段 Python 3.13 架構 x86_64 執行角色 DigitalShadowsLambdaRole建立函式後,開啟「程式碼」分頁,刪除存根,然後貼上以下程式碼 (DigitalShadowsCollector.py)。
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raise依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
輸入下列環境變數,並將 換成您的值。
環境變數
鍵 範例值 S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(您的 6 個字元 API 金鑰)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「DigitalShadowsCollector」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「逾時」變更為「5 分鐘 (300 秒)」,然後按一下「儲存」。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 請提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour) - 目標:您的 Lambda 函式
DigitalShadowsCollector - Name (名稱):
DigitalShadowsCollector-1h
- 週期性時間表:費率 (
- 按一下「建立時間表」。
在 Google SecOps 中設定動態饋給,擷取 Digital Shadows Indicators 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 在下一個頁面中,按一下「設定單一動態饋給」。
- 輸入動態饋給名稱的專屬名稱。
- 選取「Amazon S3 V2」做為「來源類型」。
- 選取「數位足跡指標」做為「記錄類型」。
- 依序點按「繼續」和「提交」。
為下列欄位指定值:
- S3 URI:
s3://digital-shadows-logs/digital-shadows/ - 來源刪除選項:根據偏好設定選取刪除選項
- 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)
- 存取金鑰 ID:可存取 S3 儲存貯體的使用者存取金鑰
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰
- 資產命名空間:資產命名空間
- 擷取標籤:要套用至這個動態饋給事件的標籤
- S3 URI:
依序點按「繼續」和「提交」。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| 值 | entity.entity.file.md5 | 如果 type ==「MD5」,則為必填 |
| 值 | entity.entity.file.sha1 | 如果 type ==「SHA1」,則為必填 |
| 值 | entity.entity.file.sha256 | 如果 type ==「SHA256」,則為必填 |
| 值 | entity.entity.hostname | 如果 type ==「HOST」,請設定 |
| 值 | entity.entity.ip | 如果 type ==「IP」,則直接複製值 |
| 值 | entity.entity.url | Set if type == "URL" |
| 值 | entity.entity.user.email_addresses | 如果 type ==「EMAIL」,則直接複製值 |
| 類型 | entity.metadata.entity_type | 如果 type == "HOST",則設為「DOMAIN_NAME」;如果 type == "IP",則設為「IP_ADDRESS」;如果 type == "URL",則設為「URL」;如果 type == "EMAIL",則設為「USER」;如果 type 位於 ["SHA1","SHA256","MD5"] 中,則設為「FILE」;否則設為「UNKNOWN_ENTITYTYPE」 |
| lastUpdated | entity.metadata.interval.start_time | 如果不是空白,則從 ISO8601 轉換為時間戳記 |
| id | entity.metadata.product_entity_id | 如果值不是空白,則直接複製該值 |
| attributionTag.id、attributionTag.name、attributionTag.type | entity.metadata.threat.about.labels | 如果不是空白,則與物件 {key: tag field name, value: tag value} 合併 |
| sourceType | entity.metadata.threat.category_details | 直接複製值 |
| entity.metadata.threat.threat_feed_name | 設為「指標」 | |
| id | entity.metadata.threat.threat_id | 如果值不是空白,則直接複製該值 |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | 直接複製值 |
| entity.metadata.product_name | 設為「指標」 | |
| entity.metadata.vendor_name | 設為「數位陰影」 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。