Proofpoint Emerging Threats Pro の IOC ログを収集する
このドキュメントでは、Amazon S3 を使用して Proofpoint Emerging Threats Pro IOC ログを Google Security Operations に取り込む方法について説明します。Emerging Threats Intelligence は、カテゴリ、スコア、時間情報などの脅威インテリジェンス データを含む IP とドメインの評価リストを CSV 形式で 1 時間ごとに公開しています。パーサーコードは、CSV 形式の ET_PRO 脅威インテリジェンス データを処理します。IP アドレス、ドメイン、カテゴリ、スコアなどの関連情報を抽出し、標準化された IOC 形式と Chronicle UDM スキーマの両方にマッピングして、Google SecOps 内でのさらなる分析と使用を可能にします。
始める前に
次の前提条件を満たしていることを確認してください。
- フィードを作成する権限を持つ Google SecOps インスタンス
- レピュテーション リストにアクセスできる Proofpoint ET Intelligence サブスクリプション
- https://etadmin.proofpoint.com/api-access からの ET Intelligence API キー
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Emerging Threats Pro の前提条件を収集する
- https://etadmin.proofpoint.com で ET Intelligence 管理ポータルにログインします。
- [API アクセス] に移動します。
- API キーをコピーして保存する
- Proofpoint の担当者に連絡して、次の情報を入手します。
- IP レピュテーションの詳細リストの URL
- ドメイン レピュテーションの詳細リストの URL
ET Intelligence は、IP とドメインの評判リストを個別の CSV ファイルで提供し、1 時間ごとに更新します。次の列を含む「詳細」形式を使用します。
* ドメイン リスト: Domain Name, Category, Score, First Seen, Last Seen, Ports
* IP リスト: IP Address, Category, Score, First Seen, Last Seen, Ports
AWS S3 バケットと IAM を構成する
S3 バケットを作成する
- Amazon S3 コンソールを開きます。
- [バケットを作成] をクリックします。
- バケット名:
et-pro-ioc-bucket(または任意の名前)を入力します。 - リージョン: 使用するリージョンを選択します。
- [バケットを作成] をクリックします。
Google SecOps 用の IAM ユーザーを作成する
- IAM コンソールを開きます。
- [ユーザー] > [ユーザーを作成] をクリックします。
- ユーザー名: 「
secops-reader」と入力します。 - [次へ] をクリックします。
- [ポリシーを直接アタッチする] を選択します。
- [ポリシーを作成] をクリックします。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }ポリシーに
SecOpsReaderPolicyという名前を付けます。[ポリシーを作成] をクリックします。
ユーザーの作成に戻り、新しく作成したポリシーを選択します。
[次へ> ユーザーを作成] をクリックします。
[セキュリティ認証情報] タブに移動します。
[アクセスキーを作成] をクリックします。
[サードパーティ サービス] を選択します。
[アクセスキーを作成] をクリックします。
認証情報をダウンロードして保存します。
Lambda の IAM ロールを構成する
- AWS コンソールで、[IAM] > [ロール] > [ロールの作成] に移動します。
- [AWS サービス> Lambda] を選択します。
- [次へ] をクリックします。
- [ポリシーを作成] をクリックします。
[JSON] タブを選択し、次の情報を入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }ポリシーに
EtProIocLambdaPolicyという名前を付けます。[ポリシーを作成] をクリックします。
ロールの作成に戻り、ポリシーを関連付けます。
ロールに
EtProIocLambdaRoleという名前を付けます。[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
- 関数名:
et-pro-ioc-fetcher - ランタイム: Python 3.13
- アーキテクチャ: x86_64
- 実行ロール: 既存のロール
EtProIocLambdaRoleを使用する
- 関数名:
作成後、[コード] タブに移動して、次のコードに置き換えます。
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }[構成] > [全般設定] に移動します。
[編集] をクリックします。
[タイムアウト] を [5 分] に設定します。
[保存] をクリックします。
環境変数を構成する
- [構成] > [環境変数] に移動します。
- [編集>環境変数を追加] をクリックします。
次の変数を追加します。
キー 値 S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120[保存] をクリックします。
サブスクリプションの正確な URL については、Proofpoint の担当者にお問い合わせください。詳細形式の URL は通常、次のパターンに従います。
* IP リスト: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* ドメイン リスト: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [スケジュール] > [スケジュールの作成] に移動します。
- スケジュール名:
et-pro-ioc-hourly - スケジュール パターン: レートベースのスケジュール
- レート式: 1 時間
- [次へ] をクリックします。
- ターゲット: Lambda 関数
- 関数:
et-pro-ioc-fetcher - 残りのステップで [次へ] をクリックします。
- [スケジュールを作成] をクリックします。
Google SecOps でフィードを構成する
IP レピュテーション用とドメイン レピュテーション用の 2 つのフィードを個別に作成する必要があります。
IP レピュテーション フィードを作成する
- [SIEM 設定] > [フィード] に移動します。
- [新規追加] をクリックします。
- [フィード名] フィールドに「
ET Pro IOC - IP Reputation」と入力します。 - [ソースタイプ] リストで、[Amazon S3] を選択します。
- [ログタイプ] として [Emerging Threats Pro] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Source deletion options: 必要に応じて選択します
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: SecOps 読み取り専用アクセスキー
- シークレット アクセスキー: SecOps 読み取り専用シークレット キー
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- 確認して [送信] をクリックします。
ドメイン レピュテーション フィードを作成する
- フィードの作成プロセスを繰り返します。
- [フィード名] フィールドに「
ET Pro IOC - Domain Reputation」と入力します。 - [ソースタイプ] リストで、[Amazon S3] を選択します。
- [ログタイプ] として [Emerging Threats Pro] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Source deletion options: 必要に応じて選択します
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: SecOps 読み取り専用アクセスキー
- シークレット アクセスキー: SecOps 読み取り専用シークレット キー
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- 確認して [送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| category | このフィールドはパーサー ロジックで使用されますが、UDM に直接マッピングされません。ルックアップ テーブルを使用して event.ioc.categorization の値を決定します。 |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | 未加工ログから直接マッピングされます。 |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | 未加工ログから直接マッピングされます。 |
| データ | このフィールドは、その内容に基づいて複数の UDM フィールドに解析されます。 | |
| first_seen | event.idm.entity.metadata.interval.start_time | 日付として解析され、UDM にマッピングされます。 |
| first_seen | event.ioc.active_timerange.start | 日付として解析され、UDM にマッピングされます。 |
| ip_or_domain | event.idm.entity.entity.hostname | grok パターンがフィールドからホストを抽出した場合、UDM にマッピングされます。 |
| ip_or_domain | event.idm.entity.entity.ip | Grok パターンでフィールドからホストが抽出されない場合、UDM にマッピングされます。 |
| ip_or_domain | event.ioc.domain_and_ports.domain | grok パターンがフィールドからホストを抽出した場合、UDM にマッピングされます。 |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Grok パターンでフィールドからホストが抽出されない場合、UDM にマッピングされます。 |
| last_seen | event.idm.entity.metadata.interval.end_time | 日付として解析され、UDM にマッピングされます。 |
| last_seen | event.ioc.active_timerange.end | 日付として解析され、UDM にマッピングされます。 |
| ports | event.idm.entity.entity.labels.value | 複数のポートがある場合は、解析され、カンマ区切り文字で結合され、UDM にマッピングされます。 |
| ports | event.idm.entity.entity.port | ポートが 1 つしかない場合は、解析されて UDM にマッピングされます。 |
| ports | event.ioc.domain_and_ports.ports | grok パターンがフィールドからホストを抽出した場合、解析されて UDM にマッピングされます。 |
| ports | event.ioc.ip_and_ports.ports | grok パターンがフィールドからホストを抽出しない場合、解析されて UDM にマッピングされます。 |
| スコア | event.ioc.confidence_score | 未加工ログから直接マッピングされます。 |
| event.idm.entity.entity.labels.key | 複数のポートがある場合は「ports」に設定します。 | |
| event.idm.entity.metadata.entity_type | Grok パターンが ip_or_domain フィールドからホストを抽出する場合は「DOMAIN_NAME」に設定し、それ以外の場合は「IP_ADDRESS」に設定します。 |
|
| event.idm.entity.metadata.threat.category | 「SOFTWARE_MALICIOUS」に設定します。 | |
| event.idm.entity.metadata.threat.category_details | ルックアップ テーブルを使用して category フィールドから派生します。 |
|
| event.idm.entity.metadata.threat.threat_name | 「ET Intelligence Rep List」に設定します。 | |
| event.idm.entity.metadata.vendor_name | 「ET_PRO_IOC」に設定します。 | |
| event.ioc.feed_name | 「ET Intelligence Rep List」に設定します。 | |
| event.ioc.raw_severity | 「悪意のある」に設定します。 | |
| timestamp.nanos | collection_time.nanos からコピーされます。 |
|
| timestamp.seconds | collection_time.seconds からコピーされます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。