HYPR MFA ログを収集する

以下でサポートされています。

このドキュメントでは、Webhook または Google Cloud Storage V2 を使用して HYPR MFA ログを Google Security Operations に取り込む方法について説明します。

HYPR MFA は、FIDO2 パスキー、生体認証、モバイル開始ログインを使用してフィッシング耐性のある認証を提供するパスワードレスの多要素認証ソリューションです。HYPR は、従来のパスワードを安全な公開鍵暗号に置き換えて、認証情報ベースの攻撃を排除し、ワークステーション、ウェブ アプリケーション、クラウド サービス全体でユーザー認証を合理化します。

始める前に

次の前提条件を満たしていることを確認します。

  • Google SecOps インスタンス
  • HYPR Control Center への管理者アクセス
  • モニタリングする RP アプリケーションのカスタム イベントフックを有効にするには、HYPR サポートにお問い合わせください。

収集方法の違い

HYPR MFA は、Google Security Operations にログを送信する 2 つの方法をサポートしています。

  • Webhook(推奨): HYPR は、カスタム イベントフックを介してイベントをリアルタイムで Google Security Operations に送信します。この方法では、イベントがすぐに配信され、追加のインフラストラクチャは必要ありません。
  • Google Cloud Storage: HYPR イベントは API 経由で収集され、GCS に保存されてから、Google Security Operations に取り込まれます。この方法では、バッチ処理と履歴データの保持が可能です。

要件に最適な方法を選択します。

機能 Webhook Google Cloud Storage
レイテンシ リアルタイム(秒) バッチ(数分から数時間)
インフラストラクチャ 必要なし Cloud Run 関数を含む GCP プロジェクト
過去のデータ イベント ストリームに限定 GCS での完全な保持
設定の難易度 シンプル
費用 最小 GCP のコンピューティング費用とストレージ費用

オプション 1: Webhook 統合を構成する

Google SecOps で Webhook フィードを作成する

フィードを作成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. 次のページで [単一のフィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: HYPR MFA Events)。
  5. [Source type] として [Webhook] を選択します。
  6. [ログタイプ] として [HYPR MFA] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。
    • Split delimiter(省略可): 空白のままにします。各 Webhook リクエストには、単一の JSON イベントが含まれます。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  9. [次へ] をクリックします。
  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

秘密鍵を生成して保存する

フィードを作成したら、認証用のシークレット キーを生成する必要があります。

  1. フィードの詳細ページで、[シークレット キーを生成] をクリックします。
  2. ダイアログに秘密鍵が表示されます。
  3. 秘密鍵をコピーして安全に保存します。

フィード エンドポイントの URL を取得する

  1. フィードの [詳細] タブに移動します。
  2. [エンドポイント情報] セクションで、[フィード エンドポイント URL] をコピーします。
  3. URL の形式は次のとおりです。

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    または

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. 次の手順で使用するため、この URL を保存します。

  5. [完了] をクリックします。

Google Cloud API キーを作成する

Chronicle では、認証に API キーが必要です。Google Cloud コンソールで制限付き API キーを作成します。

API キーを作成する

  1. Google Cloud コンソールの [認証情報] ページに移動します。
  2. プロジェクト(Chronicle インスタンスに関連付けられているプロジェクト)を選択します。
  3. [認証情報を作成> API キー] をクリックします。
  4. API キーが作成され、ダイアログに表示されます。
  5. [API キーを編集] をクリックして、キーを制限します。

API キーを制限する

  1. [API キー] 設定ページで、次の操作を行います。
    • 名前: わかりやすい名前を入力します(例: Chronicle Webhook API Key)。
  2. [API の制限] で次の操作を行います。
    1. [キーを制限] を選択します。
    2. [API を選択] プルダウンで、[Google SecOps API](または [Chronicle API])を検索して選択します。
  3. [保存] をクリックします。
  4. ページ上部の [API キー] フィールドから API キーの値をコピーします。
  5. API キーを安全に保存します。

HYPR MFA カスタム イベントフックを構成する

ヘッダーを含む Webhook URL を作成する

HYPR は認証用のカスタム ヘッダーをサポートしています。セキュリティを強化するには、ヘッダー認証方法を使用します。

  • エンドポイント URL(パラメータなし):

    <ENDPOINT_URL>
    
  • ヘッダー:

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • 次のように置き換えます。
      • <ENDPOINT_URL>: 前の手順で取得したフィード エンドポイント URL。
      • <API_KEY>: 作成した Google Cloud API キー。
      • <SECRET_KEY>: Chronicle フィードの作成で取得した秘密鍵。

