收集 HYPR MFA 記錄

支援的國家/地區:

本文說明如何使用 Webhook 或 Google Cloud Storage V2,將 HYPR MFA 記錄檔擷取至 Google Security Operations。

HYPR MFA 是一種無密碼多重驗證解決方案,可使用 FIDO2 密碼金鑰、生物特徵辨識和行動裝置啟動登入程序,提供防範網路釣魚的驗證機制。HYPR 會以安全的公開金鑰密碼編譯機制取代傳統密碼,消除憑證式攻擊,同時簡化工作站、網路應用程式和雲端服務的使用者驗證程序。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • HYPR 控制中心的管理員存取權
  • 聯絡 HYPR 支援團隊,為要監控的 RP 應用程式啟用自訂事件掛鉤

收集方法差異

HYPR MFA 支援兩種將記錄傳送至 Google Security Operations 的方法:

  • Webhook (建議使用):HYPR 會透過自訂事件 Hook,將事件即時傳送至 Google Security Operations。這個方法可立即傳送事件,且不需要額外基礎架構。
  • Google Cloud Storage:系統會透過 API 收集 HYPR 事件並儲存在 GCS 中,然後由 Google Security Operations 擷取。這個方法提供批次處理和歷史資料保留功能。

請選擇最符合需求的做法:

功能 Webhook Google Cloud Storage
延遲時間 即時 (秒) 批次 (分鐘至小時)
基礎架構 具有 Cloud Run 函式的 GCP 專案
歷來資料 只能產生成效報表 在 GCS 中完整保留
設定複雜度 簡單
費用 最低 GCP 運算和儲存空間費用

方法 1:設定 Webhook 整合

在 Google SecOps 中建立 Webhook 動態饋給

建立動態饋給

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 在下一個頁面中,按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 HYPR MFA Events)。
  5. 選取「Webhook」做為「來源類型」
  6. 選取「HYPR MFA」做為「記錄類型」
  7. 點選「下一步」
  8. 指定下列輸入參數的值:
    • 分割分隔符 (選填):留空。每個 Webhook 要求都包含單一 JSON 事件。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  9. 點選「下一步」
  10. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

產生並儲存密鑰

建立動態饋給後,您必須產生驗證用的密鑰:

  1. 在動態消息詳細資料頁面中,按一下「產生密鑰」
  2. 對話方塊會顯示密鑰。
  3. 複製並妥善儲存密鑰。

取得動態饋給端點網址

  1. 前往動態消息的「詳細資料」分頁。
  2. 在「端點資訊」部分,複製「動態消息端點網址」
  3. 網址格式為:

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. 請儲存這個網址,以供後續步驟使用。

  5. 按一下 [完成]

建立 Google Cloud API 金鑰

Chronicle 需要 API 金鑰才能進行驗證。在 Google Cloud 控制台中建立受限制的 API 金鑰。

建立 API 金鑰

  1. 前往 Google Cloud 控制台的「憑證」頁面
  2. 選取專案 (與 Chronicle 執行個體相關聯的專案)。
  3. 依序按一下「建立憑證」>「API 金鑰」
  4. 系統會建立 API 金鑰,並在對話方塊中顯示。
  5. 按一下「編輯 API 金鑰」即可限制金鑰。

限制 API 金鑰

  1. 在「API 金鑰」設定頁面中:
    • 名稱:輸入描述性名稱 (例如 Chronicle Webhook API Key)。
  2. 在「API 限制」下方:
    1. 選取「Restrict key」(限制金鑰)
    2. 在「選取 API」下拉式選單中,搜尋並選取「Google SecOps API」 (或「Chronicle API」)。
  3. 按一下 [儲存]
  4. 從頁面頂端的「API key」(API 金鑰) 欄位複製 API 金鑰值
  5. 安全儲存 API 金鑰。

設定 HYPR MFA 自訂事件掛鉤

使用標頭建構 Webhook 網址

HYPR 支援用於驗證的自訂標頭。使用標頭驗證方法,提升安全性。

  • 端點網址 (不含參數):

    <ENDPOINT_URL>
    
  • 標題:

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • 取代:
      • <ENDPOINT_URL>:上一步中的動態消息端點網址。
      • <API_KEY>:您建立的 Google Cloud API 金鑰。
      • <SECRET_KEY>:從 Chronicle 動態饋給建立作業取得的密鑰。

