收集 BeyondTrust Endpoint Privilege Management (EPM) 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 BeyondTrust Endpoint Privilege Management (EPM) 日志提取到 Google Security Operations。该解析器专注于将 BeyondTrust Endpoint 的原始 JSON 日志数据转换为符合 Chronicle UDM 的结构化格式。它首先初始化各个字段的默认值,然后解析 JSON 载荷,随后将原始日志中的特定字段映射到 event.idm.read_only_udm 对象中的相应 UDM 字段。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 对 BeyondTrust Endpoint Privilege Management 租户或 API 的特权访问权限

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 beyondtrust-epm-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 BeyondTrust EPM API 凭据

  1. 以管理员身份登录 BeyondTrust Privilege Management Web 控制台。
  2. 依次前往配置 > 设置 > API 设置
  3. 点击 Create an API Account
  4. 提供以下配置详细信息:
    • 名称:输入 Google SecOps Collector
    • API 访问权限:根据需要启用审核(读取)和其他范围。
  5. 复制并保存客户端 ID客户端密钥
  6. 复制您的 API 基准网址;该网址通常为 https://<your-tenant>-services.pm.beyondtrustcloud.com(您将使用此网址作为 BPT_API_网址)。

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 beyondtrust-epm-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect BeyondTrust EPM logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 beyondtrust-epm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 beyondtrust-epm-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 BeyondTrust EPM API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 beyondtrust-epm-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题部分,选择主题 beyondtrust-epm-trigger
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 beyondtrust-epm-collector-sa
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值
    GCS_BUCKET beyondtrust-epm-logs
    GCS_PREFIX beyondtrust-epm/
    STATE_KEY beyondtrust-epm/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE management-api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  10. 变量和密钥标签页中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往容器中的设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
    • 点击完成
  12. 滚动到执行环境

    • 选择默认(推荐)。
  13. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  14. 点击创建

  15. 等待服务创建完成(1-2 分钟)。

  16. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    
    # Initialize HTTP client
    http = urllib3.PoolManager()
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch logs from BeyondTrust EPM API and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'beyondtrust-epm/')
        state_key = os.environ.get('STATE_KEY', 'beyondtrust-epm/state.json')
    
        # BeyondTrust EPM API credentials
        api_url = os.environ.get('BPT_API_URL')
        client_id = os.environ.get('CLIENT_ID')
        client_secret = os.environ.get('CLIENT_SECRET')
        oauth_scope = os.environ.get('OAUTH_SCOPE', 'management-api')
        record_size = int(os.environ.get('RECORD_SIZE', '1000'))
        max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        if not all([bucket_name, api_url, client_id, client_secret]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            # Load state (last processed timestamp)
            state = load_state(bucket, state_key)
            last_timestamp = state.get('last_timestamp', (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ"))
    
            print(f'Processing logs since {last_timestamp}')
    
            # Get OAuth access token
            token = get_oauth_token(api_url, client_id, client_secret, oauth_scope)
    
            # Fetch audit events
            events = fetch_audit_events(api_url, token, last_timestamp, record_size, max_iterations)
    
            if events:
                # Store events in GCS
                current_timestamp = datetime.utcnow()
                filename = f"{prefix}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
                store_events_to_gcs(bucket, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                save_state(bucket, state_key, {'last_timestamp': latest_timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z'})
    
                print(f'Successfully processed {len(events)} events and stored to {filename}')
            else:
                print('No new events found')
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def get_oauth_token(api_url, client_id, client_secret, scope):
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM.
        Uses the correct endpoint: /oauth/token
        """
        token_url = f"{api_url}/oauth/token"
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url, access_token, last_timestamp, record_size, max_iterations):
        """
        Fetch audit events using the BeyondTrust EPM API endpoint: /management-api/v2/AuditEvents
        with query parameters for filtering and pagination
        """
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000 based on BeyondTrust documentation
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            iterations += 1
    
            if len(all_events) >= 10000:
                print(f"Reached maximum events limit (10000)")
                break
    
            # Use the BeyondTrust EPM API endpoint for audit events
            query_url = f"{api_url}/management-api/v2/AuditEvents"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            # Construct URL with query parameters
            query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            full_url = f"{query_url}?{query_string}"
    
            try:
                response = http.request('GET', full_url, headers=headers, timeout=urllib3.Timeout(300.0))
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', '30'))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    continue
    
                if response.status != 200:
                    raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
                response_data = json.loads(response.data.decode('utf-8'))
                events = response_data.get('events', [])
    
                if not events:
                    break
    
                print(f"Page {iterations}: Retrieved {len(events)} events")
                all_events.extend(events)
    
                # If we got fewer events than RecordSize, we've reached the end
                if len(events) < record_size_limited:
                    break
    
                # For pagination, update StartDate to the timestamp of the last event
                last_event = events[-1]
                last_timestamp = extract_event_timestamp(last_event)
    
                if not last_timestamp:
                    print("Warning: Could not find timestamp in last event for pagination")
                    break
    
                # Convert to datetime and add 1 second to avoid retrieving the same event again
                try:
                    dt = parse_timestamp(last_timestamp)
                    dt = dt + timedelta(seconds=1)
                    current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
                except Exception as e:
                    print(f"Error parsing timestamp {last_timestamp}: {e}")
                    break
    
            except Exception as e:
                print(f"Error fetching page {iterations}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event):
        """Extract timestamp from event, checking multiple possible fields"""
        # Check common timestamp fields
        timestamp_fields = ['event.dateTime', 'event.timestamp', 'timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time', 'event.ingested']
    
        # Try nested event.dateTime first (common in BeyondTrust)
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("dateTime")
            if ts:
                return ts
            ts = event["event"].get("timestamp")
            if ts:
                return ts
    
        # Fallback to other timestamp fields
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str):
        """Parse timestamp string to datetime object, handling various formats"""
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {}
    
    def save_state(bucket, key, state):
        """Save state to GCS."""
        try:
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state),
                content_type='application/json'
            )
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    def store_events_to_gcs(bucket, key, events):
        """Store events as JSONL (one JSON object per line) in GCS"""
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = '\n'.join(json.dumps(event, default=str) for event in events)
    
        blob = bucket.blob(key)
        blob.upload_from_string(jsonl_content, content_type='application/x-ndjson')
    
    def get_latest_event_timestamp(events):
        """Get the latest timestamp from the events for state tracking"""
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 beyondtrust-epm-collector-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择主题 beyondtrust-epm-trigger
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 搜索量中等
    每小时 0 * * * * 标准(推荐)
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试调度器作业

  1. Cloud Scheduler 控制台中,找到您的作业。
  2. 点击强制运行以手动触发。
  3. 等待几秒钟,然后前往 Cloud Run > 服务 > beyondtrust-epm-collector > 日志
  4. 验证函数是否已成功执行。
  5. 检查 GCS 存储分区,确认日志已写入。

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 BeyondTrust EPM logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 BeyondTrust Endpoint Privilege Management 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 BeyondTrust EPM 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 BeyondTrust EPM logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 BeyondTrust Endpoint Privilege Management 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://beyondtrust-epm-logs/beyondtrust-epm/
      
        • beyondtrust-epm-logs:您的 GCS 存储分区名称。
        • beyondtrust-epm/:存储日志的可选前缀/文件夹路径(留空表示根目录)。
      • 示例

        • 根存储分区:gs://beyondtrust-epm-logs/
        • 带前缀:gs://beyondtrust-epm-logs/beyondtrust-epm/
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
agent.id principal.asset.attribute.labels.value 映射到键为 agent_id 的标签
agent.version principal.asset.attribute.labels.value 映射到键为 agent_version 的标签
ecs.version principal.asset.attribute.labels.value 映射到具有键 ecs_version 的标签
event_data.reason metadata.description 原始日志中的事件说明
event_datas.ActionId metadata.product_log_id 特定于产品的日志标识符
file.path principal.file.full_path 活动中的完整文件路径
headers.content_length additional.fields.value.string_value 映射到具有关键 content_length 的标签
headers.content_type additional.fields.value.string_value 映射到键为 content_type 的标签
headers.http_host additional.fields.value.string_value 映射到键为 http_host 的标签
headers.http_version network.application_protocol_version HTTP 协议版本
headers.request_method network.http.method HTTP 请求方法
host.hostname principal.hostname 主账号主机名
host.hostname principal.asset.hostname 主要资产主机名
host.ip principal.asset.ip 主要资产 IP 地址
host.ip principal.ip 主 IP 地址
host.mac principal.mac 主要 MAC 地址
host.os.platform principal.platform 如果等于 macOS,则设置为 MAC
host.os.version principal.platform_version 操作系统版本
labels.related_item_id metadata.product_log_id 相关商品标识符
process.command_line principal.process.command_line 进程命令行
process.name additional.fields.value.string_value 映射到具有键 process_name 的标签
process.parent.name additional.fields.value.string_value 映射到键为 process_parent_name 的标签
process.parent.pid principal.process.parent_process.pid 父级进程 PID 已转换为字符串
process.pid principal.process.pid 进程 PID 已转换为字符串
user.id principal.user.userid 用户标识符
user.name principal.user.user_display_name 用户显示名称
不适用 metadata.event_timestamp 事件时间戳设置为日志条目时间戳
不适用 metadata.event_type 如果没有正文,则为 GENERIC_EVENT;否则为 STATUS_UPDATE
不适用 network.application_protocol 如果 http_version 字段包含 HTTP,则设置为 HTTP

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。