CSV カスタム IOC ファイルを収集する
このドキュメントでは、Google Cloud Storage を使用して CSV カスタム IOC ファイルを Google Security Operations に取り込む方法について説明します。次に、これらのフィールドを UDM にマッピングし、IP、ドメイン、ハッシュなどのさまざまなデータ型を処理し、脅威の詳細、エンティティ情報、重大度レベルで出力を拡充します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 1 つ以上の CSV IOC フィード URL(HTTPS)または CSV を提供する内部エンドポイントへのアクセス
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( csv-ioc-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
csv-ioc-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect CSV IOC files」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: GCS バケットにログを書き込む
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
csv-ioc-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
csv-ioc-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
CSV IOC ファイルを収集する Cloud Run 関数を作成する
Cloud Run functions は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、HTTPS エンドポイントから CSV IOC ファイルを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 csv-ioc-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
csv-ioc-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
csv-ioc-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETcsv-ioc-logsGCS バケット名 GCS_PREFIXcsv-iocログファイルの接頭辞 IOC_URLShttps://ioc.example.com/feed.csv,https://another.example.org/iocs.csvカンマ区切りの HTTPS URL AUTH_HEADERAuthorization: Bearer <token>オプションの認証ヘッダー TIMEOUT60リクエスト タイムアウト(秒) [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch CSV IOC feeds over HTTPS and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'csv-ioc').strip('/') ioc_urls_str = os.environ.get('IOC_URLS', '') auth_header = os.environ.get('AUTH_HEADER', '') timeout = int(os.environ.get('TIMEOUT', '60')) ioc_urls = [u.strip() for u in ioc_urls_str.split(',') if u.strip()] if not bucket_name: print('Error: GCS_BUCKET environment variable is required') return if not ioc_urls: print('Error: IOC_URLS must contain at least one HTTPS URL') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) run_ts = int(time.time()) written = [] for i, url in enumerate(ioc_urls): print(f'Processing URL {i+1}/{len(ioc_urls)}: {url}') # Build request req_headers = {'Accept': 'text/csv, */*'} # Add authentication header if provided if auth_header: if ':' in auth_header: k, v = auth_header.split(':', 1) req_headers[k.strip()] = v.strip() else: req_headers['Authorization'] = auth_header.strip() # Fetch data with retries data = fetch_with_retries(url, req_headers, timeout) if data: # Write to GCS key = generate_blob_name(prefix, url, run_ts, i) blob = bucket.blob(key) blob.upload_from_string(data, content_type='text/csv') written.append({ 'url': url, 'gcs_key': key, 'bytes': len(data) }) print(f'Wrote {len(data)} bytes to gs://{bucket_name}/{key}') else: print(f'Warning: No data retrieved from {url}') print(f'Successfully processed {len(written)} URLs') print(json.dumps({'ok': True, 'written': written}, indent=2)) except Exception as e: print(f'Error processing CSV IOC feeds: {str(e)}') raise def fetch_with_retries(url, headers, timeout, max_retries=5): """Fetch data from URL with retry logic for 429/5xx errors.""" if not url.lower().startswith('https://'): raise ValueError('Only HTTPS URLs are allowed in IOC_URLS') attempt = 0 backoff = 1.0 while attempt < max_retries: try: response = http.request('GET', url, headers=headers, timeout=timeout) if response.status == 200: return response.data.decode('utf-8') elif response.status == 429 or (500 <= response.status < 600): print(f'Received status {response.status}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})') time.sleep(backoff) attempt += 1 backoff *= 2 else: print(f'Error: Received unexpected status {response.status} from {url}') return None except Exception as e: if attempt < max_retries - 1: print(f'Request failed: {str(e)}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})') time.sleep(backoff) attempt += 1 backoff *= 2 else: raise print(f'Max retries exceeded for {url}') return None def generate_blob_name(prefix, url, run_ts, idx): """Generate a unique blob name for the CSV file.""" # Create a short, filesystem-safe token for the URL safe_url = url.replace('://', '_').replace('/', '_').replace('?', '_').replace('&', '_')[:100] # Generate timestamp-based path timestamp_path = time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts)) return f"{prefix}/{timestamp_path}-url{idx:03d}-{safe_url}.csv"- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 csv-ioc-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( csv-ioc-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
統合をテストする
- Cloud Scheduler コンソールで、ジョブ(
csv-ioc-collector-hourly)を見つけます。 - [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名(
csv-ioc-collector)をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。次の内容を確認します。
Processing URL 1/X: https://... Wrote X bytes to gs://csv-ioc-logs/csv-ioc/YYYY/MM/DD/HHMMSS-url000-...csv Successfully processed X URLs[Cloud Storage] > [バケット] に移動します。
バケット名(
csv-ioc-logs)をクリックします。プレフィックス フォルダ(
csv-ioc/)に移動します。新しい
.csvファイルが現在のタイムスタンプで作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401/403: AUTH_HEADER 環境変数を確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
- HTTPS URL のみ許可: IOC_URLS に HTTPS URL のみが含まれていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CSV Custom IOC)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [CSV カスタム IOC] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
csv-ioc-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
CSV カスタム IOC ファイルを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CSV Custom IOC)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [CSV カスタム IOC] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://csv-ioc-logs/csv-ioc/次のように置き換えます。
csv-ioc-logs: GCS バケット名。csv-ioc: ログが保存される接頭辞/フォルダパス(省略可)。
例:
- ルートバケット:
gs://csv-ioc-logs/ - 接頭辞あり:
gs://csv-ioc-logs/csv-ioc/ - サブフォルダあり:
gs://csv-ioc-logs/ioc-feeds/
- ルートバケット:
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| asn | entity.metadata.threat.detection_fields.asn_label.value | 「asn」フィールドから直接マッピングされます。 |
| category | entity.metadata.threat.category_details | 「category」フィールドから直接マッピングされます。 |
| 分類 | entity.metadata.threat.category_details | 「classification - 」に追加され、「entity.metadata.threat.category_details」フィールドにマッピングされます。 |
| column2 | entity.entity.hostname | [category] が「. ?ip」または「. ?proxy」と一致し、[not_ip] が true の場合、「entity.entity.hostname」にマッピングされます。 |
| column2 | entity.entity.ip | [category] が「. ?ip」または「. ?proxy」と一致し、[not_ip] が false の場合、「entity.entity.ip」に統合されます。 |
| 自信 | entity.metadata.threat.confidence_score | 浮動小数点数に変換され、「entity.metadata.threat.confidence_score」フィールドにマッピングされます。 |
| country | entity.entity.location.country_or_region | 「country」フィールドから直接マッピングされます。 |
| date_first | entity.metadata.threat.first_discovered_time | ISO8601 として解析され、「entity.metadata.threat.first_discovered_time」フィールドにマッピングされます。 |
| date_last | entity.metadata.threat.last_updated_time | ISO8601 として解析され、「entity.metadata.threat.last_updated_time」フィールドにマッピングされます。 |
| 詳細 | entity.metadata.threat.summary | 「detail」フィールドから直接マッピングされます。 |
| detail2 | entity.metadata.threat.description | 「detail2」フィールドから直接マッピングされます。 |
| ドメイン | entity.entity.hostname | 「domain」フィールドから直接マッピングされます。 |
| メール | entity.entity.user.email_addresses | 「entity.entity.user.email_addresses」フィールドに統合されます。 |
| id | entity.metadata.product_entity_id | 「id - 」に追加され、「entity.metadata.product_entity_id」フィールドにマッピングされます。 |
| import_session_id | entity.metadata.threat.detection_fields.import_session_id_label.value | 「import_session_id」フィールドから直接マッピングされます。 |
| itype | entity.metadata.threat.detection_fields.itype_label.value | 「itype」フィールドから直接マッピングされます。 |
| lat | entity.entity.location.region_latitude | 浮動小数点数に変換され、「entity.entity.location.region_latitude」フィールドにマッピングされます。 |
| lon | entity.entity.location.region_longitude | 浮動小数点数に変換され、「entity.entity.location.region_longitude」フィールドにマッピングされます。 |
| maltype | entity.metadata.threat.detection_fields.maltype_label.value | 「maltype」フィールドから直接マッピングされます。 |
| md5 | entity.entity.file.md5 | 「md5」フィールドから直接マッピングされます。 |
| メディア | entity.metadata.threat.detection_fields.media_label.value | 「media」フィールドから直接マッピングされます。 |
| media_type | entity.metadata.threat.detection_fields.media_type_label.value | 「media_type」フィールドから直接マッピングされます。 |
| 組織 | entity.metadata.threat.detection_fields.org_label.value | 「org」フィールドから直接マッピングされます。 |
| resource_uri | entity.entity.url | [itype] が「(ip |
| resource_uri | entity.metadata.threat.url_back_to_product | [itype] が「(ip |
| スコア | entity.metadata.threat.confidence_details | 「score」フィールドから直接マッピングされます。 |
| 重要度 | entity.metadata.threat.severity | 大文字に変換され、「LOW」、「MEDIUM」、「HIGH」、「CRITICAL」と一致する場合は、「entity.metadata.threat.severity」フィールドにマッピングされます。 |
| source | entity.metadata.threat.detection_fields.source_label.value | 「source」フィールドから直接マッピングされます。 |
| source_feed_id | entity.metadata.threat.detection_fields.source_feed_id_label.value | 「source_feed_id」フィールドから直接マッピングされます。 |
| srcip | entity.entity.ip | [srcip] が空ではなく、[value] と等しくない場合は、「entity.entity.ip」に統合されます。 |
| state | entity.metadata.threat.detection_fields.state_label.value | 「state」フィールドから直接マッピングされます。 |
| trusted_circle_ids | entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | 「trusted_circle_ids」フィールドから直接マッピングされます。 |
| update_id | entity.metadata.threat.detection_fields.update_id_label.value | 「update_id」フィールドから直接マッピングされます。 |
| 値 | entity.entity.file.full_path | [category] が「.*?file」と一致する場合、「entity.entity.file.full_path」にマッピングされます。 |
| 値 | entity.entity.file.md5 | [category] が「.*?md5」と一致し、[value] が 32 文字の 16 進文字列の場合、「entity.entity.file.md5」にマッピングされます。 |
| 値 | entity.entity.file.sha1 | ([category] が「. ?md5」と一致し、[value] が 40 文字の 16 進文字列である)または([category] が「. ?sha1」と一致し、[value] が 40 文字の 16 進文字列である)場合、「entity.entity.file.sha1」にマッピングされます。 |
| 値 | entity.entity.file.sha256 | ([category] が「. ?md5」と一致し、[value] が 16 進文字列で、[file_type] が「md5」でない) または ([category] が「. ?sha256」と一致し、[value] が 16 進文字列である) 場合、「entity.entity.file.sha256」にマッピングされます。 |
| 値 | entity.entity.hostname | ([category] が「. ?domain」と一致する)または([category] が「. ?ip」または「.*?proxy」と一致し、[not_ip] が true)の場合、「entity.entity.hostname」にマッピングされます。 |
| 値 | entity.entity.url | ([category] が「.*?url」と一致する) または ([category] が「url」と一致し、[resource_uri] が空でない) 場合、「entity.entity.url」にマッピングされます。 |
| なし | entity.metadata.collected_timestamp | イベントのタイムスタンプが入力されます。 |
| なし | entity.metadata.interval.end_time | 253402300799 秒の定数値に設定します。 |
| なし | entity.metadata.interval.start_time | イベントのタイムスタンプが入力されます。 |
| なし | entity.metadata.vendor_name | 「カスタム IOC」の定数値に設定します。 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。