カスタム イベントフックの JSON 構成を準備する

  • HYPR カスタム イベントフックは JSON を使用して構成されます。次の JSON 構成を準備し、プレースホルダ値を置き換えます。

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
    • 次のように置き換えます。

      • <ENDPOINT_URL>: Chronicle フィード エンドポイントの URL。
      • <API_KEY>: Google Cloud API キー。
      • <SECRET_KEY>: Chronicle のシークレット キー。
    • 構成パラメータ:

    • name: イベントフックのわかりやすい名前(例: Chronicle SIEM Integration)。

    • eventType: すべての HYPR イベントを送信する場合は ALL に設定します。特定のイベントタグ(AUTHENTICATIONREGISTRATIONACCESS_TOKEN など)を指定することもできます。

    • invocationEndpoint: Chronicle フィード エンドポイントの URL。

    • httpMethod: POST に設定します。

    • authType: API キー認証の場合は API_KEY に設定します。

    • apiKeyName: API キーのヘッダー名(x-goog-chronicle-auth)。

    • apiKeyValue: Google Cloud API キーの値。

    • headerParameters: Content-Type: application/jsonx-chronicle-auth ヘッダーの Chronicle シークレット キーを含む追加のヘッダー。

HYPR Control Center でカスタム イベントフックを作成する

  1. 管理者として HYPR Control Center にログインします。
  2. 左側のナビゲーション メニューで、[Integrations] をクリックします。
  3. [統合] ページで、[新しい統合を追加] をクリックします。
  4. HYPR Control Center に、利用可能な統合が表示されます。
  5. [カスタム イベント] の [イベントフック] の下のタイルをクリックします。
  6. [Add New Event Hook] をクリックします。
  7. [新しいイベントフックを追加] ダイアログで、準備した JSON コンテンツをテキスト フィールドに貼り付けます。
  8. [イベントフックを追加] をクリックします。
  9. HYPR Control Center が [イベントフック] ページに戻ります。

これで、カスタム イベントフックが構成され、Google SecOps へのイベントの送信が開始されます。

Webhook が機能していることを確認する

HYPR Control Center のイベントフックのステータスを確認する

  1. HYPR Control Center にログインします。
  2. [インテグレーション] に移動します。
  3. [カスタム イベント] 統合をクリックします。
  4. [イベントフック] テーブルで、イベントフックがリストされていることを確認します。
  5. イベントフック名をクリックして詳細を表示します。
  6. 構成が設定と一致していることを確認します。

Chronicle フィードのステータスを確認する

  1. Chronicle の [SIEM 設定] > [フィード] に移動します。
  2. Webhook フィードを見つけます。
  3. [ステータス] 列を確認します([有効] になっているはずです)。
  4. [Events received] のカウントを確認します(増加しているはずです)。
  5. [Last succeeded on](最後に成功した日時)のタイムスタンプを確認します(最新である必要があります)。

Chronicle でログを確認する

  1. [検索> UDM 検索] に移動します。
  2. 次のクエリを使用してください。

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. 期間を過去 1 時間に調整します。

  4. 結果にイベントが表示されることを確認します。

認証方法のリファレンス

HYPR カスタム イベントフックは、複数の認証方法をサポートしています。Chronicle で推奨される方法は、カスタム ヘッダーを使用した API キー認証です。

  • 構成:

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • メリット:

    • ヘッダーで送信される API キーとシークレット(URL パラメータよりも安全)。
    • 複数の認証ヘッダーをサポートします。
    • ヘッダーがウェブサーバーのアクセスログに記録されない。

基本認証

  • 構成:

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • ユースケース: ターゲット システムで HTTP 基本認証が必要な場合。

OAuth 2.0 クライアント認証情報

  • 構成:

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • ユースケース: ターゲット システムで OAuth 2.0 認証が必要な場合。

イベントタイプとフィルタリング

HYPR イベントは eventTags パラメータを使用してグループ化されます。カスタム イベントフックを構成して、すべてのイベントを送信するか、特定のイベントタイプでフィルタできます。

イベントタグ

  • AUTHENTICATION: ユーザー認証イベント(ログイン、ロック解除)。
  • REGISTRATION: デバイス登録イベント(モバイル デバイス、セキュリティ キーのペア設定)。
  • ACCESS_TOKEN: アクセス トークンの生成と使用に関するイベント。
  • AUDIT: 監査ログイベント(管理アクション、構成の変更)。

イベント フィルタリングを構成する

特定のイベントタイプのみを送信するには、JSON 構成の eventType パラメータを変更します。

  • すべてのイベントを送信:

    {
      "eventType": "ALL"
    }
    
  • 認証イベントのみを送信:

    {
      "eventType": "AUTHENTICATION"
    }
    
  • 登録イベントのみを送信する:

    {
      "eventType": "REGISTRATION"
    }
    

オプション 2: Google Cloud Storage インテグレーションを構成する

GCS 統合の追加の前提条件

