HYPR MFA ログを収集する
このドキュメントでは、Webhook または Google Cloud Storage V2 を使用して HYPR MFA ログを Google Security Operations に取り込む方法について説明します。
HYPR MFA は、FIDO2 パスキー、生体認証、モバイル開始ログインを使用してフィッシング耐性のある認証を提供するパスワードレスの多要素認証ソリューションです。HYPR は、従来のパスワードを安全な公開鍵暗号に置き換えて、認証情報ベースの攻撃を排除し、ワークステーション、ウェブ アプリケーション、クラウド サービス全体でユーザー認証を合理化します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- HYPR Control Center への管理者アクセス
- モニタリングする RP アプリケーションのカスタム イベントフックを有効にするには、HYPR サポートにお問い合わせください。
収集方法の違い
HYPR MFA は、Google Security Operations にログを送信する 2 つの方法をサポートしています。
- Webhook(推奨): HYPR は、カスタム イベントフックを介してイベントをリアルタイムで Google Security Operations に送信します。この方法では、イベントがすぐに配信され、追加のインフラストラクチャは必要ありません。
- Google Cloud Storage: HYPR イベントは API 経由で収集され、GCS に保存されてから、Google Security Operations に取り込まれます。この方法では、バッチ処理と履歴データの保持が可能です。
要件に最適な方法を選択します。
| 機能 | Webhook | Google Cloud Storage |
|---|---|---|
| レイテンシ | リアルタイム(秒) | バッチ(数分から数時間) |
| インフラストラクチャ | 必要なし | Cloud Run 関数を含む GCP プロジェクト |
| 過去のデータ | イベント ストリームに限定 | GCS での完全な保持 |
| 設定の難易度 | シンプル | 中 |
| 費用 | 最小 | GCP のコンピューティング費用とストレージ費用 |
オプション 1: Webhook 統合を構成する
Google SecOps で Webhook フィードを作成する
フィードを作成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- 次のページで [単一のフィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
HYPR MFA Events)。 - [Source type] として [Webhook] を選択します。
- [ログタイプ] として [HYPR MFA] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- Split delimiter(省略可): 空白のままにします。各 Webhook リクエストには、単一の JSON イベントが含まれます。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
秘密鍵を生成して保存する
フィードを作成したら、認証用のシークレット キーを生成する必要があります。
- フィードの詳細ページで、[シークレット キーを生成] をクリックします。
- ダイアログに秘密鍵が表示されます。
- 秘密鍵をコピーして安全に保存します。
フィード エンドポイントの URL を取得する
- フィードの [詳細] タブに移動します。
- [エンドポイント情報] セクションで、[フィード エンドポイント URL] をコピーします。
URL の形式は次のとおりです。
https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreateまたは
https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate次の手順で使用するため、この URL を保存します。
[完了] をクリックします。
Google Cloud API キーを作成する
Chronicle では、認証に API キーが必要です。Google Cloud コンソールで制限付き API キーを作成します。
API キーを作成する
- Google Cloud コンソールの [認証情報] ページに移動します。
- プロジェクト(Chronicle インスタンスに関連付けられているプロジェクト)を選択します。
- [認証情報を作成> API キー] をクリックします。
- API キーが作成され、ダイアログに表示されます。
- [API キーを編集] をクリックして、キーを制限します。
API キーを制限する
- [API キー] 設定ページで、次の操作を行います。
- 名前: わかりやすい名前を入力します(例:
Chronicle Webhook API Key)。
- 名前: わかりやすい名前を入力します(例:
- [API の制限] で次の操作を行います。
- [キーを制限] を選択します。
- [API を選択] プルダウンで、[Google SecOps API](または [Chronicle API])を検索して選択します。
- [保存] をクリックします。
- ページ上部の [API キー] フィールドから API キーの値をコピーします。
- API キーを安全に保存します。
HYPR MFA カスタム イベントフックを構成する
ヘッダーを含む Webhook URL を作成する
HYPR は認証用のカスタム ヘッダーをサポートしています。セキュリティを強化するには、ヘッダー認証方法を使用します。
エンドポイント URL(パラメータなし):
<ENDPOINT_URL>ヘッダー:
x-goog-chronicle-auth: <API_KEY> x-chronicle-auth: <SECRET_KEY>- 次のように置き換えます。
<ENDPOINT_URL>: 前の手順で取得したフィード エンドポイント URL。<API_KEY>: 作成した Google Cloud API キー。<SECRET_KEY>: Chronicle フィードの作成で取得した秘密鍵。
- 次のように置き換えます。
カスタム イベントフックの JSON 構成を準備する
HYPR カスタム イベントフックは JSON を使用して構成されます。次の JSON 構成を準備し、プレースホルダ値を置き換えます。
{ "name": "Chronicle SIEM Integration", "eventType": "ALL", "invocationEndpoint": "<ENDPOINT_URL>", "httpMethod": "POST", "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }次のように置き換えます。
<ENDPOINT_URL>: Chronicle フィード エンドポイントの URL。<API_KEY>: Google Cloud API キー。<SECRET_KEY>: Chronicle のシークレット キー。
構成パラメータ:
name: イベントフックのわかりやすい名前(例:
Chronicle SIEM Integration)。eventType: すべての HYPR イベントを送信する場合は
ALLに設定します。特定のイベントタグ(AUTHENTICATION、REGISTRATION、ACCESS_TOKENなど)を指定することもできます。invocationEndpoint: Chronicle フィード エンドポイントの URL。
httpMethod:
POSTに設定します。authType: API キー認証の場合は
API_KEYに設定します。apiKeyName: API キーのヘッダー名(
x-goog-chronicle-auth)。apiKeyValue: Google Cloud API キーの値。
headerParameters:
Content-Type: application/jsonとx-chronicle-authヘッダーの Chronicle シークレット キーを含む追加のヘッダー。
HYPR Control Center でカスタム イベントフックを作成する
- 管理者として HYPR Control Center にログインします。
- 左側のナビゲーション メニューで、[Integrations] をクリックします。
- [統合] ページで、[新しい統合を追加] をクリックします。
- HYPR Control Center に、利用可能な統合が表示されます。
- [カスタム イベント] の [イベントフック] の下のタイルをクリックします。
- [Add New Event Hook] をクリックします。
- [新しいイベントフックを追加] ダイアログで、準備した JSON コンテンツをテキスト フィールドに貼り付けます。
- [イベントフックを追加] をクリックします。
- HYPR Control Center が [イベントフック] ページに戻ります。
これで、カスタム イベントフックが構成され、Google SecOps へのイベントの送信が開始されます。
Webhook が機能していることを確認する
HYPR Control Center のイベントフックのステータスを確認する
- HYPR Control Center にログインします。
- [インテグレーション] に移動します。
- [カスタム イベント] 統合をクリックします。
- [イベントフック] テーブルで、イベントフックがリストされていることを確認します。
- イベントフック名をクリックして詳細を表示します。
- 構成が設定と一致していることを確認します。
Chronicle フィードのステータスを確認する
- Chronicle の [SIEM 設定] > [フィード] に移動します。
- Webhook フィードを見つけます。
- [ステータス] 列を確認します([有効] になっているはずです)。
- [Events received] のカウントを確認します(増加しているはずです)。
- [Last succeeded on](最後に成功した日時)のタイムスタンプを確認します(最新である必要があります)。
Chronicle でログを確認する
- [検索> UDM 検索] に移動します。
次のクエリを使用してください。
metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"期間を過去 1 時間に調整します。
結果にイベントが表示されることを確認します。
認証方法のリファレンス
HYPR カスタム イベントフックは、複数の認証方法をサポートしています。Chronicle で推奨される方法は、カスタム ヘッダーを使用した API キー認証です。
API キー認証(Chronicle に推奨)
構成:
{ "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }メリット:
- ヘッダーで送信される API キーとシークレット(URL パラメータよりも安全)。
- 複数の認証ヘッダーをサポートします。
- ヘッダーがウェブサーバーのアクセスログに記録されない。
基本認証
構成:
{ "authType": "BASIC", "authParams": { "basicAuthParameters": { "username": "your-username", "password": "your-password" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- ユースケース: ターゲット システムで HTTP 基本認証が必要な場合。
OAuth 2.0 クライアント認証情報
構成:
{ "authType": "OAUTH_CLIENT_CREDENTIALS", "authParams": { "oauthParameters": { "clientParameters": { "clientId": "your-client-id", "clientSecret": "your-client-secret" }, "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token", "httpMethod": "POST", "oauthHttpParameters": { "bodyParameters": [ { "key": "scope", "value": "api://your-api/.default", "isValueSecret": false }, { "key": "grant_type", "value": "client_credentials", "isValueSecret": false } ] } }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- ユースケース: ターゲット システムで OAuth 2.0 認証が必要な場合。
イベントタイプとフィルタリング
HYPR イベントは eventTags パラメータを使用してグループ化されます。カスタム イベントフックを構成して、すべてのイベントを送信するか、特定のイベントタイプでフィルタできます。
イベントタグ
- AUTHENTICATION: ユーザー認証イベント(ログイン、ロック解除)。
- REGISTRATION: デバイス登録イベント(モバイル デバイス、セキュリティ キーのペア設定)。
- ACCESS_TOKEN: アクセス トークンの生成と使用に関するイベント。
- AUDIT: 監査ログイベント(管理アクション、構成の変更)。
イベント フィルタリングを構成する
特定のイベントタイプのみを送信するには、JSON 構成の eventType パラメータを変更します。
すべてのイベントを送信:
{ "eventType": "ALL" }認証イベントのみを送信:
{ "eventType": "AUTHENTICATION" }登録イベントのみを送信する:
{ "eventType": "REGISTRATION" }
オプション 2: Google Cloud Storage インテグレーションを構成する
GCS 統合の追加の前提条件
「始める前に」セクションに記載されている前提条件に加えて、次のものが必要です。
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- HYPR API 認証情報(API アクセスについては HYPR サポートにお問い合わせください)
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( hypr-mfa-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
HYPR API 認証情報を収集する
HYPR イベントデータにアクセスするための API 認証情報を取得するには、HYPR サポートにお問い合わせください。以下のものが必要になります。
- API ベース URL: HYPR インスタンスの URL(
https://your-tenant.hypr.comなど) - API トークン: API アクセス用の認証トークン
- RP アプリ ID: モニタリングするリライング パーティ アプリケーション ID
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
hypr-logs-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect HYPR MFA logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウント(hypr-logs-collector-sa)に付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(例:
hypr-mfa-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
hypr-logs-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run functions は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、HYPR API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 hypr-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
hypr-logs-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
hypr-logs-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKEThypr-mfa-logsGCS バケット名 GCS_PREFIXhypr-eventsログファイルの接頭辞 STATE_KEYhypr-events/state.json状態ファイルのパス HYPR_API_URLhttps://your-tenant.hypr.comHYPR API のベース URL HYPR_API_TOKENyour-api-tokenHYPR API 認証トークン HYPR_RP_APP_IDyour-rp-app-idHYPR RP アプリケーション ID MAX_RECORDS1000実行あたりの最大レコード数 PAGE_SIZE100ページあたりのレコード数 LOOKBACK_HOURS24最初のルックバック期間 [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events') STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json') HYPR_API_URL = os.environ.get('HYPR_API_URL') HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN') HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Convert to Unix milliseconds for HYPR API start_millis = to_unix_millis(last_time) end_millis = to_unix_millis(now) # Fetch logs records, newest_event_time = fetch_logs( api_url=HYPR_API_URL, api_token=HYPR_API_TOKEN, rp_app_id=HYPR_RP_APP_ID, start_time_ms=start_millis, end_time_ms=end_millis, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int): """ Fetch logs from HYPR API with pagination and rate limiting. Args: api_url: HYPR API base URL api_token: HYPR API authentication token rp_app_id: HYPR RP application ID start_time_ms: Start time in Unix milliseconds end_time_ms: End time in Unix milliseconds page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ # Clean up API URL base_url = api_url.rstrip('/') endpoint = f"{base_url}/rp/api/versioned/events" # Bearer token authentication headers = { 'Authorization': f'Bearer {api_token}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-HYPRCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 # Offset-based pagination start_index = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request parameters params = [] params.append(f"rpAppId={rp_app_id}") params.append(f"startDate={start_time_ms}") params.append(f"endDate={end_time_ms}") params.append(f"start={start_index}") params.append(f"limit={min(page_size, max_records - len(records))}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) # Extract results page_results = data.get('data', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds event_time_ms = event.get('LOGGEDTIMEINUTC') if event_time_ms: event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) event_time = event_dt.isoformat() if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results current_size = data.get('size', 0) if current_size < page_size: print(f"Reached last page (size={current_size} < limit={page_size})") break start_index += current_size except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピック(hypr-logs-trigger)にメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 hypr-logs-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( hypr-logs-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索量が普通 |
| 1 時間ごと | 0 * * * * |
標準(推奨) |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
履歴データの収集 |
統合をテストする
- Cloud Scheduler コンソールで、ジョブ(
hypr-logs-collector-hourly)を見つけます。 - [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名(
hypr-logs-collector)をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。以下のものを探します。
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X records[Cloud Storage] > [バケット] に移動します。
バケット名(例:
hypr-mfa-logs)をクリックします。接頭辞フォルダ(
hypr-events/など)に移動します。現在のタイムスタンプで新しい
.ndjsonファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で API 認証情報を確認する
- HTTP 403: HYPR API トークンに必要な権限があり、RP アプリ ID が正しいことを確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
HYPR MFA のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
HYPR MFA Logs from GCS)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
[ログタイプ] として [HYPR MFA] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://hypr-mfa-logs/hypr-events/- 次のように置き換えます。
hypr-mfa-logs: GCS バケット名。hypr-events: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
- 次のように置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
hypr-mfa-logsなど)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
- [保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| extensions.auth.type | 認証タイプ(SSO、MFA) | |
| metadata.event_type | イベントのタイプ(USER_LOGIN、NETWORK_CONNECTION) | |
| EVENTNAME | metadata.product_event_type | プロダクト固有のイベントタイプ |
| ID | metadata.product_log_id | プロダクト固有のログ ID |
| USERAGENT | network.http.parsed_user_agent | 解析された HTTP ユーザー エージェント |
| USERAGENT | network.http.user_agent | HTTP ユーザー エージェント文字列 |
| SESSIONID | network.session_id | セッション ID |
| DEVICEMODEL | principal.asset.hardware.model | アセットのハードウェア モデル |
| COMPANION,MACHINEDOMAIN | principal.asset.hostname | アセットのホスト名 |
| REMOTEIP | principal.asset.ip | アセットの IP アドレス |
| DEVICEID | principal.asset_id | アセットの一意の識別子 |
| COMPANION,MACHINEDOMAIN | principal.hostname | プリンシパルに関連付けられたホスト名 |
| REMOTEIP | principal.ip | プリンシパルに関連付けられた IP アドレス |
| DEVICEOS | principal.platform | プラットフォーム(例: WINDOWS、LINUX) |
| DEVICEOSVERSION | principal.platform_version | プラットフォームのバージョン |
| ISSUCCESSFUL | security_result.action | セキュリティ システムによって実行されたアクション(例: ALLOW、BLOCK) |
| メッセージ | security_result.description | セキュリティ結果の説明 |
| MACHINEUSERNAME | target.user.user_display_name | ユーザーの表示名 |
| FIDOUSER | target.user.userid | ユーザー ID |
| metadata.product_name | 商品名 | |
| metadata.vendor_name | ベンダー/会社名 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。