收集 HYPR MFA 日志

支持的平台:

本文档介绍了如何使用 Webhook 或 Google Cloud Storage V2 将 HYPR MFA 日志注入到 Google Security Operations。

HYPR MFA 是一种无密码多重身份验证解决方案,可使用 FIDO2 通行密钥、生物识别技术和移动设备发起的登录提供防网上诱骗的身份验证。HYPR 使用安全的公钥加密取代了传统密码,从而消除了基于凭据的攻击,同时简化了工作站、Web 应用和云服务中的用户身份验证。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 对 HYPR 控制中心的管理员访问权限
  • 联系 HYPR 支持团队,为要监控的 RP 应用启用自定义事件钩子

收集方法差异

HYPR MFA 支持两种将日志发送到 Google Security Operations 的方法:

  • Webhook(推荐):HYPR 通过自定义事件钩子将事件实时发送到 Google Security Operations。此方法可立即传送事件,无需额外的基础设施。
  • Google Cloud Storage:HYPR 事件通过 API 收集并存储在 GCS 中,然后由 Google Security Operations 提取。此方法提供批量处理和历史数据保留功能。

选择最符合您要求的方法:

功能 网络钩子 Google Cloud Storage
延迟时间 实时(秒) 批处理(分钟到小时)
基础架构 无要求 包含 Cloud Run 函数的 GCP 项目
历史数据 只能生成事件流 GCS 中的完整保留
设置复杂性 简单
费用 轻微 GCP 计算和存储费用

方法 1:配置网络钩子集成

在 Google SecOps 中创建 Webhook Feed

创建 Feed

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 在下一页上,点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 HYPR MFA Events)。
  5. 选择 Webhook 作为来源类型
  6. 选择 HYPR MFA 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:
    • 拆分分隔符(可选):留空。每个 webhook 请求都包含一个 JSON 事件。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  9. 点击下一步
  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

生成并保存密钥

创建 Feed 后,您必须生成用于身份验证的密钥:

  1. 在 Feed 详情页面上,点击生成密钥
  2. 系统会显示一个包含密钥的对话框。
  3. 复制并妥善保存此密钥。

获取 Feed 端点网址

  1. 前往 Feed 的详细信息标签页。
  2. 端点信息部分,复制 Feed 端点网址
  3. 网址格式为:

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. 保存此网址以供后续步骤使用。

  5. 点击完成

创建 Google Cloud API 密钥

Chronicle 需要 API 密钥才能进行身份验证。在 Google Cloud Console 中创建受限 API 密钥。

创建 API 密钥

  1. 前往 Google Cloud 控制台的“凭据”页面
  2. 选择您的项目(与您的 Chronicle 实例关联的项目)。
  3. 依次点击创建凭据> API 密钥
  4. 系统会创建一个 API 密钥,并在对话框中显示该密钥。
  5. 点击修改 API 密钥以限制密钥。

限制 API 密钥

  1. API 密钥设置页面中:
    • 名称:输入一个描述性名称(例如 Chronicle Webhook API Key)。
  2. API 限制下:
    1. 选择限制密钥
    2. 选择 API 下拉菜单中,搜索并选择 Google SecOps API(或 Chronicle API)。
  3. 点击保存
  4. 从页面顶部的 API 密钥字段复制 API 密钥值。
  5. 安全地保存 API 密钥。

配置 HYPR MFA 自定义事件钩子

构建带有标头的网络钩子网址

HYPR 支持使用自定义标头进行身份验证。使用标头身份验证方法可提高安全性。

  • 端点网址(不含参数)

    <ENDPOINT_URL>
    
  • 标头

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • 替换:
      • <ENDPOINT_URL>:上一步中的 Feed 端点网址。
      • <API_KEY>:您创建的 Google Cloud API 密钥。
      • <SECRET_KEY>:从 Chronicle Feed 创建中获得的密钥。

