Team Cymru Scout Threat Intelligence データを収集する
このドキュメントでは、Amazon S3 を使用して Team Cymru Scout Threat Intelligence データを Google Security Operations に取り込む方法について説明します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Team Cymru Scout テナントへの特権アクセス
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Team Cymru Scout の前提条件を取得する
- Team Cymru Scout Platform にログインします。
- API キーのウェブに移動します。
- [作成] ボタンをクリックします。
- 必要に応じて、鍵の説明を入力します。
- [キーを作成] ボタンをクリックして、API キーを生成します。
- 次の詳細をコピーして安全な場所に保存します。
- SCOUT_API_KEY - API アクセスキー
- SCOUT_BASE_URL - Scout API のベース URL
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
team-cymru-scout-ti)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/team-cymru/scout-ti/state.json" } ] }- 別のバケット名を入力した場合は、
team-cymru-scout-tiを置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
TeamCymruScoutToS3Role」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 team_cymru_scout_ti_to_s3ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール TeamCymruScoutToS3Role関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
team_cymru_scout_ti_to_s3.py)を入力します。```python #!/usr/bin/env python3 # Lambda: Pull Team Cymru Scout Threat Intelligence exports to S3 (no transform) import os, json, time from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "team-cymru/scout-ti/") STATE_KEY = os.environ.get("STATE_KEY", "team-cymru/scout-ti/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) MODE = os.environ.get("MODE", "GET").upper() API_HEADERS = json.loads(os.environ.get("API_HEADERS", "{}")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "10")) # GET mode DOWNLOAD_URL_TEMPLATE = os.environ.get("DOWNLOAD_URL_TEMPLATE", "") # POST_JSON mode API_URL = os.environ.get("API_URL", "") JSON_BODY_TEMPLATE = os.environ.get("JSON_BODY_TEMPLATE", "") # Team Cymru Scout specific SCOUT_BASE_URL = os.environ.get("SCOUT_BASE_URL", "https://api.scout.cymru.com") SCOUT_API_KEY = os.environ.get("SCOUT_API_KEY", "") s3 = boto3.client("s3") def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) b = obj["Body"].read() return json.loads(b) if b else {} except Exception: return {} def _put_state(st: dict): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _http(url: str, method: str = "GET", body: bytes | None = None) -> tuple[bytes, str]: attempt = 0 while True: try: req = Request(url, method=method) # Add headers headers = API_HEADERS.copy() if SCOUT_API_KEY and "Authorization" not in headers: headers["Authorization"] = f"Bearer {SCOUT_API_KEY}" headers.setdefault("Accept", "application/json") for k, v in headers.items(): req.add_header(k, v) if body is not None: req.add_header("Content-Type", "application/json") with urlopen(req, data=body, timeout=HTTP_TIMEOUT) as r: return r.read(), r.headers.get("Content-Type", "application/json") except HTTPError as e: if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: delay = 1 + attempt try: delay = int(e.headers.get("Retry-After", delay)) except Exception: pass time.sleep(max(1, delay)) attempt += 1 continue raise except URLError: if attempt < HTTP_RETRIES: time.sleep(1 + attempt) attempt += 1 continue raise def _write(blob: bytes, ctype: str, from_ts: float, to_ts: float, page: int) -> str: date_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) key = f"{S3_PREFIX}/{date_path}/scout_ti_{int(from_ts)}_{int(to_ts)}_p{page:03d}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=ctype or "application/json") return key def _next_cursor(obj: dict) -> str | None: if not isinstance(obj, dict): return None for container in (obj, obj.get("meta", {}) or {}, obj.get("metadata", {}) or {}): for k in ("next", "next_cursor", "nextCursor", "nextPageToken", "continuation", "cursor", "pagedResultsCookie"): v = container.get(k) if v: return str(v) return None def _loop(from_ts: float, to_ts: float) -> dict: cursor, page, written = None, 0, 0 while page < MAX_PAGES: if MODE == "GET": if DOWNLOAD_URL_TEMPLATE: url = (DOWNLOAD_URL_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")) else: # Default Scout API endpoint (adjust based on actual API) url = f"{SCOUT_BASE_URL}/v1/threat-intelligence?start={_iso(from_ts)}&end={_iso(to_ts)}" if cursor: url += f"&cursor={cursor}" blob, ctype = _http(url, method="GET") else: assert API_URL and JSON_BODY_TEMPLATE, "API_URL and JSON_BODY_TEMPLATE required for MODE=POST_JSON" body = (JSON_BODY_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")).encode("utf-8") blob, ctype = _http(API_URL, method="POST", body=body) # Normalize to JSON bytes for storage try: parsed = json.loads(blob.decode("utf-8")) normalized = json.dumps(parsed, separators=(",", ":")).encode("utf-8") ctype_out = "application/json" except Exception: normalized = blob ctype_out = ctype or "application/octet-stream" _ = _write(normalized, ctype_out, from_ts, to_ts, page) written += 1 page += 1 # Follow cursor if JSON and cursor exists try: if parsed and isinstance(parsed, dict): cursor = _next_cursor(parsed) if not cursor: break except Exception: break return {"pages": page, "objects": written} def lambda_handler(event=None, context=None): st = _get_state() now = time.time() from_ts = st.get("last_to_ts") or (now - WINDOW_SEC) to_ts = now res = _loop(from_ts, to_ts) st["last_to_ts"] = to_ts _put_state(st) return {"ok": True, "window": {"from": _iso(from_ts), "to": _iso(to_ts)}, **res} if __name__ == "__main__": print(lambda_handler()) ```[構成] > [環境変数] に移動します。
[編集>新しい環境変数を追加] をクリックします。
次の環境変数を入力し、実際の値に置き換えます。
キー 値の例 S3_BUCKETteam-cymru-scout-tiS3_PREFIXteam-cymru/scout-ti/STATE_KEYteam-cymru/scout-ti/state.jsonSCOUT_BASE_URLhttps://api.scout.cymru.comSCOUT_API_KEYyour-scout-api-keyWINDOW_SECONDS3600HTTP_TIMEOUT60HTTP_RETRIES3MODEGETまたはPOST_JSONAPI_HEADERS{"Authorization":"Bearer <token>","Accept":"application/json"}DOWNLOAD_URL_TEMPLATE(GET モード) {FROM}、{TO}、{CURSOR}を含むカスタム URL テンプレートAPI_URL(POST_JSON モード)API エンドポイント URL JSON_BODY_TEMPLATE(POST_JSON モード) {FROM}、{TO}、{CURSOR}を含む JSON 本文MAX_PAGES10関数が作成されたら、そのページにとどまるか、[Lambda> 関数 > your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour)。 - ターゲット: Lambda 関数
team_cymru_scout_ti_to_s3。 - 名前:
team-cymru-scout-ti-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [Create user] をクリックします。
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::team-cymru-scout-ti" } ] }名前を
secops-reader-policyに設定します。[Create policy] > を検索して選択 > [Next] > [Add permissions] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Team Cymru Scout Threat Intelligence を取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Team Cymru Scout Threat Intelligence)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [Log type] として [Team Cymru Scout Threat Intelligence] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://team-cymru-scout-ti/team-cymru/scout-ti/ - Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
サポートされている Team Cymru Scout Threat Intelligence ログ形式
Team Cymru Scout Threat Intelligence パーサーは、KV(LEEF)形式と CSV 形式のログをサポートしています。
サポートされている Team Cymru Scout Threat Intelligence のサンプルログ
JSON
{ "account_name": "dummy_secops_user", "account_type": "basic_auth", "used_queries": 1414, "remaining_queries": 48586, "used_queries_percentage": 2.828, "query_limit": 50000, "used_foundation_queries": 4224, "remaining_foundation_queries": 5776, "foundation_query_limit": 10000, "used_foundation_queries_percentage": 42.24, "event_type": "account_usage" }
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。