收集 CrowdStrike IDP 服務記錄
本文說明如何使用 Amazon S3,將 CrowdStrike Identity Protection (IDP) 服務記錄擷取至 Google Security Operations。這項整合功能會使用 CrowdStrike Unified Alerts API 收集 Identity Protection 事件,並以 NDJSON 格式儲存,以供內建的 CS_IDP 剖析器處理。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- CrowdStrike Falcon Console 的特殊存取權和 API 金鑰管理。
- AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。
取得 CrowdStrike Identity Protection 的必要條件
- 登入 CrowdStrike Falcon Console。
- 依序前往「支援與資源」>「API 用戶端和金鑰」。
- 按一下「Add new API Client」(新增 API 用戶端)。
- 提供下列設定詳細資料:
- 用戶端名稱:輸入
Google SecOps IDP Integration
。 - 說明:輸入
API client for Google SecOps integration
。 - 範圍:選取「Alerts: READ」(快訊:讀取) (
alerts:read
) 範圍 (包括 Identity Protection 快訊)。
- 用戶端名稱:輸入
- 按一下「新增」。
- 複製下列詳細資料並存放在安全位置:
- 用戶端 ID
- 用戶端密鑰 (只會顯示一次)
- 基本網址 (例如:美國 1 的
api.crowdstrike.com
、美國 2 的api.us-2.crowdstrike.com
、歐洲 1 的api.eu-1.crowdstrike.com
)
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
crowdstrike-idp-logs-bucket
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 .CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
crowdstrike-idp-logs-bucket
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/crowdstrike-idp/state.json" } ] }
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
CrowdStrike-IDP-Lambda-Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 CrowdStrike-IDP-Collector
執行階段 Python 3.13 架構 x86_64 執行角色 CrowdStrike-IDP-Lambda-Role
建立函式後,開啟「程式碼」分頁,刪除存根並貼上下列程式碼:
import json import boto3 import urllib3 import os from datetime import datetime, timezone from urllib.parse import urlencode HTTP = urllib3.PoolManager() def lambda_handler(event, context): """ Fetch CrowdStrike Identity Protection alerts (Unified Alerts API) and store RAW JSON (NDJSON) to S3 for the CS_IDP parser. No transformation is performed. """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] client_id = os.environ['CROWDSTRIKE_CLIENT_ID'] client_secret = os.environ['CROWDSTRIKE_CLIENT_SECRET'] api_base = os.environ['API_BASE'] s3 = boto3.client('s3') token = get_token(client_id, client_secret, api_base) last_ts = get_last_timestamp(s3, s3_bucket, state_key) # FQL filter for Identity Protection alerts only, newer than checkpoint fql_filter = f"product:'idp'+updated_timestamp:>'{last_ts}'" sort = 'updated_timestamp.asc' # Step 1: Get list of alert IDs all_ids = [] per_page = int(os.environ.get('ALERTS_LIMIT', '1000')) # up to 10000 per SDK docs offset = 0 while True: page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset) if not page_ids: break all_ids.extend(page_ids) if len(page_ids) < per_page: break offset += per_page if not all_ids: return {'statusCode': 200, 'body': 'No new Identity Protection alerts.'} # Step 2: Get alert details in batches (max 1000 IDs per request) details = [] max_batch = 1000 for i in range(0, len(all_ids), max_batch): batch = all_ids[i:i+max_batch] details.extend(fetch_alert_details(api_base, token, batch)) if details: details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', ''))) latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp') key = f"{s3_prefix}cs_idp_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = '\n'.join(json.dumps(d, separators=(',', ':')) for d in details) s3.put_object( Bucket=s3_bucket, Key=key, Body=body.encode('utf-8'), ContentType='application/x-ndjson' ) update_state(s3, s3_bucket, state_key, latest) return {'statusCode': 200, 'body': f'Wrote {len(details)} alerts to S3.'} def get_token(client_id, client_secret, api_base): """Get OAuth2 token from CrowdStrike API""" url = f"https://{api_base}/oauth2/token" data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials" headers = {'Content-Type': 'application/x-www-form-urlencoded'} r = HTTP.request('POST', url, body=data, headers=headers) if r.status != 200: raise Exception(f'Auth failed: {r.status} {r.data}') return json.loads(r.data.decode('utf-8'))['access_token'] def query_alert_ids(api_base, token, fql_filter, sort, limit, offset): """Query alert IDs using filters""" url = f"https://{api_base}/alerts/queries/alerts/v2" params = {'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset)} qs = urlencode(params) r = HTTP.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'}) if r.status != 200: raise Exception(f'Query alerts failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def fetch_alert_details(api_base, token, composite_ids): """Fetch detailed alert data by composite IDs""" url = f"https://{api_base}/alerts/entities/alerts/v2" body = {'composite_ids': composite_ids} headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} r = HTTP.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers) if r.status != 200: raise Exception(f'Fetch alert details failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def get_last_timestamp(s3, bucket, key, default='2023-01-01T00:00:00Z'): """Get last processed timestamp from S3 state file""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj['Body'].read().decode('utf-8')) return state.get('last_timestamp', default) except s3.exceptions.NoSuchKey: return default def update_state(s3, bucket, key, ts): """Update last processed timestamp in S3 state file""" state = {'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat()} s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json')
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
crowdstrike-idp-logs-bucket
S3_PREFIX
crowdstrike-idp/
STATE_KEY
crowdstrike-idp/state.json
CROWDSTRIKE_CLIENT_ID
<your-client-id>
CROWDSTRIKE_CLIENT_SECRET
<your-client-secret>
API_BASE
api.crowdstrike.com
(美國 - 1)、api.us-2.crowdstrike.com
(美國 - 2)、api.eu-1.crowdstrike.com
(歐洲 - 1)ALERTS_LIMIT
1000
(選用,每頁最多 10000 個)建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
15 minutes
)。 - 目標:您的 Lambda 函式
CrowdStrike-IDP-Collector
。 - 名稱:
CrowdStrike-IDP-Collector-15m
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 前往 AWS 控制台 > IAM > 使用者。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載
.CSV
。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態消息,擷取 CrowdStrike Identity Protection Services 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
CrowdStrike Identity Protection Services logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Crowdstrike Identity Protection Services」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://crowdstrike-idp-logs-bucket/crowdstrike-idp/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。