CrowdStrike Identity Protection(IDP)サービスのログを収集する
このドキュメントでは、Google Cloud Storage を使用して CrowdStrike Identity Protection(IDP)Services ログを Google Security Operations に取り込む方法について説明します。この統合では、CrowdStrike Unified Alerts API を使用して Identity Protection イベントを収集し、組み込みの CS_IDP パーサーで処理するために NDJSON 形式で保存します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- CrowdStrike Falcon コンソールと API キー管理への特権アクセス
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( crowdstrike-idp-logs-bucketなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
CrowdStrike Identity Protection の前提条件を取得する
- CrowdStrike Falcon Console にログインします。
- [サポートとリソース> API クライアントとキー] に移動します。
- [新しい API クライアントを追加] をクリックします。
- 次の構成の詳細を指定します。
- お客様名: 「
Google SecOps IDP Integration」と入力します。 - 説明: 「
API client for Google SecOps integration」と入力します。 - スコープ: [アラート: 読み取り](alerts:read)スコープを選択します(これには Identity Protection アラートが含まれます)。
- お客様名: 「
- [追加] をクリックします。
- 次の詳細をコピーして安全な場所に保存します。
- Client-ID
- クライアント シークレット(1 回のみ表示されます)
- ベース URL(例: US-1 の場合は
api.crowdstrike.com、US-2 の場合はapi.us-2.crowdstrike.com、EU-1 の場合はapi.eu-1.crowdstrike.com)
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
crowdstrike-idp-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect CrowdStrike IDP logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
crowdstrike-idp-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
crowdstrike-idp-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、CrowdStrike Identity Protection API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 crowdstrike-idp-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
crowdstrike-idp-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
crowdstrike-idp-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETcrowdstrike-idp-logs-bucketGCS_PREFIXcrowdstrike-idp/STATE_KEYcrowdstrike-idp/state.jsonCROWDSTRIKE_CLIENT_IDyour-client-idCROWDSTRIKE_CLIENT_SECRETyour-client-secretAPI_BASEapi.crowdstrike.com(US-1)、api.us-2.crowdstrike.com(US-2)、api.eu-1.crowdstrike.com(EU-1)ALERTS_LIMIT1000(省略可、ページあたり最大 10, 000)[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone from urllib.parse import urlencode # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Fetch CrowdStrike Identity Protection alerts (Unified Alerts API) and store RAW JSON (NDJSON) to GCS for the CS_IDP parser. No transformation is performed. """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'crowdstrike-idp/') state_key = os.environ.get('STATE_KEY', 'crowdstrike-idp/state.json') client_id = os.environ.get('CROWDSTRIKE_CLIENT_ID') client_secret = os.environ.get('CROWDSTRIKE_CLIENT_SECRET') api_base = os.environ.get('API_BASE') if not all([bucket_name, client_id, client_secret, api_base]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(bucket_name) # Get OAuth token token = get_token(client_id, client_secret, api_base) # Load last processed timestamp last_ts = get_last_timestamp(bucket, state_key) # FQL filter for Identity Protection alerts only, newer than checkpoint fql_filter = f"product:'idp' + updated_timestamp:>'{last_ts}'" sort = 'updated_timestamp.asc' # Step 1: Get list of alert IDs all_ids = [] per_page = int(os.environ.get('ALERTS_LIMIT', '1000')) offset = 0 while True: page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset) if not page_ids: break all_ids.extend(page_ids) if len(page_ids) < per_page: break offset += per_page if not all_ids: print('No new Identity Protection alerts.') return # Step 2: Get alert details in batches (max 1000 IDs per request) details = [] max_batch = 1000 for i in range(0, len(all_ids), max_batch): batch = all_ids[i:i + max_batch] details.extend(fetch_alert_details(api_base, token, batch)) if details: # Sort by updated_timestamp details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', ''))) latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp') # Write to GCS timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') blob_name = f"{prefix}cs_idp_{timestamp}.json" blob = bucket.blob(blob_name) # NDJSON format log_lines = '\n'.join([json.dumps(d, separators=(',', ':')) for d in details]) blob.upload_from_string(log_lines, content_type='application/x-ndjson') print(f'Wrote {len(details)} alerts to {blob_name}') # Update state update_state(bucket, state_key, latest) except Exception as e: print(f'Error processing alerts: {str(e)}') raise def get_token(client_id, client_secret, api_base): """Get OAuth2 token from CrowdStrike API""" url = f"https://{api_base}/oauth2/token" data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials" headers = {'Content-Type': 'application/x-www-form-urlencoded'} r = http.request('POST', url, body=data, headers=headers) if r.status != 200: raise Exception(f'Auth failed: {r.status} {r.data}') return json.loads(r.data.decode('utf-8'))['access_token'] def query_alert_ids(api_base, token, fql_filter, sort, limit, offset): """Query alert IDs using filters""" url = f"https://{api_base}/alerts/queries/alerts/v2" params = { 'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset) } qs = urlencode(params) r = http.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'}) if r.status != 200: raise Exception(f'Query alerts failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def fetch_alert_details(api_base, token, composite_ids): """Fetch detailed alert data by composite IDs""" url = f"https://{api_base}/alerts/entities/alerts/v2" body = {'composite_ids': composite_ids} headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } r = http.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers) if r.status != 200: raise Exception(f'Fetch alert details failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def get_last_timestamp(bucket, key, default='2023-01-01T00:00:00Z'): """Get last processed timestamp from GCS state file""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) return state.get('last_timestamp', default) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return default def update_state(bucket, key, ts): """Update last processed timestamp in GCS state file""" state = { 'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(key) blob.upload_from_string(json.dumps(state), content_type='application/json')- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 crowdstrike-idp-collector-15mリージョン Cloud Run functions と同じリージョンを選択する 周波数 */15 * * * *(15 分ごと)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピック crowdstrike-idp-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run] > [サービス] > [crowdstrike-idp-collector] > [ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CrowdStrike Identity Protection Services logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Crowdstrike Identity Protection Services] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
CrowdStrike Identity Protection Services のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CrowdStrike Identity Protection Services logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Crowdstrike Identity Protection Services] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://crowdstrike-idp-logs-bucket/crowdstrike-idp/次のように置き換えます。
crowdstrike-idp-logs-bucket: GCS バケット名。crowdstrike-idp/: ログが保存される接頭辞/フォルダパス。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。