Digital Shadows Indicators 로그 수집
이 문서에서는 Amazon S3를 사용하여 Digital Shadows Indicators 로그를 Google Security Operations로 수집하는 방법을 설명합니다.
Digital Shadows Indicators (현재 ReliaQuest GreyMatter DRP의 일부)는 오픈 웹, 딥 웹, 다크 웹, 소셜 미디어 전반에서 외부 위협, 데이터 노출, 브랜드 사칭을 지속적으로 모니터링하고 감지하는 디지털 리스크 보안 플랫폼입니다. 조직이 디지털 위험을 식별하고 완화할 수 있도록 위협 인텔리전스, 사고 알림, 침해 지표 (IOC)를 제공합니다.
시작하기 전에
- Google SecOps 인스턴스
- Digital Shadows Indicators 포털에 대한 액세스 권한
- AWS(S3, IAM)에 대한 액세스 권한
- API 액세스가 사용 설정된 Digital Shadows Indicators의 활성 구독
Digital Shadows Indicators API 사용자 인증 정보 수집
https://portal-digitalshadows.com에서 Digital Shadows Indicators Portal에 로그인합니다.- 설정 > API 사용자 인증 정보로 이동합니다.
- 기존 API 키가 없는 경우 새 API 클라이언트 만들기 또는 API 키 생성을 클릭합니다.
다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- API 키: 6자리 API 키
- API 비밀번호: 32자리 API 비밀번호
- 계정 ID: 계정 ID (포털에 표시되거나 Digital Shadows 담당자가 제공)
- API 기준 URL:
https://api.searchlight.app/v1또는https://portal-digitalshadows.com/api/v1(테넌트 지역에 따라 다름)
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다(예:
digital-shadows-logs). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안용 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- .csv 파일 다운로드를 클릭하여 나중에 참고할 수 있도록 액세스 키와 보안 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책 > 정책 생성 > JSON 탭으로 이동합니다.
- 아래 정책을 복사하여 붙여넣습니다.
정책 JSON (다른 버킷 이름을 입력한 경우
digital-shadows-logs대체):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::digital-shadows-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json" } ] }다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
DigitalShadowsLambdaRole로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 DigitalShadowsCollector런타임 Python 3.13 아키텍처 x86_64 실행 역할 DigitalShadowsLambdaRole함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 아래 코드 (DigitalShadowsCollector.py)를 붙여넣습니다.
import urllib3 import json import boto3 import os import base64 import logging import time from datetime import datetime, timedelta, timezone from urllib.parse import urlencode logger = logging.getLogger() logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = boto3.client('s3') def _basic_auth_header(key: str, secret: str) -> str: token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8") return f"Basic {token}" def _load_state(bucket, key, default_days=30) -> str: """Return ISO8601 checkpoint (UTC).""" try: response = storage_client.get_object(Bucket=bucket, Key=key) state_data = response['Body'].read().decode('utf-8') state = json.loads(state_data) ts = state.get("last_timestamp") if ts: return ts except storage_client.exceptions.NoSuchKey: logger.info("No previous state found, starting from default lookback") except Exception as e: logger.warning(f"State read error: {e}") return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat() def _save_state(bucket, key, ts: str) -> None: storage_client.put_object( Bucket=bucket, Key=key, Body=json.dumps({"last_timestamp": ts}), ContentType="application/json" ) def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict: qs = f"?{urlencode(params)}" if params else "" for attempt in range(max_retries): r = HTTP.request("GET", f"{url}{qs}", headers=headers) if r.status == 200: return json.loads(r.data.decode("utf-8")) if r.status in (429, 500, 502, 503, 504): wait = backoff_s * (2 ** attempt) logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s") time.sleep(wait) continue raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}") raise RuntimeError("Exceeded retry budget for DS API") def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param): items = [] for page in range(max_pages): params = { "limit": page_size, "offset": page * page_size, time_param: since_ts, } if account_id: params["account-id"] = account_id data = _get_json(f"{api_base}/{path}", headers, params) batch = data.get("items") or data.get("data") or [] if not batch: break items.extend(batch) if len(batch) < page_size: break return items def lambda_handler(event, context): bucket_name = os.environ["S3_BUCKET"] api_key = os.environ["DS_API_KEY"] api_secret = os.environ["DS_API_SECRET"] prefix = os.environ.get("S3_PREFIX", "digital-shadows") state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json") api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1") account_id = os.environ.get("DS_ACCOUNT_ID", "") page_size = int(os.environ.get("PAGE_SIZE", "100")) max_pages = int(os.environ.get("MAX_PAGES", "10")) try: last_ts = _load_state(bucket_name, state_key) logger.info(f"Checkpoint: {last_ts}") headers = { "Authorization": _basic_auth_header(api_key, api_secret), "Accept": "application/json", "User-Agent": "Chronicle-DigitalShadows-S3/1.0", } records = [] incidents = _collect( api_base, headers, "incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for incident in incidents: incident['_source_type'] = 'incident' records.extend(incidents) intel_incidents = _collect( api_base, headers, "intel-incidents", last_ts, account_id, page_size, max_pages, time_param="published-after" ) for intel in intel_incidents: intel['_source_type'] = 'intelligence_incident' records.extend(intel_incidents) indicators = _collect( api_base, headers, "indicators", last_ts, account_id, page_size, max_pages, time_param="lastUpdated-after" ) for indicator in indicators: indicator['_source_type'] = 'ioc' records.extend(indicators) if records: newest = max( (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts) for r in records ) key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) storage_client.put_object( Bucket=bucket_name, Key=key, Body=body, ContentType="application/x-ndjson" ) _save_state(bucket_name, state_key, newest) msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}" else: msg = "No new records" logger.info(msg) return {'statusCode': 200, 'body': json.dumps({'message': msg, 'records': len(records) if records else 0})} except Exception as e: logger.error(f"Error processing logs: {str(e)}") raise구성 > 환경 변수 > 수정 > 새 환경 변수 추가로 이동합니다.
아래에 제공된 환경 변수를 입력하고 값으로 바꿉니다.
환경 변수
키 예시 값 S3_BUCKETdigital-shadows-logsS3_PREFIXdigital-shadows/STATE_KEYdigital-shadows/state.jsonDS_API_KEYABC123(6자리 API 키)DS_API_SECRETyour-32-character-api-secretAPI_BASEhttps://api.searchlight.app/v1DS_ACCOUNT_IDyour-account-idPAGE_SIZE100MAX_PAGES10함수가 생성된 후 해당 페이지에 머무르거나 Lambda > 함수 > DigitalShadowsCollector를 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요금 (
1 hour) - 타겟: Lambda 함수
DigitalShadowsCollector - 이름:
DigitalShadowsCollector-1h
- 반복 일정: 요금 (
- 일정 만들기를 클릭합니다.
Digital Shadows Indicators 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 다음 페이지에서 단일 피드 구성을 클릭합니다.
- 피드 이름에 고유한 이름을 입력합니다.
- 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Digital Shadows Indicators를 선택합니다.
- 다음을 클릭한 후 제출을 클릭합니다.
다음 필드의 값을 지정합니다.
- S3 URI:
s3://digital-shadows-logs/digital-shadows/ - 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다 (기본값은 180일).
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키
- 애셋 네임스페이스: 애셋 네임스페이스
- 수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.
- S3 URI:
다음을 클릭한 후 제출을 클릭합니다.
UDM 매핑 테이블
| 로그 필드 | UDM 매핑 | 논리 |
|---|---|---|
| 값 | entity.entity.file.md5 | type == 'MD5'인 경우 설정 |
| 값 | entity.entity.file.sha1 | type == 'SHA1'인 경우 설정 |
| 값 | entity.entity.file.sha256 | type == 'SHA256'인 경우 설정 |
| 값 | entity.entity.hostname | type == 'HOST'인 경우 설정 |
| 값 | entity.entity.ip | type == 'IP'인 경우 값이 직접 복사됨 |
| 값 | entity.entity.url | type == 'URL'인 경우 설정 |
| 값 | entity.entity.user.email_addresses | type == 'EMAIL'인 경우 값이 직접 복사됨 |
| 유형 | entity.metadata.entity_type | type == 'HOST'인 경우 'DOMAIN_NAME', type == 'IP'인 경우 'IP_ADDRESS', type == 'URL'인 경우 'URL', type == 'EMAIL'인 경우 'USER', type이 ["SHA1","SHA256","MD5"]에 있는 경우 'FILE', 그 외의 경우 'UNKNOWN_ENTITYTYPE'으로 설정됩니다. |
| lastUpdated | entity.metadata.interval.start_time | 비어 있지 않은 경우 ISO8601에서 타임스탬프로 변환됨 |
| id | entity.metadata.product_entity_id | 비어 있지 않은 경우 값이 직접 복사됨 |
| attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | 비어 있지 않은 경우 {key: tag field name, value: tag value} 객체와 병합됩니다. |
| sourceType | entity.metadata.threat.category_details | 값이 직접 복사됨 |
| entity.metadata.threat.threat_feed_name | '표시등'으로 설정 | |
| id | entity.metadata.threat.threat_id | 비어 있지 않은 경우 값이 직접 복사됨 |
| sourceIdentifier | entity.metadata.threat.url_back_to_product | 값이 직접 복사됨 |
| entity.metadata.product_name | '표시등'으로 설정 | |
| entity.metadata.vendor_name | '디지털 그림자'로 설정 |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.