CrowdStrike Identity Protection (IDP) 서비스 로그 수집
이 문서에서는 Google Cloud Storage를 사용하여 CrowdStrike Identity Protection (IDP) Services 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 통합은 CrowdStrike Unified Alerts API를 사용하여 ID 보호 이벤트를 수집하고 내장된 CS_IDP 파서에서 처리할 수 있도록 NDJSON 형식으로 저장합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Cloud Storage API가 사용 설정된 GCP 프로젝트
- GCS 버킷을 만들고 관리할 수 있는 권한
- GCS 버킷의 IAM 정책을 관리할 수 있는 권한
- Cloud Run 서비스, Pub/Sub 주제, Cloud Scheduler 작업을 만들 수 있는 권한
- CrowdStrike Falcon Console 및 API 키 관리에 대한 권한 액세스
Google Cloud Storage 버킷 만들기
- Google Cloud Console로 이동합니다.
- 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
- 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
- 버킷 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 버킷 이름 지정 전역적으로 고유한 이름 (예: crowdstrike-idp-logs-bucket)을 입력합니다.위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전) 위치 위치를 선택합니다 (예: us-central1).스토리지 클래스 Standard (자주 액세스하는 로그에 권장) 액세스 제어 균일 (권장) 보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정 만들기를 클릭합니다.
CrowdStrike ID 보호 필수 요건 가져오기
- CrowdStrike Falcon Console에 로그인합니다.
- 지원 및 리소스 > API 클라이언트 및 키로 이동합니다.
- 새 API 클라이언트 추가를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 클라이언트 이름:
Google SecOps IDP Integration를 입력합니다. - 설명:
API client for Google SecOps integration을 입력합니다. - 범위: 알림: 읽기 (alerts:read) 범위를 선택합니다 (여기에는 ID 보호 알림이 포함됨).
- 클라이언트 이름:
- 추가를 클릭합니다.
- 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- Client ID
- 클라이언트 보안 비밀번호 (한 번만 표시됨)
- 기준 URL (예: 미국-1의 경우
api.crowdstrike.com, 미국-2의 경우api.us-2.crowdstrike.com, EU-1의 경우api.eu-1.crowdstrike.com)
Cloud Run 함수의 서비스 계정 만들기
Cloud Run 함수에는 GCS 버킷에 쓸 수 있는 권한이 있는 서비스 계정이 필요합니다.
서비스 계정 만들기
- GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
- 서비스 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 서비스 계정 이름:
crowdstrike-idp-collector-sa을 입력합니다. - 서비스 계정 설명:
Service account for Cloud Run function to collect CrowdStrike IDP logs을 입력합니다.
- 서비스 계정 이름:
- 만들고 계속하기를 클릭합니다.
- 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 단계를 따르세요.
- 역할 선택을 클릭합니다.
- 스토리지 객체 관리자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Run 호출자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Functions 호출자를 검색하여 선택합니다.
- 계속을 클릭합니다.
- 완료를 클릭합니다.
이러한 역할은 다음 작업에 필요합니다.
- 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
- Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
- Cloud Functions 호출자: 함수 호출 허용
GCS 버킷에 대한 IAM 권한 부여
GCS 버킷에 대한 쓰기 권한을 서비스 계정에 부여합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
crowdstrike-idp-collector-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다. - 역할 할당: 스토리지 객체 관리자를 선택합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
- 저장을 클릭합니다.
게시/구독 주제 만들기
Cloud Scheduler가 게시하고 Cloud Run 함수가 구독할 Pub/Sub 주제를 만듭니다.
- GCP Console에서 Pub/Sub > 주제로 이동합니다.
- 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주제 ID:
crowdstrike-idp-trigger를 입력합니다. - 다른 설정은 기본값으로 둡니다.
- 주제 ID:
- 만들기를 클릭합니다.
로그를 수집하는 Cloud Run 함수 만들기
Cloud Run 함수는 Cloud Scheduler의 Pub/Sub 메시지에 의해 트리거되어 CrowdStrike Identity Protection API에서 로그를 가져오고 GCS에 씁니다.
- GCP 콘솔에서 Cloud Run으로 이동합니다.
- 서비스 만들기를 클릭합니다.
- 함수를 선택합니다 (인라인 편집기를 사용하여 함수 만들기).
구성 섹션에서 다음 구성 세부정보를 제공합니다.
설정 값 서비스 이름 crowdstrike-idp-collector리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).런타임 Python 3.12 이상 선택 트리거 (선택사항) 섹션에서 다음을 수행합니다.
- + 트리거 추가를 클릭합니다.
- Cloud Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제 선택에서
crowdstrike-idp-trigger주제를 선택합니다. - 저장을 클릭합니다.
인증 섹션에서 다음을 구성합니다.
- 인증 필요를 선택합니다.
- ID 및 액세스 관리 (IAM)를 확인합니다.
아래로 스크롤하고 컨테이너, 네트워킹, 보안을 펼칩니다.
보안 탭으로 이동합니다.
- 서비스 계정: 서비스 계정
crowdstrike-idp-collector-sa를 선택합니다.
- 서비스 계정: 서비스 계정
컨테이너 탭으로 이동합니다.
- 변수 및 보안 비밀을 클릭합니다.
- 각 환경 변수에 대해 + 변수 추가를 클릭합니다.
변수 이름 예시 값 GCS_BUCKETcrowdstrike-idp-logs-bucketGCS_PREFIXcrowdstrike-idp/STATE_KEYcrowdstrike-idp/state.jsonCROWDSTRIKE_CLIENT_IDyour-client-idCROWDSTRIKE_CLIENT_SECRETyour-client-secretAPI_BASEapi.crowdstrike.com(미국-1),api.us-2.crowdstrike.com(미국-2),api.eu-1.crowdstrike.com(EU-1)ALERTS_LIMIT1000(선택사항, 페이지당 최대 10,000개)변수 및 보안 비밀 탭에서 요청까지 아래로 스크롤합니다.
- 요청 제한 시간:
600초 (10분)를 입력합니다.
- 요청 제한 시간:
컨테이너의 설정 탭으로 이동합니다.
- 리소스 섹션에서 다음을 수행합니다.
- 메모리: 512MiB 이상을 선택합니다.
- CPU: 1을 선택합니다.
- 완료를 클릭합니다.
- 리소스 섹션에서 다음을 수행합니다.
실행 환경으로 스크롤합니다.
- 기본을 선택합니다 (권장).
버전 확장 섹션에서 다음을 수행합니다.
- 최소 인스턴스 수:
0를 입력합니다. - 최대 인스턴스 수:
100을 입력합니다 (또는 예상 부하에 따라 조정).
- 최소 인스턴스 수:
만들기를 클릭합니다.
서비스가 생성될 때까지 기다립니다 (1~2분).
서비스가 생성되면 인라인 코드 편집기가 자동으로 열립니다.
함수 코드 추가
- 함수 진입점에 main을 입력합니다.
인라인 코드 편집기에서 다음 두 파일을 만듭니다.
- 첫 번째 파일: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone from urllib.parse import urlencode # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Fetch CrowdStrike Identity Protection alerts (Unified Alerts API) and store RAW JSON (NDJSON) to GCS for the CS_IDP parser. No transformation is performed. """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'crowdstrike-idp/') state_key = os.environ.get('STATE_KEY', 'crowdstrike-idp/state.json') client_id = os.environ.get('CROWDSTRIKE_CLIENT_ID') client_secret = os.environ.get('CROWDSTRIKE_CLIENT_SECRET') api_base = os.environ.get('API_BASE') if not all([bucket_name, client_id, client_secret, api_base]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(bucket_name) # Get OAuth token token = get_token(client_id, client_secret, api_base) # Load last processed timestamp last_ts = get_last_timestamp(bucket, state_key) # FQL filter for Identity Protection alerts only, newer than checkpoint fql_filter = f"product:'idp' + updated_timestamp:>'{last_ts}'" sort = 'updated_timestamp.asc' # Step 1: Get list of alert IDs all_ids = [] per_page = int(os.environ.get('ALERTS_LIMIT', '1000')) offset = 0 while True: page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset) if not page_ids: break all_ids.extend(page_ids) if len(page_ids) < per_page: break offset += per_page if not all_ids: print('No new Identity Protection alerts.') return # Step 2: Get alert details in batches (max 1000 IDs per request) details = [] max_batch = 1000 for i in range(0, len(all_ids), max_batch): batch = all_ids[i:i + max_batch] details.extend(fetch_alert_details(api_base, token, batch)) if details: # Sort by updated_timestamp details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', ''))) latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp') # Write to GCS timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') blob_name = f"{prefix}cs_idp_{timestamp}.json" blob = bucket.blob(blob_name) # NDJSON format log_lines = '\n'.join([json.dumps(d, separators=(',', ':')) for d in details]) blob.upload_from_string(log_lines, content_type='application/x-ndjson') print(f'Wrote {len(details)} alerts to {blob_name}') # Update state update_state(bucket, state_key, latest) except Exception as e: print(f'Error processing alerts: {str(e)}') raise def get_token(client_id, client_secret, api_base): """Get OAuth2 token from CrowdStrike API""" url = f"https://{api_base}/oauth2/token" data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials" headers = {'Content-Type': 'application/x-www-form-urlencoded'} r = http.request('POST', url, body=data, headers=headers) if r.status != 200: raise Exception(f'Auth failed: {r.status} {r.data}') return json.loads(r.data.decode('utf-8'))['access_token'] def query_alert_ids(api_base, token, fql_filter, sort, limit, offset): """Query alert IDs using filters""" url = f"https://{api_base}/alerts/queries/alerts/v2" params = { 'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset) } qs = urlencode(params) r = http.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'}) if r.status != 200: raise Exception(f'Query alerts failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def fetch_alert_details(api_base, token, composite_ids): """Fetch detailed alert data by composite IDs""" url = f"https://{api_base}/alerts/entities/alerts/v2" body = {'composite_ids': composite_ids} headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } r = http.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers) if r.status != 200: raise Exception(f'Fetch alert details failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def get_last_timestamp(bucket, key, default='2023-01-01T00:00:00Z'): """Get last processed timestamp from GCS state file""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) return state.get('last_timestamp', default) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return default def update_state(bucket, key, ts): """Update last processed timestamp in GCS state file""" state = { 'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(key) blob.upload_from_string(json.dumps(state), content_type='application/json')- 두 번째 파일: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0배포를 클릭하여 함수를 저장하고 배포합니다.
배포가 완료될 때까지 기다립니다 (2~3분).
Cloud Scheduler 작업 만들기
Cloud Scheduler는 일정 간격으로 Pub/Sub 주제에 메시지를 게시하여 Cloud Run 함수를 트리거합니다.
- GCP Console에서 Cloud Scheduler로 이동합니다.
- 작업 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 crowdstrike-idp-collector-15m리전 Cloud Run 함수와 동일한 리전 선택 주파수 */15 * * * *(15분마다)시간대 시간대 선택 (UTC 권장) 타겟 유형 Pub/Sub 주제 crowdstrike-idp-trigger주제를 선택합니다.메일 본문 {}(빈 JSON 객체)만들기를 클릭합니다.
스케줄러 작업 테스트
- Cloud Scheduler 콘솔에서 작업을 찾습니다.
- 강제 실행을 클릭하여 수동으로 트리거합니다.
- 몇 초간 기다린 후 Cloud Run > 서비스 > crowdstrike-idp-collector > 로그로 이동합니다.
- 함수가 성공적으로 실행되었는지 확인합니다.
- GCS 버킷을 확인하여 로그가 작성되었는지 확인합니다.
Google SecOps 서비스 계정 가져오기
Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.
서비스 계정 이메일 가져오기
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
CrowdStrike Identity Protection Services logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Crowdstrike Identity Protection Services를 선택합니다.
서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다(예:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.
Google SecOps 서비스 계정에 IAM 권한 부여
Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
- 역할 할당: 스토리지 객체 뷰어를 선택합니다.
저장을 클릭합니다.
CrowdStrike Identity Protection Services 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
CrowdStrike Identity Protection Services logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Crowdstrike Identity Protection Services를 선택합니다.
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.
gs://crowdstrike-idp-logs-bucket/crowdstrike-idp/다음과 같이 바꿉니다.
crowdstrike-idp-logs-bucket: GCS 버킷 이름입니다.crowdstrike-idp/: 로그가 저장되는 접두사/폴더 경로입니다.
소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
- 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
애셋 네임스페이스: 애셋 네임스페이스입니다.
수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.