准备自定义事件钩子 JSON 配置

  • HYPR 自定义事件钩子使用 JSON 进行配置。准备以下 JSON 配置,替换占位值:

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
      • <ENDPOINT_URL>:Chronicle Feed 端点网址。
      • <API_KEY>:Google Cloud API 密钥。
      • <SECRET_KEY>:Chronicle 密钥。
    • 配置参数

    • 名称:事件钩子的描述性名称(例如 Chronicle SIEM Integration)。

    • eventType:设置为 ALL 可发送所有 HYPR 事件,也可指定特定事件标记,例如 AUTHENTICATIONREGISTRATIONACCESS_TOKEN

    • invocationEndpoint:Chronicle Feed 端点网址。

    • httpMethod:设置为 POST

    • authType:设置为 API_KEY 以进行 API 密钥身份验证。

    • apiKeyName:API 密钥的标头名称 (x-goog-chronicle-auth)。

    • apiKeyValue:Google Cloud API 密钥值。

    • headerParameters:其他标头,包括 Content-Type: application/jsonx-chronicle-auth 标头中的 Chronicle 密钥。

在 HYPR 控制中心内创建自定义事件钩子

  1. 以管理员身份登录 HYPR 控制中心
  2. 在左侧的导航菜单中,点击集成
  3. 集成页面上,点击添加新集成
  4. HYPR 控制中心会显示可用的集成。
  5. 点击自定义事件事件钩子下的相应功能块。
  6. 点击 Add New Event Hook
  7. 添加新事件钩子对话框中,将您准备的 JSON 内容粘贴到文本字段中。
  8. 点击添加事件钩子
  9. HYPR 控制中心返回到 Event Hooks 页面。

自定义事件钩子现已配置完毕,并将开始向 Google SecOps 发送事件。

验证 Webhook 是否正常运行

检查 HYPR 控制中心事件钩子状态

  1. 登录 HYPR 控制中心
  2. 前往集成
  3. 点击自定义事件集成。
  4. 事件钩子表格中,验证您的事件钩子是否已列出。
  5. 点击事件钩子名称可查看详细信息。
  6. 验证配置是否与您的设置一致。

检查 Chronicle Feed 状态

  1. 在 Chronicle 中,依次前往 SIEM 设置 > Feed
  2. 找到您的 Webhook Feed。
  3. 查看状态列(应为有效)。
  4. 检查收到的事件数(应会递增)。
  5. 检查上次成功启动时间时间戳(应为最近的时间)。

验证 Chronicle 中的日志

  1. 依次前往搜索 > UDM 搜索
  2. 使用以下查询:

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. 将时间范围调整为“过去 1 小时”。

  4. 验证活动是否显示在结果中。

身份验证方法参考

HYPR 自定义事件钩子支持多种身份验证方法。对于 Chronicle,建议使用带有自定义标头的 API 密钥身份验证方法。

  • 配置

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • 优点:

    • 在标头中发送的 API 密钥和密钥(比网址参数更安全)。
    • 支持多个身份验证标头。
    • 标头未记录在 Web 服务器访问日志中。

基本身份验证

  • 配置

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • 应用场景:当目标系统需要 HTTP 基本身份验证时。

OAuth 2.0 客户端凭据

  • 配置

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • 使用情形:当目标系统需要 OAuth 2.0 身份验证时。

事件类型和过滤

HYPR 事件使用 eventTags 参数进行分组。您可以将自定义事件钩子配置为发送所有事件,也可以按特定事件类型进行过滤。

事件代码

  • AUTHENTICATION:用户身份验证事件(登录、解锁)。
  • REGISTRATION:设备注册事件(配对移动设备、安全密钥)。
  • ACCESS_TOKEN:访问令牌生成和使用事件。
  • AUDIT:审核日志事件(管理操作、配置更改)。

配置事件过滤

如需仅发送特定类型的事件,请修改 JSON 配置中的 eventType 参数:

  • 发送所有事件

    {
      "eventType": "ALL"
    }
    
  • 仅发送身份验证事件

    {
      "eventType": "AUTHENTICATION"
    }
    
  • 仅发送注册事件

    {
      "eventType": "REGISTRATION"
    }
    

选项 2:配置 Google Cloud Storage 集成

GCS 集成的其他前提条件

除了“准备工作”部分中列出的前提条件之外,您还需要:

  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • HYPR API 凭据(请与 HYPR 支持团队联系以获取 API 访问权限)