「始める前に」セクションに記載されている前提条件に加えて、次のものが必要です。

  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • HYPR API 認証情報(API アクセスについては HYPR サポートにお問い合わせください)

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(hypr-mfa-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

HYPR API 認証情報を収集する

HYPR イベントデータにアクセスするための API 認証情報を取得するには、HYPR サポートにお問い合わせください。以下のものが必要になります。

  • API ベース URL: HYPR インスタンスの URL(https://your-tenant.hypr.com など)
  • API トークン: API アクセス用の認証トークン
  • RP アプリ ID: モニタリングするリライング パーティ アプリケーション ID

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「hypr-logs-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect HYPR MFA logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウント(hypr-logs-collector-sa)に付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(例: hypr-mfa-logs)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「hypr-logs-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run functions は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、HYPR API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 hypr-logs-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(hypr-logs-trigger)を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: サービス アカウントを選択します(hypr-logs-collector-sa)。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例 説明
    GCS_BUCKET hypr-mfa-logs GCS バケット名
    GCS_PREFIX hypr-events ログファイルの接頭辞
    STATE_KEY hypr-events/state.json 状態ファイルのパス
    HYPR_API_URL https://your-tenant.hypr.com HYPR API のベース URL
    HYPR_API_TOKEN your-api-token HYPR API 認証トークン
    HYPR_RP_APP_ID your-rp-app-id HYPR RP アプリケーション ID
    MAX_RECORDS 1000 実行あたりの最大レコード数
    PAGE_SIZE 100 ページあたりのレコード数
    LOOKBACK_HOURS 24 最初のルックバック期間
  10. [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
  12. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  13. [作成] をクリックします。

  14. サービスが作成されるまで待ちます(1 ~ 2 分)。

  15. サービスが作成されると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [エントリ ポイント] フィールドに「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピック(hypr-logs-trigger)にメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 hypr-logs-collector-hourly
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック Pub/Sub トピック(hypr-logs-trigger)を選択する
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

ログの量とレイテンシの要件に基づいて頻度を選択します。

頻度 CRON 式 ユースケース
5 分おき */5 * * * * 大容量、低レイテンシ
15 分ごと */15 * * * * 検索量が普通
1 時間ごと 0 * * * * 標準(推奨)
6 時間ごと 0 */6 * * * 少量、バッチ処理
毎日 0 0 * * * 履歴データの収集

統合をテストする

  1. Cloud Scheduler コンソールで、ジョブ(hypr-logs-collector-hourly)を見つけます。
  2. [強制実行] をクリックして、ジョブを手動でトリガーします。
  3. 数秒待ちます。
  4. Cloud Run > サービスに移動します。
  5. 関数名(hypr-logs-collector)をクリックします。
  6. [Logs] タブをクリックします。
  7. 関数が正常に実行されたことを確認します。以下のものを探します。

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. [Cloud Storage] > [バケット] に移動します。

  9. バケット名(例: hypr-mfa-logs)をクリックします。

  10. 接頭辞フォルダ(hypr-events/ など)に移動します。

  11. 現在のタイムスタンプで新しい .ndjson ファイルが作成されたことを確認します。

ログにエラーが表示された場合:

  • HTTP 401: 環境変数で API 認証情報を確認する
  • HTTP 403: HYPR API トークンに必要な権限があり、RP アプリ ID が正しいことを確認する
  • HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
  • 環境変数が不足している: 必要な変数がすべて設定されていることを確認します

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

HYPR MFA のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: HYPR MFA Logs from GCS)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [HYPR MFA] を選択します。

  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

  9. [次へ] をクリックします。

  10. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://hypr-mfa-logs/hypr-events/
      
      • 次のように置き換えます。
        • hypr-mfa-logs: GCS バケット名。
        • hypr-events: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。

    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル。

  11. [次へ] をクリックします。

  12. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(hypr-mfa-logs など)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
extensions.auth.type 認証タイプ(SSO、MFA)
metadata.event_type イベントのタイプ(USER_LOGIN、NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type プロダクト固有のイベントタイプ
ID metadata.product_log_id プロダクト固有のログ ID
USERAGENT network.http.parsed_user_agent 解析された HTTP ユーザー エージェント
USERAGENT network.http.user_agent HTTP ユーザー エージェント文字列
SESSIONID network.session_id セッション ID
DEVICEMODEL principal.asset.hardware.model アセットのハードウェア モデル
COMPANION,MACHINEDOMAIN principal.asset.hostname アセットのホスト名
REMOTEIP principal.asset.ip アセットの IP アドレス
DEVICEID principal.asset_id アセットの一意の識別子
COMPANION,MACHINEDOMAIN principal.hostname プリンシパルに関連付けられたホスト名
REMOTEIP principal.ip プリンシパルに関連付けられた IP アドレス
DEVICEOS principal.platform プラットフォーム(例: WINDOWS、LINUX)
DEVICEOSVERSION principal.platform_version プラットフォームのバージョン
ISSUCCESSFUL security_result.action セキュリティ システムによって実行されたアクション(例: ALLOW、BLOCK)
メッセージ security_result.description セキュリティ結果の説明
MACHINEUSERNAME target.user.user_display_name ユーザーの表示名
FIDOUSER target.user.userid ユーザー ID
metadata.product_name 商品名
metadata.vendor_name ベンダー/会社名

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。