準備自訂事件掛鉤 JSON 設定

  • HYPR 自訂事件掛鉤是使用 JSON 設定。準備下列 JSON 設定,並替換預留位置值:

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
    • 取代:

      • <ENDPOINT_URL>:Chronicle 動態消息端點網址。
      • <API_KEY>:Google Cloud API 金鑰。
      • <SECRET_KEY>:Chronicle 密鑰。
    • 設定參數:

    • 名稱:事件掛鉤的描述性名稱 (例如 Chronicle SIEM Integration)。

    • eventType:設為 ALL 可傳送所有 HYPR 事件,或指定特定事件標記,例如 AUTHENTICATIONREGISTRATIONACCESS_TOKEN

    • invocationEndpoint:Chronicle 動態消息端點網址。

    • httpMethod:設為 POST

    • authType:設為 API_KEY,進行 API 金鑰驗證。

    • apiKeyName:API 金鑰的標頭名稱 (x-goog-chronicle-auth)。

    • apiKeyValue:Google Cloud API 金鑰值。

    • headerParameters:其他標頭,包括 Content-Type: application/jsonx-chronicle-auth 標頭中的 Chronicle 密鑰。

在 HYPR Control Center 中建立自訂事件掛鉤

  1. 以管理員身分登入 HYPR Control Center
  2. 在左側導覽選單中,按一下 [Integrations] (整合)。
  3. 在「整合」頁面中,按一下「新增整合」
  4. HYPR Control Center 會顯示可用的整合功能。
  5. 按一下「自訂事件」下方的「事件掛鉤」圖塊。
  6. 按一下「新增事件掛鉤」
  7. 在「新增事件掛鉤」對話方塊中,將準備好的 JSON 內容貼到文字欄位。
  8. 按一下「新增事件掛鉤」
  9. HYPR 控制中心會返回「事件掛鉤」頁面。

自訂事件 Hook 現已設定完成,並開始將事件傳送至 Google SecOps。

確認 Webhook 運作正常

檢查 HYPR Control Center 事件掛鉤狀態

  1. 登入 HYPR 控制中心
  2. 前往「整合」
  3. 按一下「自訂事件」整合。
  4. 在「事件掛鉤」表格中,確認事件掛鉤是否已列出。
  5. 按一下事件掛鉤名稱即可查看詳細資料。
  6. 確認設定符合您的設定。

查看 Chronicle 動態消息狀態

  1. 前往 Chronicle 的「SIEM 設定」>「動態饋給」
  2. 找出 Webhook 動態消息。
  3. 檢查「狀態」欄 (應為「有效」)。
  4. 檢查「收到的事件」計數 (應會遞增)。
  5. 檢查「上次成功時間」時間戳記 (應為最近的時間)。

在 Chronicle 中驗證記錄

  1. 依序前往「搜尋」>「UDM 搜尋」
  2. 請使用下列查詢:

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. 將時間範圍調整為過去 1 小時。

  4. 確認結果中顯示事件。

驗證方式參考資料

HYPR 自訂事件掛鉤支援多種驗證方法。建議使用自訂標頭進行 API 金鑰驗證,以存取 Chronicle。

  • 設定:

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • 優點:

    • 標頭中傳送的 API 金鑰和密鑰 (比網址參數更安全)。
    • 支援多個驗證標頭。
    • 網路伺服器存取記錄中未記錄標頭。

基本驗證

  • 設定:

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • 使用案例:目標系統需要 HTTP 基本驗證。

OAuth 2.0 用戶端憑證

  • 設定:

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • 用途:目標系統需要 OAuth 2.0 驗證時。

事件類型和篩選條件

系統會使用 eventTags 參數將 HYPR 事件分組。您可以設定自訂事件掛鉤,傳送所有事件或依特定事件類型篩選。

事件代碼

  • 驗證:使用者驗證事件 (登入、解鎖)。
  • REGISTRATION:裝置註冊事件 (配對行動裝置、安全金鑰)。
  • ACCESS_TOKEN:存取權杖產生和使用事件。
  • 稽核:稽核記錄事件 (管理動作、設定變更)。

設定事件篩選條件

如要只傳送特定事件類型,請修改 JSON 設定中的 eventType 參數:

  • 傳送所有事件:

    {
      "eventType": "ALL"
    }
    
  • 只傳送驗證事件:

    {
      "eventType": "AUTHENTICATION"
    }
    
  • 只傳送登記事件:

    {
      "eventType": "REGISTRATION"
    }
    

選項 2:設定 Google Cloud Storage 整合功能

整合 GCS 的其他必要條件

