收集 Team Cymru Scout 威胁情报数据
本文档介绍了如何使用 Amazon S3 将 Team Cymru Scout 威胁情报数据注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Team Cymru Scout 租户的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
获取 Team Cymru Scout 的前提条件
- 登录 Team Cymru Scout Platform。
- 前往 API 密钥网页。
- 点击创建按钮。
- 根据需要提供密钥说明。
- 点击创建密钥按钮以生成 API 密钥。
- 复制以下详细信息并将其保存在安全的位置:
- SCOUT_API_KEY - API 访问密钥
- SCOUT_BASE_URL - Scout API 基本网址
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以供日后参考(例如
team-cymru-scout-ti)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和不公开的访问密钥以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/team-cymru/scout-ti/state.json" } ] }- 如果您输入了其他存储桶名称,请替换
team-cymru-scout-ti。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
TeamCymruScoutToS3Role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 team_cymru_scout_ti_to_s3运行时 Python 3.13 架构 x86_64 执行角色 TeamCymruScoutToS3Role创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
team_cymru_scout_ti_to_s3.py):```python #!/usr/bin/env python3 # Lambda: Pull Team Cymru Scout Threat Intelligence exports to S3 (no transform) import os, json, time from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "team-cymru/scout-ti/") STATE_KEY = os.environ.get("STATE_KEY", "team-cymru/scout-ti/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) MODE = os.environ.get("MODE", "GET").upper() API_HEADERS = json.loads(os.environ.get("API_HEADERS", "{}")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "10")) # GET mode DOWNLOAD_URL_TEMPLATE = os.environ.get("DOWNLOAD_URL_TEMPLATE", "") # POST_JSON mode API_URL = os.environ.get("API_URL", "") JSON_BODY_TEMPLATE = os.environ.get("JSON_BODY_TEMPLATE", "") # Team Cymru Scout specific SCOUT_BASE_URL = os.environ.get("SCOUT_BASE_URL", "https://api.scout.cymru.com") SCOUT_API_KEY = os.environ.get("SCOUT_API_KEY", "") s3 = boto3.client("s3") def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) b = obj["Body"].read() return json.loads(b) if b else {} except Exception: return {} def _put_state(st: dict): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _http(url: str, method: str = "GET", body: bytes | None = None) -> tuple[bytes, str]: attempt = 0 while True: try: req = Request(url, method=method) # Add headers headers = API_HEADERS.copy() if SCOUT_API_KEY and "Authorization" not in headers: headers["Authorization"] = f"Bearer {SCOUT_API_KEY}" headers.setdefault("Accept", "application/json") for k, v in headers.items(): req.add_header(k, v) if body is not None: req.add_header("Content-Type", "application/json") with urlopen(req, data=body, timeout=HTTP_TIMEOUT) as r: return r.read(), r.headers.get("Content-Type", "application/json") except HTTPError as e: if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: delay = 1 + attempt try: delay = int(e.headers.get("Retry-After", delay)) except Exception: pass time.sleep(max(1, delay)) attempt += 1 continue raise except URLError: if attempt < HTTP_RETRIES: time.sleep(1 + attempt) attempt += 1 continue raise def _write(blob: bytes, ctype: str, from_ts: float, to_ts: float, page: int) -> str: date_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) key = f"{S3_PREFIX}/{date_path}/scout_ti_{int(from_ts)}_{int(to_ts)}_p{page:03d}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=ctype or "application/json") return key def _next_cursor(obj: dict) -> str | None: if not isinstance(obj, dict): return None for container in (obj, obj.get("meta", {}) or {}, obj.get("metadata", {}) or {}): for k in ("next", "next_cursor", "nextCursor", "nextPageToken", "continuation", "cursor", "pagedResultsCookie"): v = container.get(k) if v: return str(v) return None def _loop(from_ts: float, to_ts: float) -> dict: cursor, page, written = None, 0, 0 while page < MAX_PAGES: if MODE == "GET": if DOWNLOAD_URL_TEMPLATE: url = (DOWNLOAD_URL_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")) else: # Default Scout API endpoint (adjust based on actual API) url = f"{SCOUT_BASE_URL}/v1/threat-intelligence?start={_iso(from_ts)}&end={_iso(to_ts)}" if cursor: url += f"&cursor={cursor}" blob, ctype = _http(url, method="GET") else: assert API_URL and JSON_BODY_TEMPLATE, "API_URL and JSON_BODY_TEMPLATE required for MODE=POST_JSON" body = (JSON_BODY_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")).encode("utf-8") blob, ctype = _http(API_URL, method="POST", body=body) # Normalize to JSON bytes for storage try: parsed = json.loads(blob.decode("utf-8")) normalized = json.dumps(parsed, separators=(",", ":")).encode("utf-8") ctype_out = "application/json" except Exception: normalized = blob ctype_out = ctype or "application/octet-stream" _ = _write(normalized, ctype_out, from_ts, to_ts, page) written += 1 page += 1 # Follow cursor if JSON and cursor exists try: if parsed and isinstance(parsed, dict): cursor = _next_cursor(parsed) if not cursor: break except Exception: break return {"pages": page, "objects": written} def lambda_handler(event=None, context=None): st = _get_state() now = time.time() from_ts = st.get("last_to_ts") or (now - WINDOW_SEC) to_ts = now res = _loop(from_ts, to_ts) st["last_to_ts"] = to_ts _put_state(st) return {"ok": True, "window": {"from": _iso(from_ts), "to": _iso(to_ts)}, **res} if __name__ == "__main__": print(lambda_handler()) ```依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入以下提供的环境变量,并将其替换为您的值。
键 示例值 S3_BUCKETteam-cymru-scout-tiS3_PREFIXteam-cymru/scout-ti/STATE_KEYteam-cymru/scout-ti/state.jsonSCOUT_BASE_URLhttps://api.scout.cymru.comSCOUT_API_KEYyour-scout-api-keyWINDOW_SECONDS3600HTTP_TIMEOUT60HTTP_RETRIES3MODEGET或POST_JSONAPI_HEADERS{"Authorization":"Bearer <token>","Accept":"application/json"}DOWNLOAD_URL_TEMPLATE(GET 模式)包含 {FROM}、{TO}、{CURSOR}的自定义网址模板API_URL(POST_JSON 模式) API 端点网址 JSON_BODY_TEMPLATE(POST_JSON 模式)包含 {FROM}、{TO}、{CURSOR}的 JSON 正文MAX_PAGES10创建函数后,停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour)。 - 目标:您的 Lambda 函数
team_cymru_scout_ti_to_s3。 - 名称:
team-cymru-scout-ti-1h。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台> IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::team-cymru-scout-ti" } ] }将名称设置为
secops-reader-policy。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Team Cymru Scout Threat Intelligence
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Team Cymru Scout Threat Intelligence)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Team Cymru Scout 威胁情报作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://team-cymru-scout-ti/team-cymru/scout-ti/ - 源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
支持的 Team Cymru Scout 威胁情报日志格式
Team Cymru Scout Threat Intelligence 解析器支持 KV (LEEF) 和 CSV 格式的日志。
支持的 Team Cymru Scout 威胁情报示例日志
JSON
{ "account_name": "dummy_secops_user", "account_type": "basic_auth", "used_queries": 1414, "remaining_queries": 48586, "used_queries_percentage": 2.828, "query_limit": 50000, "used_foundation_queries": 4224, "remaining_foundation_queries": 5776, "foundation_query_limit": 10000, "used_foundation_queries_percentage": 42.24, "event_type": "account_usage" }
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。