CrowdStrike IDP Services 로그 수집
이 문서에서는 Amazon S3를 사용하여 CrowdStrike Identity Protection (IDP) 서비스 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 통합은 CrowdStrike Unified Alerts API를 사용하여 ID 보호 이벤트를 수집하고 내장된 CS_IDP 파서에서 처리할 수 있도록 NDJSON 형식으로 저장합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스입니다.
- CrowdStrike Falcon Console에 대한 권한 있는 액세스 및 API 키 관리
- AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge)에 대한 권한 있는 액세스
CrowdStrike Identity Protection 필수사항 가져오기
- CrowdStrike Falcon Console에 로그인합니다.
- 지원 및 리소스 > API 클라이언트 및 키로 이동합니다.
- 새 API 클라이언트 추가를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 클라이언트 이름:
Google SecOps IDP Integration
를 입력합니다. - 설명:
API client for Google SecOps integration
을 입력합니다. - 범위: 알림: 읽기 (
alerts:read
) 범위를 선택합니다 (여기에는 ID 보호 알림이 포함됨).
- 클라이언트 이름:
- 추가를 클릭합니다.
- 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- 클라이언트 ID
- 클라이언트 보안 비밀번호 (한 번만 표시됨)
- 기준 URL (예: 미국-1의 경우
api.crowdstrike.com
, 미국-2의 경우api.us-2.crowdstrike.com
, EU-1의 경우api.eu-1.crowdstrike.com
)
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
crowdstrike-idp-logs-bucket
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- .CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책으로 이동합니다.
- 정책 만들기 > JSON 탭을 클릭합니다.
- 다음 정책을 복사하여 붙여넣습니다.
정책 JSON (다른 버킷 이름을 입력한 경우
crowdstrike-idp-logs-bucket
대체):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/crowdstrike-idp/state.json" } ] }
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
CrowdStrike-IDP-Lambda-Role
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 CrowdStrike-IDP-Collector
런타임 Python 3.13 아키텍처 x86_64 실행 역할 CrowdStrike-IDP-Lambda-Role
함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드를 붙여넣습니다.
import json import boto3 import urllib3 import os from datetime import datetime, timezone from urllib.parse import urlencode HTTP = urllib3.PoolManager() def lambda_handler(event, context): """ Fetch CrowdStrike Identity Protection alerts (Unified Alerts API) and store RAW JSON (NDJSON) to S3 for the CS_IDP parser. No transformation is performed. """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] client_id = os.environ['CROWDSTRIKE_CLIENT_ID'] client_secret = os.environ['CROWDSTRIKE_CLIENT_SECRET'] api_base = os.environ['API_BASE'] s3 = boto3.client('s3') token = get_token(client_id, client_secret, api_base) last_ts = get_last_timestamp(s3, s3_bucket, state_key) # FQL filter for Identity Protection alerts only, newer than checkpoint fql_filter = f"product:'idp'+updated_timestamp:>'{last_ts}'" sort = 'updated_timestamp.asc' # Step 1: Get list of alert IDs all_ids = [] per_page = int(os.environ.get('ALERTS_LIMIT', '1000')) # up to 10000 per SDK docs offset = 0 while True: page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset) if not page_ids: break all_ids.extend(page_ids) if len(page_ids) < per_page: break offset += per_page if not all_ids: return {'statusCode': 200, 'body': 'No new Identity Protection alerts.'} # Step 2: Get alert details in batches (max 1000 IDs per request) details = [] max_batch = 1000 for i in range(0, len(all_ids), max_batch): batch = all_ids[i:i+max_batch] details.extend(fetch_alert_details(api_base, token, batch)) if details: details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', ''))) latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp') key = f"{s3_prefix}cs_idp_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = '\n'.join(json.dumps(d, separators=(',', ':')) for d in details) s3.put_object( Bucket=s3_bucket, Key=key, Body=body.encode('utf-8'), ContentType='application/x-ndjson' ) update_state(s3, s3_bucket, state_key, latest) return {'statusCode': 200, 'body': f'Wrote {len(details)} alerts to S3.'} def get_token(client_id, client_secret, api_base): """Get OAuth2 token from CrowdStrike API""" url = f"https://{api_base}/oauth2/token" data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials" headers = {'Content-Type': 'application/x-www-form-urlencoded'} r = HTTP.request('POST', url, body=data, headers=headers) if r.status != 200: raise Exception(f'Auth failed: {r.status} {r.data}') return json.loads(r.data.decode('utf-8'))['access_token'] def query_alert_ids(api_base, token, fql_filter, sort, limit, offset): """Query alert IDs using filters""" url = f"https://{api_base}/alerts/queries/alerts/v2" params = {'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset)} qs = urlencode(params) r = HTTP.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'}) if r.status != 200: raise Exception(f'Query alerts failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def fetch_alert_details(api_base, token, composite_ids): """Fetch detailed alert data by composite IDs""" url = f"https://{api_base}/alerts/entities/alerts/v2" body = {'composite_ids': composite_ids} headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} r = HTTP.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers) if r.status != 200: raise Exception(f'Fetch alert details failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def get_last_timestamp(s3, bucket, key, default='2023-01-01T00:00:00Z'): """Get last processed timestamp from S3 state file""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj['Body'].read().decode('utf-8')) return state.get('last_timestamp', default) except s3.exceptions.NoSuchKey: return default def update_state(s3, bucket, key, ts): """Update last processed timestamp in S3 state file""" state = {'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat()} s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json')
구성 > 환경 변수로 이동합니다.
수정 > 새 환경 변수 추가를 클릭합니다.
다음 표에 제공된 환경 변수를 입력하고 예시 값을 실제 값으로 바꿉니다.
환경 변수
키 예시 값 S3_BUCKET
crowdstrike-idp-logs-bucket
S3_PREFIX
crowdstrike-idp/
STATE_KEY
crowdstrike-idp/state.json
CROWDSTRIKE_CLIENT_ID
<your-client-id>
CROWDSTRIKE_CLIENT_SECRET
<your-client-secret>
API_BASE
api.crowdstrike.com
(미국-1),api.us-2.crowdstrike.com
(미국-2),api.eu-1.crowdstrike.com
(EU-1)ALERTS_LIMIT
1000
(선택사항, 페이지당 최대 10,000개)함수가 생성되면 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요금 (
15 minutes
) - 타겟: Lambda 함수
CrowdStrike-IDP-Collector
- 이름:
CrowdStrike-IDP-Collector-15m
.
- 반복 일정: 요금 (
- 일정 만들기를 클릭합니다.
(선택사항) Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔 > IAM > 사용자로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket" } ] }
이름 =
secops-reader-policy
정책 만들기 > 검색/선택 > 다음 > 권한 추가를 클릭합니다.
secops-reader
의 액세스 키를 만듭니다(보안 사용자 인증 정보 > 액세스 키).액세스 키 만들기를 클릭합니다.
.CSV
을 다운로드합니다. (이 값은 피드에 붙여넣습니다.)
CrowdStrike Identity Protection Services 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
CrowdStrike Identity Protection Services logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Crowdstrike Identity Protection Services를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://crowdstrike-idp-logs-bucket/crowdstrike-idp/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.