除了「事前準備」一節列出的先決條件外,您還需要:

  • 已啟用 Cloud Storage API 的 GCP 專案
  • 建立及管理 GCS 值區的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 建立 Cloud Run 服務、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • HYPR API 憑證 (如要存取 API,請與 HYPR 支援團隊聯絡)

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud Console
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 hypr-mfa-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 HYPR API 憑證

請與 HYPR 支援團隊聯絡,取得存取 HYPR 事件資料的 API 憑證。你需要:

  • API 基準網址:HYPR 執行個體網址 (例如 https://your-tenant.hypr.com)
  • API 權杖:用於 API 存取權的驗證權杖
  • RP 應用程式 ID:要監控的 Relying Party 應用程式 ID

為 Cloud Run 函式建立服務帳戶

Cloud Run 函式需要具備 GCS bucket 寫入權限的服務帳戶,並由 Pub/Sub 叫用。

建立服務帳戶

  1. GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 hypr-logs-collector-sa
    • 服務帳戶說明:輸入 Service account for Cloud Run function to collect HYPR MFA logs
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選擇角色」
    2. 搜尋並選取「Storage 物件管理員」
    3. 點選「+ 新增其他角色」
    4. 搜尋並選取「Cloud Run Invoker」
    5. 點選「+ 新增其他角色」
    6. 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)
  6. 按一下「繼續」
  7. 按一下 [完成]

這些角色適用於:

  • Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
  • Cloud Run 叫用者:允許 Pub/Sub 叫用函式
  • Cloud Functions 叫用者:允許函式叫用

授予 GCS 值區的 IAM 權限

將 GCS bucket 的寫入權限授予服務帳戶 (hypr-logs-collector-sa):

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 hypr-mfa-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

建立 Pub/Sub 主題,Cloud Scheduler 會將訊息發布至該主題,而 Cloud Run 函式會訂閱該主題。

  1. GCP Console 中,前往「Pub/Sub」>「Topics」(主題)
  2. 按一下「建立主題」
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 hypr-logs-trigger
    • 其他設定保留預設值。
  4. 點選「建立」

建立 Cloud Run 函式來收集記錄

Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 HYPR API 擷取記錄並寫入 GCS。

  1. 前往 GCP Console 的「Cloud Run」
  2. 按一下「Create service」(建立服務)
  3. 選取「函式」 (使用內嵌編輯器建立函式)。
  4. 在「設定」部分,提供下列設定詳細資料:

    設定
    服務名稱 hypr-logs-collector
    區域 選取與 GCS bucket 相符的區域 (例如 us-central1)
    執行階段 選取 Python 3.12 以上版本
  5. 在「Trigger (optional)」(觸發條件 (選用)) 專區:

    1. 按一下「+ 新增觸發條件」
    2. 選取「Cloud Pub/Sub」
    3. 在「選取 Cloud Pub/Sub 主題」中,選擇 Pub/Sub 主題 (hypr-logs-trigger)。
    4. 按一下 [儲存]
  6. 在「Authentication」(驗證) 部分:

    1. 選取「需要驗證」
    2. 檢查 Identity and Access Management (IAM)
  7. 向下捲動並展開「Containers, Networking, Security」

  8. 前往「安全性」分頁:

    • 服務帳戶:選取服務帳戶 (hypr-logs-collector-sa)。
  9. 前往「容器」分頁:

    1. 按一下「變數與密鑰」
    2. 針對每個環境變數,按一下「+ 新增變數」
    變數名稱 範例值 說明
    GCS_BUCKET hypr-mfa-logs GCS bucket 名稱
    GCS_PREFIX hypr-events 記錄檔的前置字串
    STATE_KEY hypr-events/state.json 狀態檔案路徑
    HYPR_API_URL https://your-tenant.hypr.com HYPR API 基本網址
    HYPR_API_TOKEN your-api-token HYPR API 驗證權杖
    HYPR_RP_APP_ID your-rp-app-id HYPR RP 應用程式 ID
    MAX_RECORDS 1000 每次執行的記錄數量上限
    PAGE_SIZE 100 每頁記錄數
    LOOKBACK_HOURS 24 初始回溯期
  10. 在「變數與密鑰」部分,向下捲動至「要求」

    • 要求逾時:輸入 600 秒 (10 分鐘)。
  11. 前往「設定」分頁:

    • 在「資源」部分:
      • 記憶體:選取 512 MiB 以上。
      • CPU:選取 1
  12. 在「修訂版本資源調度」部分:

    • 執行個體數量下限:輸入 0
    • 「Maximum number of instances」(執行個體數量上限):輸入 100 (或根據預期負載調整)。
  13. 點選「建立」

  14. 等待服務建立完成 (1 到 2 分鐘)。

  15. 服務建立完成後,系統會自動開啟內嵌程式碼編輯器

