Proofpoint Emerging Threats Pro の IOC ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Proofpoint Emerging Threats Pro IOC ログを Google Security Operations に取り込む方法について説明します。Emerging Threats Intelligence は、カテゴリ、スコア、時間情報などの脅威インテリジェンス データを含む IP とドメインの評価リストを CSV 形式で 1 時間ごとに公開しています。パーサーコードは、CSV 形式の ET_PRO 脅威インテリジェンス データを処理します。IP アドレス、ドメイン、カテゴリ、スコアなどの関連情報を抽出し、標準化された IOC 形式と Chronicle UDM スキーマの両方にマッピングして、Google SecOps 内でのさらなる分析と使用を可能にします。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 評判リストにアクセスできる Proofpoint ET Intelligence サブスクリプション
- https://etadmin.proofpoint.com/api-access からの ET Intelligence API キー
Emerging Threats Pro の前提条件を収集する
- ET Intelligence 管理ポータル(https://etadmin.proofpoint.com)にログインします。
- [API アクセス] に移動します。
- API キーをコピーして保存します。
- Proofpoint の担当者に連絡して、次の情報を入手します。
- 詳細な IP レピュテーション リストの URL
- 詳細なドメインの評判リストの URL
ET Intelligence は、IP とドメインの評判リストを個別の CSV ファイルで提供し、1 時間ごとに更新します。次の列を含む [詳細] 形式を使用します。
- ドメインリスト: ドメイン名、カテゴリ、スコア、初回検出、最終検出、ポート
- IP リスト: IP アドレス、カテゴリ、スコア、初回検出、最終検出、ポート
詳細形式の URL は通常、次のパターンに従います。
- IP リスト:
https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt - ドメインリスト:
https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Authorization ヘッダーには、Emerging Threats API の要件に合わせて、Bearer 接頭辞のない生の API キー値が含まれている必要があります。
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( et-pro-ioc-bucketなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
et-pro-ioc-fetcher-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Proofpoint ET Pro IOC logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
et-pro-ioc-fetcher-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
et-pro-ioc-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Proofpoint ET Intelligence API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 et-pro-ioc-fetcherリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、[
et-pro-ioc-trigger] を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント:
et-pro-ioc-fetcher-saを選択します。
- サービス アカウント:
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETet-pro-ioc-bucketGCS バケット名 GCS_PREFIXet-pro-iocログファイルの接頭辞 STATE_KEYet-pro-ioc/state.json状態ファイルのパス ET_API_KEYyour-et-api-keyET Intelligence API キー ET_IP_LIST_URLhttps://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt詳細な IP レピュテーション リストの URL ET_DOMAIN_LIST_URLhttps://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt詳細なドメインの評判リストの URL TIMEOUT120HTTP リクエスト タイムアウト(秒) [変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch ET Pro IOC reputation lists and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'et-pro-ioc').strip('/') state_key = os.environ.get('STATE_KEY', f'{prefix}/state.json') et_api_key = os.environ.get('ET_API_KEY') et_ip_list_url = os.environ.get('ET_IP_LIST_URL') et_domain_list_url = os.environ.get('ET_DOMAIN_LIST_URL') timeout = int(os.environ.get('TIMEOUT', '120')) if not all([bucket_name, et_api_key, et_ip_list_url, et_domain_list_url]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Generate timestamp for file naming now = datetime.now(timezone.utc) timestamp = now.strftime('%Y/%m/%d/%H%M%S') results = [] errors = [] # Fetch IP reputation list try: print('Fetching IP reputation list...') ip_data = fetch_with_retry(et_ip_list_url, et_api_key, timeout) ip_key = f'{prefix}/ip/{timestamp}.csv' save_to_gcs(bucket, ip_key, ip_data) results.append({'type': 'ip', 'key': ip_key, 'size': len(ip_data)}) print(f'Successfully fetched IP list: {len(ip_data)} bytes') except Exception as e: error_msg = f'Failed to fetch IP list: {str(e)}' print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print('Fetching Domain reputation list...') domain_data = fetch_with_retry(et_domain_list_url, et_api_key, timeout) domain_key = f'{prefix}/domain/{timestamp}.csv' save_to_gcs(bucket, domain_key, domain_data) results.append({'type': 'domain', 'key': domain_key, 'size': len(domain_data)}) print(f'Successfully fetched Domain list: {len(domain_data)} bytes') except Exception as e: error_msg = f'Failed to fetch Domain list: {str(e)}' print(error_msg) errors.append(error_msg) # Save state state = { 'last_fetch': now.isoformat(), 'results': results, 'errors': errors } save_state(bucket, state_key, state) if errors: print(f'Completed with {len(errors)} error(s)') else: print('Successfully completed all fetches') except Exception as e: print(f'Error processing logs: {str(e)}') raise def fetch_with_retry(url, api_key, timeout, max_retries=3): """Fetch URL with retry logic for rate limits.""" if not url.lower().startswith('https://'): raise ValueError('Only HTTPS URLs are allowed') headers = {'Authorization': api_key} for attempt in range(max_retries): try: response = http.request('GET', url, headers=headers, timeout=timeout) if response.status == 200: return response.data elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f'Rate limited, waiting {wait_time}s...') time.sleep(wait_time) else: raise Exception(f'HTTP {response.status}: {response.reason}') except Exception as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f'Failed to fetch {url} after {max_retries} attempts') def save_to_gcs(bucket, key, content): """Save content to GCS with appropriate content type.""" blob = bucket.blob(key) blob.upload_from_string(content, content_type='text/csv') print(f'Saved {len(content)} bytes to gs://{bucket.name}/{key}') def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}')- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 et-pro-ioc-fetcher-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック et-pro-ioc-triggerを選択メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 1 時間ごと 0 * * * *標準(ET Pro IOC に推奨) 2 時間ごと 0 */2 * * *ピッチを下げる 6 時間ごと 0 */6 * * *最小限の更新
統合をテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名(
et-pro-ioc-fetcher)をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。次の内容を確認します。
Fetching IP reputation list... Successfully fetched IP list: X bytes Fetching Domain reputation list... Successfully fetched Domain list: X bytes Successfully completed all fetches[Cloud Storage] > [バケット] に移動します。
バケット名をクリックします。
プレフィックス フォルダ(
et-pro-ioc/ip/とet-pro-ioc/domain/)に移動します。新しい
.csvファイルが現在のタイムスタンプで作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で ET_API_KEY を確認する
- HTTP 403: API キーに必要な権限があることを確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
ET Pro IOC - IP Reputation)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Emerging Threats Pro] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Proofpoint Emerging Threats Pro IOC ログを取り込むように Google SecOps でフィードを構成する
IP レピュテーション用とドメイン レピュテーション用の 2 つのフィードを個別に作成する必要があります。
IP レピュテーション フィードを作成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに「
ET Pro IOC - IP Reputation」と入力します。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Emerging Threats Pro] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://et-pro-ioc-bucket/et-pro-ioc/ip/et-pro-ioc-bucketは、実際の GCS バケット名に置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ドメイン レピュテーション フィードを作成する
フィードの作成プロセスを繰り返します。
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに「
ET Pro IOC - Domain Reputation」と入力します。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Emerging Threats Pro] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
- ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://et-pro-ioc-bucket/et-pro-ioc/domain/et-pro-ioc-bucketは、実際の GCS バケット名に置き換えます。- Source deletion option: 必要に応じて選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| category | このフィールドはパーサー ロジックで使用されますが、UDM に直接マッピングされません。 | ルックアップ テーブルを使用して event.ioc.categorization の値を決定します。 |
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | 未加工ログから直接マッピングされます。 |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | 未加工ログから直接マッピングされます。 |
| データ | このフィールドは、その内容に基づいて複数の UDM フィールドに解析されます。 | |
| first_seen | event.idm.entity.metadata.interval.start_time | 日付として解析され、UDM にマッピングされます。 |
| first_seen | event.ioc.active_timerange.start | 日付として解析され、UDM にマッピングされます。 |
| ip_or_domain | event.idm.entity.entity.hostname | grok パターンがフィールドからホストを抽出した場合、UDM にマッピングされます。 |
| ip_or_domain | event.idm.entity.entity.ip | grok パターンがフィールドからホストを抽出しない場合、UDM にマッピングされます。 |
| ip_or_domain | event.ioc.domain_and_ports.domain | grok パターンがフィールドからホストを抽出した場合、UDM にマッピングされます。 |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | grok パターンがフィールドからホストを抽出しない場合、UDM にマッピングされます。 |
| last_seen | event.idm.entity.metadata.interval.end_time | 日付として解析され、UDM にマッピングされます。 |
| last_seen | event.ioc.active_timerange.end | 日付として解析され、UDM にマッピングされます。 |
| ports | event.idm.entity.entity.labels.value | 複数のポートがある場合は、解析され、カンマ区切り文字で結合され、UDM にマッピングされます。 |
| ports | event.idm.entity.entity.port | ポートが 1 つしかない場合は、解析されて UDM にマッピングされます。 |
| ports | event.ioc.domain_and_ports.ports | grok パターンがフィールドからホストを抽出した場合、解析されて UDM にマッピングされます。 |
| ports | event.ioc.ip_and_ports.ports | grok パターンがフィールドからホストを抽出しない場合、解析されて UDM にマッピングされます。 |
| スコア | event.ioc.confidence_score | 未加工ログから直接マッピングされます。 |
| event.idm.entity.entity.labels.key | 複数のポートがある場合は「ports」に設定します。 | |
| event.idm.entity.metadata.entity_type | Grok パターンが ip_or_domain フィールドからホストを抽出する場合は「DOMAIN_NAME」に設定し、それ以外の場合は「IP_ADDRESS」に設定します。 | |
| event.idm.entity.metadata.threat.category | 「SOFTWARE_MALICIOUS」に設定します。 | |
| event.idm.entity.metadata.threat.category_details | ルックアップ テーブルを使用して、カテゴリ フィールドから派生します。 | |
| event.idm.entity.metadata.threat.threat_name | 「ET Intelligence Rep List」に設定します。 | |
| event.idm.entity.metadata.vendor_name | 「ET_PRO_IOC」に設定します。 | |
| event.ioc.feed_name | 「ET Intelligence Rep List」に設定します。 | |
| event.ioc.raw_severity | 「悪意のある」に設定します。 | |
| timestamp.nanos | collection_time.nanos からコピーされます。 | |
| timestamp.seconds | collection_time.seconds からコピーされます。 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。