收集 CrowdStrike FileVantage 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 CrowdStrike FileVantage 記錄擷取至 Google Security Operations。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體。
  • CrowdStrike Falcon Console 的特殊權限。
  • AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。

收集 CrowdStrike FileVantage 必要條件 (API 憑證)

  1. 登入 CrowdStrike Falcon Console
  2. 依序前往「支援與資源」> API 用戶端和金鑰
  3. 按一下「新增 API 用戶端」
  4. 提供下列設定詳細資料:
    • 用戶端名稱:輸入描述性名稱 (例如 Google SecOps FileVantage Integration)。
    • 說明:簡要說明整合用途。
    • API 範圍:選取「Falcon FileVantage:read」
  5. 按一下「新增」即可完成程序。
  6. 複製下列詳細資料並存放在安全位置:
    • 用戶端 ID
    • 用戶端密碼
    • 基本網址 (決定雲端區域)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 crowdstrike-filevantage-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 .CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」(政策)
  2. 按一下「建立政策」>「JSON」分頁
  3. 複製並貼上下列政策。
  4. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 crowdstrike-filevantage-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/filevantage/state.json"
        }
      ]
    }
    
  5. 依序點選「下一步」>「建立政策」

  6. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  7. 附加新建立的政策。

  8. 為角色命名 CrowdStrikeFileVantageRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 crowdstrike-filevantage-logs
    執行階段 Python 3.13
    架構 x86_64
    執行角色 CrowdStrikeFileVantageRole
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (crowdstrike-filevantage-logs.py)。

    import os
    import json
    import boto3
    import urllib3
    from datetime import datetime, timezone
    from urllib.parse import urlencode
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch CrowdStrike FileVantage logs and store them in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        client_id = os.environ['FALCON_CLIENT_ID']
        client_secret = os.environ['FALCON_CLIENT_SECRET']
        base_url = os.environ['FALCON_BASE_URL']
    
        # Initialize clients
        s3_client = boto3.client('s3')
        http = urllib3.PoolManager()
    
        try:
            # Get OAuth token
            token_url = f"{base_url}/oauth2/token"
            token_headers = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'Accept': 'application/json'
            }
            token_data = urlencode({
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials'
            })
    
            token_response = http.request('POST', token_url, body=token_data, headers=token_headers)
    
            if token_response.status != 200:
                print(f"Failed to get OAuth token: {token_response.status}")
                return {'statusCode': 500, 'body': 'Authentication failed'}
    
            token_data = json.loads(token_response.data.decode('utf-8'))
            access_token = token_data['access_token']
    
            # Get last checkpoint
            last_timestamp = get_last_checkpoint(s3_client, s3_bucket, state_key)
    
            # Fetch file changes
            changes_url = f"{base_url}/filevantage/queries/changes/v1"
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Accept': 'application/json'
            }
    
            # Build query parameters
            params = {
                'limit': 500,
                'sort': 'action_timestamp.asc'
            }
    
            if last_timestamp:
                params['filter'] = f"action_timestamp:>'{last_timestamp}'"
    
            query_url = f"{changes_url}?{urlencode(params)}"
            response = http.request('GET', query_url, headers=headers)
    
            if response.status != 200:
                print(f"Failed to query changes: {response.status}")
                return {'statusCode': 500, 'body': 'Failed to fetch changes'}
    
            response_data = json.loads(response.data.decode('utf-8'))
            change_ids = response_data.get('resources', [])
    
            if not change_ids:
                print("No new changes found")
                return {'statusCode': 200, 'body': 'No new changes'}
    
            # Get detailed change information
            details_url = f"{base_url}/filevantage/entities/changes/v1"
            batch_size = 100
            all_changes = []
            latest_timestamp = last_timestamp
    
            for i in range(0, len(change_ids), batch_size):
                batch_ids = change_ids[i:i + batch_size]
                details_params = {'ids': batch_ids}
                details_query_url = f"{details_url}?{urlencode(details_params, doseq=True)}"
    
                details_response = http.request('GET', details_query_url, headers=headers)
    
                if details_response.status == 200:
                    details_data = json.loads(details_response.data.decode('utf-8'))
                    changes = details_data.get('resources', [])
                    all_changes.extend(changes)
    
                    # Track latest timestamp
                    for change in changes:
                        change_time = change.get('action_timestamp')
                        if change_time and (not latest_timestamp or change_time > latest_timestamp):
                            latest_timestamp = change_time
    
            if all_changes:
                # Store logs in S3
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}filevantage_changes_{timestamp}.json"
    
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body='\n'.join(json.dumps(change) for change in all_changes),
                    ContentType='application/json'
                )
    
                # Update checkpoint
                save_checkpoint(s3_client, s3_bucket, state_key, latest_timestamp)
    
                print(f"Stored {len(all_changes)} changes in S3: {s3_key}")
    
            return {
                'statusCode': 200,
                'body': f'Processed {len(all_changes)} changes'
            }
    
        except Exception as e:
            print(f"Error: {str(e)}")
            return {'statusCode': 500, 'body': f'Error: {str(e)}'}
    
    def get_last_checkpoint(s3_client, bucket, key):
        """Get the last processed timestamp from S3 state file"""
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            return None
        except Exception as e:
            print(f"Error reading checkpoint: {e}")
            return None
    
    def save_checkpoint(s3_client, bucket, key, timestamp):
        """Save the last processed timestamp to S3 state file"""
        try:
            state = {
                'last_timestamp': timestamp,
                'updated_at': datetime.now(timezone.utc).isoformat()
            }
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving checkpoint: {e}")
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下表提供的環境變數,並將範例值換成您的值。

    環境變數

    範例值
    S3_BUCKET crowdstrike-filevantage-logs
    S3_PREFIX filevantage/
    STATE_KEY filevantage/state.json
    FALCON_CLIENT_ID <your-client-id>
    FALCON_CLIENT_SECRET <your-client-secret>
    FALCON_BASE_URL https://api.crowdstrike.com (US-1) / https://api.us-2.crowdstrike.com (US-2) / https://api.eu-1.crowdstrike.com (EU-1)
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 crowdstrike-filevantage-logs
    • 名稱crowdstrike-filevantage-logs-1h
  3. 按一下「建立時間表」

(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 前往 AWS 控制台 > IAM > 使用者
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy

  8. 依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. secops-reader 建立存取金鑰:依序點選「安全憑證」>「存取金鑰」

  10. 按一下「建立存取金鑰」

  11. 下載 .CSV。(您會將這些值貼到動態饋給中)。

在 Google SecOps 中設定動態消息,擷取 CrowdStrike FileVantage 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 CrowdStrike FileVantage logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「CrowdStrike Filevantage」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://crowdstrike-filevantage-logs/filevantage/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。