SpyCloud ログを収集する
このドキュメントでは、Google Cloud Storage V2 を使用して SpyCloud ログを Google Security Operations に取り込む方法について説明します。
SpyCloud は、漏洩した認証情報と盗まれた認証情報に関するインテリジェンスを提供するアカウントの乗っ取り防止プラットフォームです。REST API を介して、漏洩レコード、ウォッチリスト アラート、不正使用された認証情報のレポートを提供します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- API アクセス権と有効な API キーを持つ SpyCloud アカウント
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前を入力します(例: spycloud-logs)。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
SpyCloud API 認証情報を収集する
API キーを取得する
- 管理者として SpyCloud ポータルにログインします。
- [設定] > [API] に移動します。
- API キーをコピーして安全な場所に保存します。
権限を確認する
API キーに必要なアクセス権があることを確認するには:
- SpyCloud ポータルにログインします。
- [設定] > [API] に移動します。
- API キーが有効で、必要なエンドポイント(違反データ、ウォッチリスト、コンパス)にアクセスできることを確認します。
- アクセスが制限されている場合は、SpyCloud 管理者にお問い合わせください。
テスト API へのアクセス
統合に進む前に、認証情報をテストします。
# Replace with your actual API key API_KEY="your-api-key" # Test API access - fetch watchlist data curl -v -H "X-API-Key: ${API_KEY}" \ "https://api.spycloud.io/enterprise-v2/breach/data/watchlist?since=2024-01-01&until=2024-01-02"
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を入力します。
- サービス アカウント名: 「
spycloud-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect SpyCloud logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の操作に必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可します
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
spycloud-logsなど)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
spycloud-collector-sa@your-project.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [ストレージ オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions の関数がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を入力します。
- トピック ID: 「
spycloud-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run functions の関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、SpyCloud API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 spycloud-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
spycloud-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
spycloud-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETspycloud-logsGCS バケット名 GCS_PREFIXspycloudログファイルの接頭辞 STATE_KEYspycloud/state.json状態ファイルのパス API_KEYyour-api-keySpyCloud API キー API_BASEhttps://api.spycloud.ioAPI ベース URL MAX_RECORDS10000実行あたりの最大レコード数 LOOKBACK_DAYS7最初のルックバック期間(日数) STREAMSwatchlist,catalogカンマ区切りのデータ ストリーム [変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'spycloud') STATE_KEY = os.environ.get('STATE_KEY', 'spycloud/state.json') API_KEY = os.environ.get('API_KEY', '') API_BASE = os.environ.get('API_BASE', 'https://api.spycloud.io').rstrip('/') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '10000')) LOOKBACK_DAYS = int(os.environ.get('LOOKBACK_DAYS', '7')) STREAMS = [s.strip() for s in os.environ.get('STREAMS', 'watchlist').split(',') if s.strip()] def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch SpyCloud logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, API_KEY]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) now = datetime.now(timezone.utc) # Determine date range if isinstance(state, dict) and state.get("last_date"): since_date = state["last_date"] else: since_date = (now - timedelta(days=LOOKBACK_DAYS)).strftime('%Y-%m-%d') until_date = now.strftime('%Y-%m-%d') print(f"Fetching data from {since_date} to {until_date}") report = {} if 'watchlist' in STREAMS: print("Fetching watchlist breach data...") count = pull_watchlist(bucket, since_date, until_date) report['watchlist_records'] = count if 'catalog' in STREAMS: print("Fetching breach catalog...") count = pull_catalog(bucket, since_date, until_date) report['catalog_records'] = count if 'compass' in STREAMS: print("Fetching compass data...") count = pull_compass(bucket, since_date, until_date) report['compass_records'] = count # Update state save_state(bucket, STATE_KEY, until_date) print(f"Successfully processed: {json.dumps(report)}") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_date: str): """Save the last query date to GCS state file.""" try: state = {'last_date': last_date, 'last_event_time': datetime.now(timezone.utc).isoformat()} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_date={last_date}") except Exception as e: print(f"Warning: Could not save state: {e}") def api_get(endpoint: str, params: dict = None): """Make authenticated GET request to SpyCloud API with rate limiting.""" url = f"{API_BASE}{endpoint}" if params: query = '&'.join([f"{k}={v}" for k, v in params.items()]) url = f"{url}?{query}" headers = { 'X-API-Key': API_KEY, 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-SpyCloudCollector/1.0' } backoff = 1.0 max_retries = 3 for attempt in range(max_retries): response = http.request('GET', url, headers=headers) if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue if response.status != 200: print(f"HTTP Error: {response.status} - {response.data.decode('utf-8')}") return None return json.loads(response.data.decode('utf-8')) print(f"Failed after {max_retries} retries due to rate limiting") return None def write_ndjson(bucket, prefix: str, stream_name: str, records: list): """Write records to GCS as NDJSON.""" if not records: return 0 now = datetime.now(timezone.utc) timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/{stream_name}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") return len(records) def pull_watchlist(bucket, since_date: str, until_date: str): """Fetch watchlist breach data.""" cursor = None all_records = [] while True: params = {'since': since_date, 'until': until_date} if cursor: params['cursor'] = cursor data = api_get('/enterprise-v2/breach/data/watchlist', params) if not data: break results = data.get('results', []) if not results: break all_records.extend(results) if len(all_records) >= MAX_RECORDS: print(f"Reached max_records limit ({MAX_RECORDS})") break cursor = data.get('cursor') if not cursor: break return write_ndjson(bucket, GCS_PREFIX, 'watchlist', all_records) def pull_catalog(bucket, since_date: str, until_date: str): """Fetch breach catalog.""" params = {'since': since_date, 'until': until_date} data = api_get('/enterprise-v2/breach/catalog', params) if not data: return 0 results = data.get('results', []) return write_ndjson(bucket, GCS_PREFIX, 'catalog', results) def pull_compass(bucket, since_date: str, until_date: str): """Fetch compass findings.""" cursor = None all_records = [] while True: params = {'since': since_date, 'until': until_date} if cursor: params['cursor'] = cursor data = api_get('/enterprise-v2/compass/data', params) if not data: break results = data.get('results', []) if not results: break all_records.extend(results) if len(all_records) >= MAX_RECORDS: print(f"Reached max_records limit ({MAX_RECORDS})") break cursor = data.get('cursor') if not cursor: break return write_ndjson(bucket, GCS_PREFIX, 'compass', all_records)2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 spycloud-collector-dailyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 0 * * *(毎日午前 0 時)タイムゾーン タイムゾーンを選択します(UTC を推奨) ターゲット タイプ Pub/Sub トピック トピック spycloud-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索ボリュームが普通 |
| 1 時間ごと | 0 * * * * |
標準 |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
過去のデータの収集(侵害データに推奨) |
統合をテストする
- Cloud Scheduler コンソールで、ジョブ(
spycloud-collector-daily)を見つけます。 - [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run] > [サービス] > [spycloud-collector] > [ログ] に移動します。
関数が正常に実行されたことを確認します。以下のものを探します。
Fetching data from YYYY-MM-DD to YYYY-MM-DD Fetching watchlist breach data... Wrote X records to gs://spycloud-logs/spycloud/watchlist/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed: {"watchlist_records": X, "catalog_records": Y}GCS バケット(
spycloud-logs)をチェックして、ログが書き込まれたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で API キーを確認する
- HTTP 403: 必要なエンドポイントに API キーがアクセスできることを確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
SpyCloud のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
SpyCloud Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [SpyCloud] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウントのメールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーします。次のステップでこれを使用します。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://spycloud-logs/spycloud/- 次のように置き換えます。
spycloud-logs: GCS バケット名。spycloud: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
- 次のように置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
spycloud-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
[保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| av_softwares、assets.av_softwares、assets.country、assets.country_code、assets.display_resolution、assets.email、assets.full_name、assets.infected_machine_id、assets.infected_path、assets.infected_time、assets.ip_addresses、assets.keyboard_languages、assets.password、assets.target_url、assets.username、assets.user_browser、assets.user_hostname、assets.user_os、assets.user_sys_registered_owner | additional.fields | これらのフィールドと av_software_list から作成されたラベルから統合されました |
| パスワード | extensions.auth.auth_details | 値を直接コピーしました |
| when | metadata.event_timestamp | ISO8601 タイムスタンプとして解析されます |
| metadata.event_type | 条件に基づいて設定されます。has_principal、has_target、has_network の場合は NETWORK_CONNECTION、has_principal と has_principal_userid の場合は USER_UNCATEGORIZED、has_principal で has_principal_ip でない場合は STATUS_UPDATE、それ以外の場合は GENERIC_EVENT、または user_hostname、ip、infected_machine_id が存在する場合は USER_UNCATEGORIZED | |
| infected_time | metadata.ingested_timestamp | yyyy-MM-ddTHH:mm:ssZ、RFC3339、ISO8601 形式のタイムスタンプとして解析されます |
| log_id、assets.log_id、uuid | metadata.product_log_id | log_id(空でない場合)、assets.log_id、uuid の順に値を取得します。 |
| user_os | network.http.parsed_user_agent | 解析されたユーザー エージェントに変換 |
| user_os | network.http.user_agent | 値を直接コピーしました |
| cookie_domain | principal.administrative_domain | 値を直接コピーしました |
| country | principal.asset.location.country_or_region | 値を直接コピーしました |
| infected_machine_id | principal.asset_id | 「id: 」+ 感染したマシンの ID として連結されます。 |
| infected_path | principal.file.full_path | 値を直接コピーしました |
| user_hostname、domain | principal.hostname | 空でない場合は user_hostname に設定し、空でない場合はドメインに設定します |
| ip、ip_addresses | principal.ip | ip 配列と ip_addresses 配列から統合 |
| country_code | principal.resource.attribute.labels | country_code から作成された country_code_label から統合 |
| id | principal.resource.id | 値が直接コピーされた(文字列に変換された) |
| ホームページ | principal.url | 値を直接コピーしました |
| メール | principal.user.email_addresses | 値を直接コピーしました |
| full_name | principal.user.user_display_name | 値を直接コピーしました |
| user_sys_registered_owner、email_username | principal.user.userid | user_sys_registered_owner に設定し、空でない場合は email_username に設定 |
| confidence | security_result.confidence_details | 文字列に変換しました |
| 説明 | security_result.description | 値を直接コピーしました |
| cookie_expiration、cookie_name、cookie_subdomain、cookie_value、day、document_id、locality_zone、source_id、spycloud_publishdate、spycloud_publish_date、user_browser、infected_time、timezone、password_type、password_plaintext、email_domain、api_token、account_status、breach_category、breach_main_category、consumer_category、malware_family、num_records、premium_flag、sensitive_source、short_title、site_description、title、tlp、type、display_resolution、keyboard_languages | security_result.detection_fields | これらのフィールドから作成されたラベルから統合されました |
| 重要度 | security_result.severity | 「2」の場合は LOW、「5」の場合は INFORMATIONAL、「20」の場合は HIGH、「25」または「26」の場合は CRITICAL に設定します。 |
| 重要度 | security_result.severity_details | 値を直接コピーしました |
| target_subdomain | target.administrative_domain | 値を直接コピーしました |
| target_domain | target.asset.hostname | 値を直接コピーしました |
| target_domain | target.hostname | 値を直接コピーしました |
| target_url | target.url | 値を直接コピーしました |
| ユーザー名 | target.user.userid | 値を直接コピーしました |
| metadata.product_name | 「SPYCLOUD」に設定 | |
| metadata.vendor_name | 「SpyCloud」に設定 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。