CrowdStrike FileVantage のログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して CrowdStrike FileVantage ログを Google Security Operations に取り込む方法について説明します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス。
  • CrowdStrike Falcon Console への特権アクセス。
  • AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。

CrowdStrike FileVantage の前提条件(API 認証情報)を収集する

  1. CrowdStrike Falcon Console にログインします。
  2. [サポートとリソース] > [API クライアントとキー] に移動します。
  3. [新しい API クライアントを追加] をクリックします。
  4. 次の構成の詳細を入力します。
    • クライアント名: わかりやすい名前を入力します(例: Google SecOps FileVantage Integration)。
    • 説明: 統合の目的の簡単な説明を入力します。
    • API スコープ: [Falcon FileVantage:read] を選択します。
  5. [Add] をクリックして処理を完了します。
  6. 次の詳細をコピーして安全な場所に保存します。
    • クライアント ID
    • Client Secret
    • ベース URL(クラウド リージョンを決定します)

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: crowdstrike-filevantage-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションの [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] に移動します。
  2. [ポリシーを作成> [JSON] タブ] をクリックします。
  3. 次のポリシーをコピーして貼り付けます。
  4. ポリシー JSON(別のバケット名を入力した場合は crowdstrike-filevantage-logs を置き換えます):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/filevantage/state.json"
        }
      ]
    }
    
  5. [次へ] > [ポリシーを作成] をクリックします。

  6. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  7. 新しく作成したポリシーを関連付けます。

  8. ロールに「CrowdStrikeFileVantageRole」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 crowdstrike-filevantage-logs
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール CrowdStrikeFileVantageRole
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(crowdstrike-filevantage-logs.py)を貼り付けます。

    import os
    import json
    import boto3
    import urllib3
    from datetime import datetime, timezone
    from urllib.parse import urlencode
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch CrowdStrike FileVantage logs and store them in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        client_id = os.environ['FALCON_CLIENT_ID']
        client_secret = os.environ['FALCON_CLIENT_SECRET']
        base_url = os.environ['FALCON_BASE_URL']
    
        # Initialize clients
        s3_client = boto3.client('s3')
        http = urllib3.PoolManager()
    
        try:
            # Get OAuth token
            token_url = f"{base_url}/oauth2/token"
            token_headers = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'Accept': 'application/json'
            }
            token_data = urlencode({
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials'
            })
    
            token_response = http.request('POST', token_url, body=token_data, headers=token_headers)
    
            if token_response.status != 200:
                print(f"Failed to get OAuth token: {token_response.status}")
                return {'statusCode': 500, 'body': 'Authentication failed'}
    
            token_data = json.loads(token_response.data.decode('utf-8'))
            access_token = token_data['access_token']
    
            # Get last checkpoint
            last_timestamp = get_last_checkpoint(s3_client, s3_bucket, state_key)
    
            # Fetch file changes
            changes_url = f"{base_url}/filevantage/queries/changes/v1"
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Accept': 'application/json'
            }
    
            # Build query parameters
            params = {
                'limit': 500,
                'sort': 'action_timestamp.asc'
            }
    
            if last_timestamp:
                params['filter'] = f"action_timestamp:>'{last_timestamp}'"
    
            query_url = f"{changes_url}?{urlencode(params)}"
            response = http.request('GET', query_url, headers=headers)
    
            if response.status != 200:
                print(f"Failed to query changes: {response.status}")
                return {'statusCode': 500, 'body': 'Failed to fetch changes'}
    
            response_data = json.loads(response.data.decode('utf-8'))
            change_ids = response_data.get('resources', [])
    
            if not change_ids:
                print("No new changes found")
                return {'statusCode': 200, 'body': 'No new changes'}
    
            # Get detailed change information
            details_url = f"{base_url}/filevantage/entities/changes/v1"
            batch_size = 100
            all_changes = []
            latest_timestamp = last_timestamp
    
            for i in range(0, len(change_ids), batch_size):
                batch_ids = change_ids[i:i + batch_size]
                details_params = {'ids': batch_ids}
                details_query_url = f"{details_url}?{urlencode(details_params, doseq=True)}"
    
                details_response = http.request('GET', details_query_url, headers=headers)
    
                if details_response.status == 200:
                    details_data = json.loads(details_response.data.decode('utf-8'))
                    changes = details_data.get('resources', [])
                    all_changes.extend(changes)
    
                    # Track latest timestamp
                    for change in changes:
                        change_time = change.get('action_timestamp')
                        if change_time and (not latest_timestamp or change_time > latest_timestamp):
                            latest_timestamp = change_time
    
            if all_changes:
                # Store logs in S3
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}filevantage_changes_{timestamp}.json"
    
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body='\n'.join(json.dumps(change) for change in all_changes),
                    ContentType='application/json'
                )
    
                # Update checkpoint
                save_checkpoint(s3_client, s3_bucket, state_key, latest_timestamp)
    
                print(f"Stored {len(all_changes)} changes in S3: {s3_key}")
    
            return {
                'statusCode': 200,
                'body': f'Processed {len(all_changes)} changes'
            }
    
        except Exception as e:
            print(f"Error: {str(e)}")
            return {'statusCode': 500, 'body': f'Error: {str(e)}'}
    
    def get_last_checkpoint(s3_client, bucket, key):
        """Get the last processed timestamp from S3 state file"""
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            return None
        except Exception as e:
            print(f"Error reading checkpoint: {e}")
            return None
    
    def save_checkpoint(s3_client, bucket, key, timestamp):
        """Save the last processed timestamp to S3 state file"""
        try:
            state = {
                'last_timestamp': timestamp,
                'updated_at': datetime.now(timezone.utc).isoformat()
            }
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving checkpoint: {e}")
    
  5. [構成] > [環境変数] に移動します。

  6. [編集>新しい環境変数を追加] をクリックします。

  7. 次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。

    環境変数

    キー 値の例
    S3_BUCKET crowdstrike-filevantage-logs
    S3_PREFIX filevantage/
    STATE_KEY filevantage/state.json
    FALCON_CLIENT_ID <your-client-id>
    FALCON_CLIENT_SECRET <your-client-secret>
    FALCON_BASE_URL https://api.crowdstrike.com(US-1)/ https://api.us-2.crowdstrike.com(US-2)/ https://api.eu-1.crowdstrike.com(EU-1)
  8. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  9. [CONFIGURATION] タブを選択します。

  10. [全般設定] パネルで、[編集] をクリックします。

  11. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数 crowdstrike-filevantage-logs
    • 名前: crowdstrike-filevantage-logs-1h
  3. [スケジュールを作成] をクリックします。

(省略可)Google SecOps の読み取り専用 IAM ユーザーと鍵を作成する

  1. AWS コンソール > IAM > ユーザーに移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::crowdstrike-filevantage-logs"
        }
      ]
    }
    
  7. 名前 = secops-reader-policy

  8. [ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。

  9. secops-reader のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。

  10. [アクセスキーを作成] をクリックします。

  11. .CSV をダウンロードします。(これらの値はフィードに貼り付けます)。

CrowdStrike FileVantage のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: CrowdStrike FileVantage logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [CrowdStrike Filevantage] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://crowdstrike-filevantage-logs/filevantage/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。