BeyondTrust Endpoint Privilege Management(EPM)のログを収集する
このドキュメントでは、Google Cloud Storage を使用して BeyondTrust Endpoint Privilege Management(EPM)ログを Google Security Operations に取り込む方法について説明します。このパーサーは、BeyondTrust Endpoint からの未加工の JSON ログデータを Chronicle UDM に準拠した構造化形式に変換することに重点を置いています。まず、さまざまなフィールドのデフォルト値を初期化し、JSON ペイロードを解析します。その後、未加工ログの特定のフィールドを event.idm.read_only_udm オブジェクト内の対応する UDM フィールドにマッピングします。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- BeyondTrust Endpoint Privilege Management テナントまたは API への特権アクセス
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( beyondtrust-epm-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
BeyondTrust EPM API 認証情報を収集する
- 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
- [構成> 設定> API 設定] に移動します。
- [Create an API Account] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
Google SecOps Collector」と入力します。 - API アクセス: 必要に応じて、監査(読み取り)などのスコープを有効にします。
- 名前: 「
- クライアント ID とクライアント シークレットをコピーして保存します。
- API ベース URL をコピーします。通常は
https://<your-tenant>-services.pm.beyondtrustcloud.comです(これは BPT_API_URL として使用します)。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
beyondtrust-epm-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect BeyondTrust EPM logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
beyondtrust-epm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
beyondtrust-epm-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、BeyondTrust EPM API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 beyondtrust-epm-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
beyondtrust-epm-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
beyondtrust-epm-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETbeyondtrust-epm-logsGCS_PREFIXbeyondtrust-epm/STATE_KEYbeyondtrust-epm/state.jsonBPT_API_URLhttps://yourtenant-services.pm.beyondtrustcloud.comCLIENT_IDyour-client-idCLIENT_SECRETyour-client-secretOAUTH_SCOPEmanagement-apiRECORD_SIZE1000MAX_ITERATIONS10[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from BeyondTrust EPM API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'beyondtrust-epm/') state_key = os.environ.get('STATE_KEY', 'beyondtrust-epm/state.json') # BeyondTrust EPM API credentials api_url = os.environ.get('BPT_API_URL') client_id = os.environ.get('CLIENT_ID') client_secret = os.environ.get('CLIENT_SECRET') oauth_scope = os.environ.get('OAUTH_SCOPE', 'management-api') record_size = int(os.environ.get('RECORD_SIZE', '1000')) max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) if not all([bucket_name, api_url, client_id, client_secret]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state (last processed timestamp) state = load_state(bucket, state_key) last_timestamp = state.get('last_timestamp', (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")) print(f'Processing logs since {last_timestamp}') # Get OAuth access token token = get_oauth_token(api_url, client_id, client_secret, oauth_scope) # Fetch audit events events = fetch_audit_events(api_url, token, last_timestamp, record_size, max_iterations) if events: # Store events in GCS current_timestamp = datetime.utcnow() filename = f"{prefix}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_gcs(bucket, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) save_state(bucket, state_key, {'last_timestamp': latest_timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z'}) print(f'Successfully processed {len(events)} events and stored to {filename}') else: print('No new events found') except Exception as e: print(f'Error processing logs: {str(e)}') raise def get_oauth_token(api_url, client_id, client_secret, scope): """ Get OAuth access token using client credentials flow for BeyondTrust EPM. Uses the correct endpoint: /oauth/token """ token_url = f"{api_url}/oauth/token" headers = {'Content-Type': 'application/x-www-form-urlencoded'} body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url, access_token, last_timestamp, record_size, max_iterations): """ Fetch audit events using the BeyondTrust EPM API endpoint: /management-api/v2/AuditEvents with query parameters for filtering and pagination """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp iterations = 0 # Enforce maximum RecordSize limit of 1000 based on BeyondTrust documentation record_size_limited = min(record_size, 1000) while iterations < max_iterations: iterations += 1 if len(all_events) >= 10000: print(f"Reached maximum events limit (10000)") break # Use the BeyondTrust EPM API endpoint for audit events query_url = f"{api_url}/management-api/v2/AuditEvents" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } # Construct URL with query parameters query_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{query_url}?{query_string}" try: response = http.request('GET', full_url, headers=headers, timeout=urllib3.Timeout(300.0)) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', '30')) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) continue if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break print(f"Page {iterations}: Retrieved {len(events)} events") all_events.extend(events) # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break except Exception as e: print(f"Error fetching page {iterations}: {e}") break return all_events def extract_event_timestamp(event): """Extract timestamp from event, checking multiple possible fields""" # Check common timestamp fields timestamp_fields = ['event.dateTime', 'event.timestamp', 'timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time', 'event.ingested'] # Try nested event.dateTime first (common in BeyondTrust) if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("dateTime") if ts: return ts ts = event["event"].get("timestamp") if ts: return ts # Fallback to other timestamp fields for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str): """Parse timestamp string to datetime object, handling various formats""" if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}') def store_events_to_gcs(bucket, key, events): """Store events as JSONL (one JSON object per line) in GCS""" # Convert to JSONL format (one JSON object per line) jsonl_content = '\n'.join(json.dumps(event, default=str) for event in events) blob = bucket.blob(key) blob.upload_from_string(jsonl_content, content_type='application/x-ndjson') def get_latest_event_timestamp(events): """Get the latest timestamp from the events for state tracking""" if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 beyondtrust-epm-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピック beyondtrust-epm-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run > サービス > beyondtrust-epm-collector > ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
BeyondTrust EPM logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
BeyondTrust EPM のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
BeyondTrust EPM logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://beyondtrust-epm-logs/beyondtrust-epm/次のように置き換えます。
beyondtrust-epm-logs: GCS バケット名。beyondtrust-epm/: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
例:
- ルートバケット:
gs://beyondtrust-epm-logs/ - 接頭辞あり:
gs://beyondtrust-epm-logs/beyondtrust-epm/
- ルートバケット:
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| agent.id | principal.asset.attribute.labels.value | キー agent_id を持つラベルにマッピングされます |
| agent.version | principal.asset.attribute.labels.value | キー agent_version を持つラベルにマッピングされます |
| ecs.version | principal.asset.attribute.labels.value | キー ecs_version を持つラベルにマッピングされます |
| event_data.reason | metadata.description | 未加工ログのイベントの説明 |
| event_datas.ActionId | metadata.product_log_id | プロダクト固有のログ識別子 |
| file.path | principal.file.full_path | イベントのフルファイルパス |
| headers.content_length | additional.fields.value.string_value | キー content_length のラベルにマッピングされます |
| headers.content_type | additional.fields.value.string_value | キー content_type のラベルにマッピングされます |
| headers.http_host | additional.fields.value.string_value | キー http_host を持つラベルにマッピングされます |
| headers.http_version | network.application_protocol_version | HTTP プロトコル バージョン |
| headers.request_method | network.http.method | HTTP リクエスト メソッド |
| host.hostname | principal.hostname | プリンシパル ホスト名 |
| host.hostname | principal.asset.hostname | プリンシパル アセットのホスト名 |
| host.ip | principal.asset.ip | プリンシパル アセットの IP アドレス |
| host.ip | principal.ip | プリンシパル IP アドレス |
| host.mac | principal.mac | プリンシパル MAC アドレス |
| host.os.platform | principal.platform | macOS と等しい場合は MAC に設定 |
| host.os.version | principal.platform_version | OS のバージョン |
| labels.related_item_id | metadata.product_log_id | 関連アイテムの識別子 |
| process.command_line | principal.process.command_line | プロセス コマンドライン |
| process.name | additional.fields.value.string_value | キー process_name を持つラベルにマッピングされます |
| process.parent.name | additional.fields.value.string_value | キー process_parent_name を持つラベルにマッピングされます |
| process.parent.pid | principal.process.parent_process.pid | 親プロセスの PID を文字列に変換したもの |
| process.pid | principal.process.pid | プロセス PID を文字列に変換 |
| user.id | principal.user.userid | ユーザー識別子 |
| user.name | principal.user.user_display_name | ユーザーの表示名 |
| なし | metadata.event_timestamp | イベントのタイムスタンプがログエントリのタイムスタンプに設定されている |
| なし | metadata.event_type | プリンシパルがない場合は GENERIC_EVENT、それ以外の場合は STATUS_UPDATE |
| なし | network.application_protocol | http_version フィールドに HTTP が含まれている場合は HTTP に設定 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。