收集 CrowdStrike IDP 服务日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 CrowdStrike Identity Protection (IDP) Services 日志注入到 Google Security Operations。该集成使用 CrowdStrike Unified Alerts API 收集身份保护事件,并以 NDJSON 格式存储这些事件,以便由内置的 CS_IDP 解析器进行处理。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • CrowdStrike Falcon 控制台的特权访问权限和 API 密钥管理。
  • AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。

获取 CrowdStrike Identity Protection 前提条件

  1. 登录 CrowdStrike Falcon 控制台
  2. 依次前往支持和资源 > API 客户端和密钥
  3. 点击添加新的 API 客户端
  4. 提供以下配置详细信息:
    • 客户端名称:输入 Google SecOps IDP Integration
    • 说明:输入 API client for Google SecOps integration
    • 范围:选择提醒:读取 (alerts:read) 范围(包括 Identity Protection 提醒)。
  5. 点击 Add(添加)。
  6. 复制以下详细信息并将其保存在安全的位置:
    • 客户端 ID (Client ID)
    • 客户端密钥(此密钥仅显示一次)
    • 基本网址(示例:api.crowdstrike.com [美国 1],api.us-2.crowdstrike.com [美国 2],api.eu-1.crowdstrike.com [欧盟 1])

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 crowdstrike-idp-logs-bucket)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶名称,请替换 crowdstrike-idp-logs-bucket):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/crowdstrike-idp/state.json"
        }
      ]
    }
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  7. 附加新创建的政策。

  8. 将角色命名为 CrowdStrike-IDP-Lambda-Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 CrowdStrike-IDP-Collector
    运行时 Python 3.13
    架构 x86_64
    执行角色 CrowdStrike-IDP-Lambda-Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码:

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timezone
    from urllib.parse import urlencode
    
    HTTP = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Fetch CrowdStrike Identity Protection alerts (Unified Alerts API)
        and store RAW JSON (NDJSON) to S3 for the CS_IDP parser.
        No transformation is performed.
        """
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        client_id = os.environ['CROWDSTRIKE_CLIENT_ID']
        client_secret = os.environ['CROWDSTRIKE_CLIENT_SECRET']
        api_base = os.environ['API_BASE']
    
        s3 = boto3.client('s3')
    
        token = get_token(client_id, client_secret, api_base)
        last_ts = get_last_timestamp(s3, s3_bucket, state_key)
    
        # FQL filter for Identity Protection alerts only, newer than checkpoint
        fql_filter = f"product:'idp'+updated_timestamp:>'{last_ts}'"
        sort = 'updated_timestamp.asc'
    
        # Step 1: Get list of alert IDs
        all_ids = []
        per_page = int(os.environ.get('ALERTS_LIMIT', '1000'))  # up to 10000 per SDK docs
        offset = 0
        while True:
            page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset)
            if not page_ids:
                break
            all_ids.extend(page_ids)
            if len(page_ids) < per_page:
                break
            offset += per_page
    
        if not all_ids:
            return {'statusCode': 200, 'body': 'No new Identity Protection alerts.'}
    
        # Step 2: Get alert details in batches (max 1000 IDs per request)
        details = []
        max_batch = 1000
        for i in range(0, len(all_ids), max_batch):
            batch = all_ids[i:i+max_batch]
            details.extend(fetch_alert_details(api_base, token, batch))
    
        if details:
            details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', '')))
            latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp')
    
            key = f"{s3_prefix}cs_idp_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
            body = '\n'.join(json.dumps(d, separators=(',', ':')) for d in details)
    
            s3.put_object(
                Bucket=s3_bucket,
                Key=key,
                Body=body.encode('utf-8'),
                ContentType='application/x-ndjson'
            )
    
            update_state(s3, s3_bucket, state_key, latest)
    
        return {'statusCode': 200, 'body': f'Wrote {len(details)} alerts to S3.'}
    
    def get_token(client_id, client_secret, api_base):
        """Get OAuth2 token from CrowdStrike API"""
        url = f"https://{api_base}/oauth2/token"
        data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials"
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        r = HTTP.request('POST', url, body=data, headers=headers)
        if r.status != 200:
            raise Exception(f'Auth failed: {r.status} {r.data}')
        return json.loads(r.data.decode('utf-8'))['access_token']
    
    def query_alert_ids(api_base, token, fql_filter, sort, limit, offset):
        """Query alert IDs using filters"""
        url = f"https://{api_base}/alerts/queries/alerts/v2"
        params = {'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset)}
        qs = urlencode(params)
        r = HTTP.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'})
        if r.status != 200:
            raise Exception(f'Query alerts failed: {r.status} {r.data}')
        resp = json.loads(r.data.decode('utf-8'))
        return resp.get('resources', [])
    
    def fetch_alert_details(api_base, token, composite_ids):
        """Fetch detailed alert data by composite IDs"""
        url = f"https://{api_base}/alerts/entities/alerts/v2"
        body = {'composite_ids': composite_ids}
        headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
        r = HTTP.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers)
        if r.status != 200:
            raise Exception(f'Fetch alert details failed: {r.status} {r.data}')
        resp = json.loads(r.data.decode('utf-8'))
        return resp.get('resources', [])
    
    def get_last_timestamp(s3, bucket, key, default='2023-01-01T00:00:00Z'):
        """Get last processed timestamp from S3 state file"""
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            state = json.loads(obj['Body'].read().decode('utf-8'))
            return state.get('last_timestamp', default)
        except s3.exceptions.NoSuchKey:
            return default
    
    def update_state(s3, bucket, key, ts):
        """Update last processed timestamp in S3 state file"""
        state = {'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat()}
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json')
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET crowdstrike-idp-logs-bucket
    S3_PREFIX crowdstrike-idp/
    STATE_KEY crowdstrike-idp/state.json
    CROWDSTRIKE_CLIENT_ID <your-client-id>
    CROWDSTRIKE_CLIENT_SECRET <your-client-secret>
    API_BASE api.crowdstrike.com (US-1)、api.us-2.crowdstrike.com (US-2)、api.eu-1.crowdstrike.com (EU-1)
    ALERTS_LIMIT 1000(可选,每页最多 10000 个)
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (15 minutes)。
    • 目标:您的 Lambda 函数 CrowdStrike-IDP-Collector
    • 名称CrowdStrike-IDP-Collector-15m
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket"
        }
      ]
    }
    
  7. 名称 = secops-reader-policy

  8. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  10. 点击创建访问密钥

  11. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 CrowdStrike Identity Protection Services 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 CrowdStrike Identity Protection Services logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Crowdstrike Identity Protection Services 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://crowdstrike-idp-logs-bucket/crowdstrike-idp/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。