新增函式程式碼

  1. 在「進入點」欄位中輸入「main」
  2. 在內嵌程式碼編輯器中建立兩個檔案:

    • 第一個檔案:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • 第二個檔案:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 點選「部署」來儲存並部署函式。

  4. 等待部署作業完成 (2 到 3 分鐘)。

建立 Cloud Scheduler 工作

Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題 (hypr-logs-trigger),觸發 Cloud Run 函式。

  1. 前往 GCP 主控台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 hypr-logs-collector-hourly
    區域 選取與 Cloud Run 函式相同的區域
    頻率 0 * * * * (每小時整點)
    時區 選取時區 (建議使用世界標準時間)
    目標類型 Pub/Sub
    主題 選取 Pub/Sub 主題 (hypr-logs-trigger)
    郵件內文 {} (空白 JSON 物件)
  4. 點選「建立」

排程頻率選項

根據記錄檔量和延遲時間要求選擇頻率:

頻率 Cron 運算式 用途
每 5 分鐘 */5 * * * * 高容量、低延遲
每 15 分鐘 */15 * * * * 中等
每小時 0 * * * * 標準 (建議)
每 6 小時 0 */6 * * * 少量、批次處理
每日 0 0 * * * 歷來資料集合

測試整合項目

  1. Cloud Scheduler 控制台中,找出您的作業 (hypr-logs-collector-hourly)。
  2. 按一下「強制執行」,手動觸發工作。
  3. 稍等幾秒鐘。
  4. 前往「Cloud Run」>「Services」
  5. 按一下函式名稱 (hypr-logs-collector)。
  6. 按一下 [Logs] (記錄) 分頁標籤。
  7. 確認函式是否已順利執行。尋找:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 依序前往「Cloud Storage」>「Buckets」

  9. 按一下 bucket 名稱 (例如 hypr-mfa-logs)。

  10. 前往前置字串資料夾 (例如 hypr-events/)。

  11. 確認是否已建立含有目前時間戳記的新 .ndjson 檔案。

如果在記錄中發現錯誤:

  • HTTP 401:檢查環境變數中的 API 憑證
  • HTTP 403:確認 HYPR API 權杖具備必要權限,且 RP 應用程式 ID 正確無誤
  • HTTP 429:頻率限制 - 函式會自動重試並延遲
  • 缺少環境變數:檢查是否已設定所有必要變數

擷取 Google SecOps 服務帳戶

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

在 Google SecOps 中設定資訊提供,擷取 HYPR MFA 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 HYPR MFA Logs from GCS)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「HYPR MFA」做為「記錄類型」

  7. 按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

  9. 點選「下一步」

  10. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://hypr-mfa-logs/hypr-events/
      
      • 取代:
        • hypr-mfa-logs:您的 GCS bucket 名稱。
        • hypr-events:儲存記錄的選用前置字元/資料夾路徑 (如為根目錄,請留空)。
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  11. 點選「下一步」

  12. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 hypr-mfa-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
extensions.auth.type 驗證類型 (例如單一登入 (SSO)、多重驗證 (MFA)
metadata.event_type 事件類型 (例如USER_LOGIN、NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type 產品專屬事件類型
ID metadata.product_log_id 產品專屬記錄 ID
USERAGENT network.http.parsed_user_agent 已剖析的 HTTP 使用者代理程式
USERAGENT network.http.user_agent HTTP 使用者代理程式字串
SESSIONID network.session_id 工作階段 ID
DEVICEMODEL principal.asset.hardware.model 資產的硬體型號
COMPANION,MACHINEDOMAIN principal.asset.hostname 資產的主機名稱
REMOTEIP principal.asset.ip 資產的 IP 位址
DEVICEID principal.asset_id 資產的專屬 ID
COMPANION,MACHINEDOMAIN principal.hostname 與主體相關聯的主機名稱
REMOTEIP principal.ip 與主體相關聯的 IP 位址
DEVICEOS principal.platform 平台 (例如 WINDOWS、LINUX)
DEVICEOSVERSION principal.platform_version 平台版本
ISSUCCESSFUL security_result.action 保全系統採取的動作 (例如允許、封鎖)
訊息 security_result.description 安全性結果說明
MACHINEUSERNAME target.user.user_display_name 使用者的顯示名稱
FIDOUSER target.user.userid 使用者 ID
metadata.product_name 產品名稱
metadata.vendor_name 供應商/公司名稱

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。