ForgeRock OpenIDM のログを収集する
このドキュメントでは、Google Cloud Storage V2 を使用して ForgeRock OpenIDM ログを Google Security Operations に取り込む方法について説明します。
ForgeRock OpenIDM(現在は PingIDM としてブランド化)は、ユーザーのプロビジョニング、同期、パスワード管理、アクセス ガバナンスを提供する ID 管理プラットフォームです。ID ライフサイクル イベント、認証試行、調整オペレーション、構成変更を REST 経由でアクセス可能な監査ログに記録します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 管理認証情報を使用した ForgeRock OpenIDM または PingIDM インスタンスへの特権アクセス
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( forgerock-openidm-audit-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
ForgeRock OpenIDM の認証情報を収集する
ForgeRock OpenIDM ベース URL を取得する
- ForgeRock OpenIDM または PingIDM インスタンスにログインします。
- ブラウザのアドレスバーからベース URL をメモします。
- 形式:
https://openidm.example.com - 末尾のスラッシュや
/adminなどのパスは含めないでください。
- 形式:
管理者の認証情報を取得する
- ForgeRock OpenIDM インスタンスの管理者認証情報を取得します。
以下のものが必要になります。
- ユーザー名: 管理ユーザー名(
openidm-adminなど) - パスワード: 管理パスワード
- ユーザー名: 管理ユーザー名(
権限を確認する
アカウントに必要な権限があることを確認するには:
- ForgeRock OpenIDM にログインします。
- [Configure] > [System Preferences] > [Audit] に移動します。
- 監査構成とトピックが表示される場合は、必要な権限が付与されています。
- このオプションが表示されない場合は、管理者に監査読み取り権限の付与を依頼してください。
テスト API アクセス
統合に進む前に、認証情報をテストします。
# Replace with your actual credentials
OPENIDM_BASE_URL="https://openidm.example.com"
OPENIDM_USERNAME="openidm-admin"
OPENIDM_PASSWORD="your-admin-password"
# Test API access to authentication audit topic
curl -v \
-H "X-OpenIDM-Username: ${OPENIDM_USERNAME}" \
-H "X-OpenIDM-Password: ${OPENIDM_PASSWORD}" \
-H "Accept-API-Version: resource=1.0" \
-H "Accept: application/json" \
"${OPENIDM_BASE_URL}/openidm/audit/authentication?_queryFilter=true&_pageSize=1"
想定されるレスポンス: 監査イベントを含む JSON を含む HTTP 200。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
forgerock-openidm-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect ForgeRock OpenIDM logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
forgerock-openidm-audit-logsなど)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
forgerock-openidm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
forgerock-openidm-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、ForgeRock OpenIDM API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 forgerock-openidm-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック
forgerock-openidm-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
[コンテナ、ネットワーキング、セキュリティ] までスクロールして開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
forgerock-openidm-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETforgerock-openidm-audit-logsGCS バケット名 GCS_PREFIXopenidmログファイルの接頭辞 STATE_KEYopenidm/state.json状態ファイルのパス OPENIDM_BASE_URLhttps://openidm.example.comOpenIDM ベース URL OPENIDM_USERNAMEopenidm-adminOpenIDM 管理者のユーザー名 OPENIDM_PASSWORDyour-admin-passwordOpenIDM 管理者パスワード AUDIT_TOPICSaccess,activity,authentication,config,syncカンマ区切りの監査トピック PAGE_SIZE1001 ページあたりのレコード数 MAX_PAGES50トピックあたりの最大ページ数
[変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
- [リソース] セクションで次の操作を行います。
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'openidm') STATE_KEY = os.environ.get('STATE_KEY', 'openidm/state.json') OPENIDM_BASE_URL = os.environ.get('OPENIDM_BASE_URL') OPENIDM_USERNAME = os.environ.get('OPENIDM_USERNAME') OPENIDM_PASSWORD = os.environ.get('OPENIDM_PASSWORD') AUDIT_TOPICS = os.environ.get('AUDIT_TOPICS', 'access,activity,authentication,config,sync').split(',') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '50')) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch ForgeRock OpenIDM logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, OPENIDM_BASE_URL, OPENIDM_USERNAME, OPENIDM_PASSWORD]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) all_events = [] for topic in AUDIT_TOPICS: topic = topic.strip() print(f"Fetching audit logs for topic: {topic}") events = fetch_audit_logs(topic, state.get(topic, {})) all_events.extend(events) if events: latest_timestamp = max(e.get('timestamp', '') for e in events) state[topic] = { 'last_timestamp': latest_timestamp, 'last_run': datetime.now(timezone.utc).isoformat(), 'events_count': len(events) } if all_events: write_to_gcs(bucket, all_events) save_state(bucket, STATE_KEY, state) print(f"Successfully processed {len(all_events)} audit events") else: print("No new audit events to process") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: {json.dumps(state)}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_audit_logs(topic, topic_state): """ Fetch audit logs from ForgeRock OpenIDM API with pagination. Args: topic: Audit topic name topic_state: State dictionary for this topic Returns: List of audit events """ base_url = OPENIDM_BASE_URL.rstrip('/') all_events = [] last_timestamp = topic_state.get('last_timestamp') query_filter = 'true' if last_timestamp: query_filter = f'timestamp gt "{last_timestamp}"' page_offset = 0 page_count = 0 while page_count < MAX_PAGES: try: url = f"{base_url}/openidm/audit/{topic}" params = { '_queryFilter': query_filter, '_pageSize': str(PAGE_SIZE), '_pagedResultsOffset': str(page_offset), '_sortKeys': 'timestamp' } query_string = '&'.join([f"{k}={urllib3.request.urlencode({k: v})[len(k)+1:]}" for k, v in params.items()]) full_url = f"{url}?{query_string}" headers = { 'X-OpenIDM-Username': OPENIDM_USERNAME, 'X-OpenIDM-Password': OPENIDM_PASSWORD, 'Accept-API-Version': 'resource=1.0', 'Accept': 'application/json' } response = http.request('GET', full_url, headers=headers) if response.status != 200: print(f"API error for topic {topic}: {response.status} - {response.data.decode('utf-8')}") break data = json.loads(response.data.decode('utf-8')) events = data.get('result', []) if not events: print(f"No more events for topic {topic}") break all_events.extend(events) page_offset += PAGE_SIZE page_count += 1 print(f"Fetched page {page_count} for topic {topic}, total events: {len(all_events)}") if len(events) < PAGE_SIZE: break except urllib3.exceptions.HTTPError as e: print(f"HTTP error for topic {topic}: {str(e)}") break except json.JSONDecodeError as e: print(f"JSON decode error for topic {topic}: {str(e)}") break except Exception as e: print(f"Unexpected error for topic {topic}: {str(e)}") break return all_events def write_to_gcs(bucket, events): """Write events to GCS as NDJSON.""" timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{GCS_PREFIX}/openidm_audit_{timestamp}.json" ndjson_content = '\n'.join([json.dumps(event) for event in events]) blob = bucket.blob(filename) blob.upload_from_string( ndjson_content.encode('utf-8'), content_type='application/x-ndjson' ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{filename}")- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 forgerock-openidm-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック forgerock-openidm-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索量が普通 |
| 1 時間ごと | 0 * * * * |
標準(推奨) |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
履歴データの収集 |
統合をテストする
- Cloud Scheduler コンソールで、ジョブ
forgerock-openidm-collector-hourlyを見つけます。 - [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名
forgerock-openidm-collectorをクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。以下のものを探します。
Fetching audit logs for topic: access Fetched page 1 for topic access, total events: X Wrote X events to gs://forgerock-openidm-audit-logs/openidm/openidm_audit_YYYYMMDD_HHMMSS.json Successfully processed X audit events[Cloud Storage] > [バケット] に移動します。
バケット名
forgerock-openidm-audit-logsをクリックします。プレフィックス フォルダ
openidm/に移動します。現在のタイムスタンプで新しい
.jsonファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で OpenIDM 認証情報を確認する
- HTTP 403: アカウントに監査の読み取り権限があることを確認する
- HTTP 404: OPENIDM_BASE_URL が正しく、末尾にパスが含まれていないことを確認します
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
ForgeRock OpenIDM のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
ForgeRock OpenIDM Audit Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
[ログタイプ] として [FORGEROCK_OPENIDM] を選択します。
[サービス アカウントを取得する] をクリックします。
一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーします。次のステップでこれを使用します。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
- ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://forgerock-openidm-audit-logs/openidm/- 次のように置き換えます。
forgerock-openidm-audit-logs: GCS バケット名。openidm: ログが保存される接頭辞パス。
- Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
- 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名
forgerock-openidm-audit-logsをクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| additional_label、additional_elapsed_time、additional_ContentLength、additional_accept_encoding、additional_Accept、additional_accept_language、additional_origin_hop、additional_cache_control、additional_Connection、additional_Cookie、additional_Pragma、additional_exchange_id、additional_contentType、additional_X-content_type-Options、fluenttag_label、source_label、topic_label、request_protocol_label、taskName_label、linkQualifier_label、situation_label、mapping_label、eventid_label、context_roles_label、field_names_label | additional.fields | 追加の Key-Value ペア |
| Via | intermediary.hostname | 仲介者のホスト名 |
| x_forwarded_ip、ip、caller.callerIps | intermediary.ip | 仲介者の IP アドレス |
| timestamp | metadata.event_timestamp | イベントのタイムスタンプ |
| metadata.event_type | イベントのタイプ | |
| transactionId | metadata.product_deployment_id | プロダクトのデプロイ ID |
| eventName | metadata.product_event_type | プロダクトのイベントタイプ |
| _id、trackingIds | metadata.product_log_id | プロダクトのログ ID |
| http.request.secure | network.application_protocol | アプリケーション プロトコル |
| http_version | network.application_protocol_version | アプリケーション プロトコルのバージョン |
| request_method、http.request.method | network.http.method | HTTP メソッド |
| user_agent、http.request.headers.user_agent.0 | network.http.parsed_user_agent | 解析されたユーザー エージェント |
| refferal_url | network.http.referral_url | 参照 URL |
| response.statusCode, status_code | network.http.response_code | HTTP レスポンス コード |
| user_agent、http.request.headers.user_agent | network.http.user_agent | ユーザー エージェント文字列 |
| transaction_id, transactionId | network.session_id | セッション ID |
| ホスト | principal.asset.hostname | プリンシパルのアセットのホスト名 |
| true_client_ip、client.ip、context.ipAddress、entry.info.ipAddress、src_ip | principal.asset.ip | プリンシパルのアセットの IP アドレス |
| ホスト | principal.hostname | プリンシパルのホスト名 |
| true_client_ip、client.ip、context.ipAddress、entry.info.ipAddress、src_ip | principal.ip | プリンシパルの IP アドレス |
| client.port、src_port | principal.port | プリンシパルのポート |
| component_label、moduleId_label、query_id_label | principal.resource.attribute.labels | プリンシパルのリソースの属性ラベル |
| entry.info.treeName | principal.resource.name | プリンシパルのリソースの名前 |
| sourceObjectId、objectId、entry.info.nodeId | principal.resource.product_object_id | プリンシパルのリソースのプロダクト オブジェクト ID |
| entry.info.authLevel | principal.resource.resource_subtype | プリンシパルのリソースのサブタイプ |
| user_roles_property_label、authentication_id_label、authentication_id_property_label | principal.user.attribute.labels | プリンシパルのユーザーの属性ラベル |
| userId, principalData.0 | principal.user.userid | プリンシパルのユーザー ID |
| security_action | security_result.action | セキュリティ結果で実行されたアクション |
| 結果、アクション | security_result.action_details | アクションの詳細 |
| result_label、moduleId_label、nodeType_label、displayName_label、nodeOutcome_label、elapsedTimeUnits_label、elapsedTime_label、operation_label、taskName_label、linkQualifier_label、situation_label、mapping_label | security_result.detection_fields | 検出フィールド |
| レベル | security_result.severity | セキュリティ結果の重大度 |
| レベル | security_result.severity_details | 重大度の詳細 |
| response_detail_reason | security_result.summary | セキュリティ結果の概要 |
| http.request.headers.host.0 | target.asset.hostname | ターゲットのアセットのホスト名 |
| server.ip、x_forwarded_ip | target.asset.ip | ターゲットのアセットの IP アドレス |
| http.request.headers.host.0 | target.hostname | ターゲットのホスト名 |
| server.ip、x_forwarded_ip | target.ip | ターゲットの IP アドレス |
| server.port | target.port | ターゲットのポート |
| targetObjectId | target.resource.product_object_id | ターゲットのリソースのアイテム オブジェクト ID |
| http.request.path | target.url | ターゲットの URL |
| metadata.product_name | 商品名 | |
| metadata.vendor_name | ベンダー名 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。