Team Cymru Scout 위협 인텔리전스 데이터 수집
이 문서에서는 Amazon S3를 사용하여 Team Cymru Scout 위협 인텔리전스 데이터를 Google Security Operations로 수집하는 방법을 설명합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Team Cymru Scout 테넌트에 대한 액세스 권한 관리
- AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한
Get Team Cymru Scout 사전 요구사항
- Team Cymru Scout Platform에 로그인합니다.
- API 키 웹으로 이동합니다.
- 만들기 버튼을 클릭합니다.
- 필요한 경우 키에 대한 설명을 입력합니다.
- 키 만들기 버튼을 클릭하여 API 키를 생성합니다.
- 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- SCOUT_API_KEY - API 액세스 키
- SCOUT_BASE_URL - Scout API 기본 URL
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다(예:
team-cymru-scout-ti). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안용 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키와 보안 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책으로 이동합니다.
- 정책 만들기 > JSON 탭을 클릭합니다.
다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::team-cymru-scout-ti/team-cymru/scout-ti/state.json" } ] }- 다른 버킷 이름을 입력한 경우
team-cymru-scout-ti을 바꿉니다.
- 다른 버킷 이름을 입력한 경우
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
TeamCymruScoutToS3Role로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 team_cymru_scout_ti_to_s3런타임 Python 3.13 아키텍처 x86_64 실행 역할 TeamCymruScoutToS3Role함수를 만든 후 코드 탭을 열고 스텁을 삭제하고 다음 코드를 입력합니다 (
team_cymru_scout_ti_to_s3.py).```python #!/usr/bin/env python3 # Lambda: Pull Team Cymru Scout Threat Intelligence exports to S3 (no transform) import os, json, time from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "team-cymru/scout-ti/") STATE_KEY = os.environ.get("STATE_KEY", "team-cymru/scout-ti/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) MODE = os.environ.get("MODE", "GET").upper() API_HEADERS = json.loads(os.environ.get("API_HEADERS", "{}")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "10")) # GET mode DOWNLOAD_URL_TEMPLATE = os.environ.get("DOWNLOAD_URL_TEMPLATE", "") # POST_JSON mode API_URL = os.environ.get("API_URL", "") JSON_BODY_TEMPLATE = os.environ.get("JSON_BODY_TEMPLATE", "") # Team Cymru Scout specific SCOUT_BASE_URL = os.environ.get("SCOUT_BASE_URL", "https://api.scout.cymru.com") SCOUT_API_KEY = os.environ.get("SCOUT_API_KEY", "") s3 = boto3.client("s3") def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) b = obj["Body"].read() return json.loads(b) if b else {} except Exception: return {} def _put_state(st: dict): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _http(url: str, method: str = "GET", body: bytes | None = None) -> tuple[bytes, str]: attempt = 0 while True: try: req = Request(url, method=method) # Add headers headers = API_HEADERS.copy() if SCOUT_API_KEY and "Authorization" not in headers: headers["Authorization"] = f"Bearer {SCOUT_API_KEY}" headers.setdefault("Accept", "application/json") for k, v in headers.items(): req.add_header(k, v) if body is not None: req.add_header("Content-Type", "application/json") with urlopen(req, data=body, timeout=HTTP_TIMEOUT) as r: return r.read(), r.headers.get("Content-Type", "application/json") except HTTPError as e: if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: delay = 1 + attempt try: delay = int(e.headers.get("Retry-After", delay)) except Exception: pass time.sleep(max(1, delay)) attempt += 1 continue raise except URLError: if attempt < HTTP_RETRIES: time.sleep(1 + attempt) attempt += 1 continue raise def _write(blob: bytes, ctype: str, from_ts: float, to_ts: float, page: int) -> str: date_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) key = f"{S3_PREFIX}/{date_path}/scout_ti_{int(from_ts)}_{int(to_ts)}_p{page:03d}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=ctype or "application/json") return key def _next_cursor(obj: dict) -> str | None: if not isinstance(obj, dict): return None for container in (obj, obj.get("meta", {}) or {}, obj.get("metadata", {}) or {}): for k in ("next", "next_cursor", "nextCursor", "nextPageToken", "continuation", "cursor", "pagedResultsCookie"): v = container.get(k) if v: return str(v) return None def _loop(from_ts: float, to_ts: float) -> dict: cursor, page, written = None, 0, 0 while page < MAX_PAGES: if MODE == "GET": if DOWNLOAD_URL_TEMPLATE: url = (DOWNLOAD_URL_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")) else: # Default Scout API endpoint (adjust based on actual API) url = f"{SCOUT_BASE_URL}/v1/threat-intelligence?start={_iso(from_ts)}&end={_iso(to_ts)}" if cursor: url += f"&cursor={cursor}" blob, ctype = _http(url, method="GET") else: assert API_URL and JSON_BODY_TEMPLATE, "API_URL and JSON_BODY_TEMPLATE required for MODE=POST_JSON" body = (JSON_BODY_TEMPLATE .replace("{FROM}", _iso(from_ts)) .replace("{TO}", _iso(to_ts)) .replace("{CURSOR}", cursor or "")).encode("utf-8") blob, ctype = _http(API_URL, method="POST", body=body) # Normalize to JSON bytes for storage try: parsed = json.loads(blob.decode("utf-8")) normalized = json.dumps(parsed, separators=(",", ":")).encode("utf-8") ctype_out = "application/json" except Exception: normalized = blob ctype_out = ctype or "application/octet-stream" _ = _write(normalized, ctype_out, from_ts, to_ts, page) written += 1 page += 1 # Follow cursor if JSON and cursor exists try: if parsed and isinstance(parsed, dict): cursor = _next_cursor(parsed) if not cursor: break except Exception: break return {"pages": page, "objects": written} def lambda_handler(event=None, context=None): st = _get_state() now = time.time() from_ts = st.get("last_to_ts") or (now - WINDOW_SEC) to_ts = now res = _loop(from_ts, to_ts) st["last_to_ts"] = to_ts _put_state(st) return {"ok": True, "window": {"from": _iso(from_ts), "to": _iso(to_ts)}, **res} if __name__ == "__main__": print(lambda_handler()) ```구성 > 환경 변수로 이동합니다.
수정 > 새 환경 변수 추가를 클릭합니다.
제공된 다음 환경 변수를 입력하고 값으로 바꿉니다.
키 예시 값 S3_BUCKETteam-cymru-scout-tiS3_PREFIXteam-cymru/scout-ti/STATE_KEYteam-cymru/scout-ti/state.jsonSCOUT_BASE_URLhttps://api.scout.cymru.comSCOUT_API_KEYyour-scout-api-keyWINDOW_SECONDS3600HTTP_TIMEOUT60HTTP_RETRIES3MODEGET또는POST_JSONAPI_HEADERS{"Authorization":"Bearer <token>","Accept":"application/json"}DOWNLOAD_URL_TEMPLATE(GET 모드) {FROM},{TO},{CURSOR}가 포함된 맞춤 URL 템플릿API_URL(POST_JSON 모드) API 엔드포인트 URL JSON_BODY_TEMPLATE(POST_JSON 모드) {FROM},{TO},{CURSOR}이 포함된 JSON 본문MAX_PAGES10함수가 생성되면 해당 페이지에 머무르거나 Lambda > Functions > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요금 (
1 hour) - 타겟: Lambda 함수
team_cymru_scout_ti_to_s3 - 이름:
team-cymru-scout-ti-1h.
- 반복 일정: 요금 (
- 일정 만들기를 클릭합니다.
선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔 > IAM > 사용자 > 사용자 추가로 이동합니다.
- 사용자 추가를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책(맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON 편집기에 다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::team-cymru-scout-ti/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::team-cymru-scout-ti" } ] }이름을
secops-reader-policy로 설정합니다.정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.
보안용 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.
CSV를 다운로드합니다(이러한 값은 피드에 입력됨).
Team Cymru Scout 위협 인텔리전스를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Team Cymru Scout Threat Intelligence). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Team Cymru Scout Threat Intelligence를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://team-cymru-scout-ti/team-cymru/scout-ti/ - 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
지원되는 Team Cymru Scout 위협 인텔리전스 로그 형식
Team Cymru Scout 위협 인텔리전스 파서는 KV (LEEF) 및 CSV 형식의 로그를 지원합니다.
지원되는 Team Cymru Scout 위협 인텔리전스 샘플 로그
JSON
{ "account_name": "dummy_secops_user", "account_type": "basic_auth", "used_queries": 1414, "remaining_queries": 48586, "used_queries_percentage": 2.828, "query_limit": 50000, "used_foundation_queries": 4224, "remaining_foundation_queries": 5776, "foundation_query_limit": 10000, "used_foundation_queries_percentage": 42.24, "event_type": "account_usage" }
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.