创建 Google Cloud Storage 存储分区

  1. 转到 Google Cloud Console
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储桶命名 输入一个全局唯一的名称(例如 hypr-mfa-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 HYPR API 凭据

请与 HYPR 支持团队联系,以获取用于访问 HYPR 活动数据的 API 凭据。您需要有:

  • API 基准网址:您的 HYPR 实例网址(例如 https://your-tenant.hypr.com
  • API 令牌:用于 API 访问的身份验证令牌
  • RP 应用 ID:要监控的依赖方应用 ID

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储桶写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 hypr-logs-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect HYPR MFA logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储桶并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储桶的 IAM 权限

向服务账号 (hypr-logs-collector-sa) 授予对 GCS 存储桶的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 hypr-mfa-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 hypr-logs-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数将由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 HYPR API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 hypr-logs-collector
    区域 选择与您的 GCS 存储桶匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (hypr-logs-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 (hypr-logs-collector-sa)。
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET hypr-mfa-logs GCS 存储桶名称
    GCS_PREFIX hypr-events 日志文件的前缀
    STATE_KEY hypr-events/state.json 状态文件路径
    HYPR_API_URL https://your-tenant.hypr.com HYPR API 基本网址
    HYPR_API_TOKEN your-api-token HYPR API 身份验证令牌
    HYPR_RP_APP_ID your-rp-app-id HYPR RP 应用 ID
    MAX_RECORDS 1000 每次运行的记录数上限
    PAGE_SIZE 100 每页记录数
    LOOKBACK_HOURS 24 初始回溯期
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本伸缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 入口点字段中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题 (hypr-logs-trigger) 发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 hypr-logs-collector-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (hypr-logs-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

根据日志量和延迟时间要求选择频次:

频率 Cron 表达式 使用场景
每隔 5 分钟 */5 * * * * 高容量、低延迟
每隔 15 分钟 */15 * * * * 搜索量中等
每小时 0 * * * * 标准(推荐)
每 6 小时 0 */6 * * * 量小、批处理
每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业 (hypr-logs-collector-hourly)。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 (hypr-logs-collector)。
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储桶名称(例如 hypr-mfa-logs)。

  10. 前往前缀文件夹(例如 hypr-events/)。

  11. 验证是否已创建具有当前时间戳的新 .ndjson 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 API 凭据
  • HTTP 403:验证 HYPR API 令牌是否具有所需权限,以及 RP 应用 ID 是否正确
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已设置所有必需的变量

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须授予此服务账号对您的存储桶的访问权限。

在 Google SecOps 中配置 Feed 以注入 HYPR MFA 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 HYPR MFA Logs from GCS)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 HYPR MFA 作为日志类型

  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

  9. 点击下一步

  10. 为以下输入参数指定值:

    • 存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:

      gs://hypr-mfa-logs/hypr-events/
      
      • 替换:
        • hypr-mfa-logs:您的 GCS 存储桶名称。
        • hypr-events:存储日志的可选前缀/文件夹路径(留空表示根目录)。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  11. 点击下一步

  12. 最终确定界面中查看新的 Feed 配置,然后点击提交

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储桶具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 hypr-mfa-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

UDM 映射表

日志字段 UDM 映射 逻辑
extensions.auth.type 身份验证类型(例如,SSO、MFA)
metadata.event_type 活动类型(例如,USER_LOGIN、NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type 特定于产品的事件类型
ID metadata.product_log_id 特定于产品的日志 ID
USERAGENT network.http.parsed_user_agent 已解析的 HTTP 用户代理
USERAGENT network.http.user_agent HTTP 用户代理字符串
SESSIONID network.session_id 会话 ID
DEVICEMODEL principal.asset.hardware.model 相应资产的硬件型号
COMPANION,MACHINEDOMAIN principal.asset.hostname 相应资产的主机名
REMOTEIP principal.asset.ip 资产的 IP 地址
DEVICEID principal.asset_id 相应资产的唯一标识符
COMPANION,MACHINEDOMAIN principal.hostname 与正文相关联的主机名
REMOTEIP principal.ip 与正文相关联的 IP 地址
DEVICEOS principal.platform 平台(例如WINDOWS、LINUX)
DEVICEOSVERSION principal.platform_version 平台版本
ISSUCCESSFUL security_result.action 安防系统采取的操作(例如,允许、屏蔽)
消息 security_result.description 安全结果的说明
MACHINEUSERNAME target.user.user_display_name 用户的显示名称
FIDOUSER target.user.userid 用户 ID
metadata.product_name 产品名称
metadata.vendor_name 供应商/公司名称

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。