收集 Delinea 单点登录 (SSO) 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 Delinea(以前称为 Centrify)单点登录 (SSO) 日志提取到 Google Security Operations。解析器会提取日志,同时处理 JSON 和 syslog 格式。它会解析键值对、时间戳和其他相关字段,将它们映射到 UDM 模型,并使用特定逻辑来处理登录失败、用户代理、严重程度、身份验证机制和各种事件类型。对于失败事件中的目标电子邮件地址,它会优先考虑 FailUserName 而不是 NormalizedUser。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 函数、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 对 Delinea (Centrify) SSO 租户的特权访问权限

收集 Delinea (Centrify) SSO 凭据

创建 OAuth2 客户端应用

  1. 登录 Delinea 管理员门户
  2. 依次前往应用 > 添加 Web 应用
  3. 点击自定义标签页。
  4. 搜索 OAuth2 客户端,然后点击添加
  5. 添加 Web 应用对话框中,点击
  6. 添加 Web 应用对话框中,点击关闭
  7. 应用配置页面上,配置以下内容:
    • “设置”标签页:
      • 应用 ID:输入唯一标识符(例如 secops-oauth-client)。
      • 应用名称:输入一个描述性名称(例如 SecOps Data Export)。
      • 应用说明:输入说明(例如 OAuth client for exporting audit events to SecOps)。
    • “一般用法”部分:
      • 客户端 ID 类型:选择保密
      • 已签发的客户端 ID:复制并保存此值。
      • 已签发的客户端密钥:复制并保存此值。
    • 令牌标签页:
      • 身份验证方法:选择客户端凭据
      • 令牌类型:选择 JwtRS256
    • 范围标签页:
      • 添加范围 redrock/query,说明为 Query API Access
  8. 点击保存以创建 OAuth 客户端。
  9. 复制以下详细信息并将其保存在安全的位置:
    • 租户网址:您的 Centrify 租户网址(例如 https://yourtenant.my.centrify.com)。
    • 客户端 ID:来自第 7 步。
    • 客户端密钥:来自第 7 步。
    • OAuth 应用 ID:来自应用配置。

验证权限

如需验证 OAuth 客户端是否拥有所需权限,请执行以下操作:

  1. 登录 Delinea 管理员门户
  2. 依次前往设置 (⚙️) > 资源 > 角色
  3. 验证分配给 OAuth 客户端的角色是否包含报告:审核事件:查看权限。
  4. 如果缺少相应权限,请与 Delinea 管理员联系,让其授予所需权限。

测试 API 访问权限

  • 在继续进行集成之前,请先测试您的凭据:

    # Replace with your actual credentials
    TENANT_URL="https://yourtenant.my.centrify.com"
    CLIENT_ID="your-client-id"
    CLIENT_SECRET="your-client-secret"
    OAUTH_APP_ID="your-oauth-application-id"
    
    # Get OAuth token
    TOKEN=$(curl -s -X POST "${TENANT_URL}/oauth2/token/${OAUTH_APP_ID}" \
      -H "Authorization: Basic $(echo -n "${CLIENT_ID}:${CLIENT_SECRET}" | base64)" \
      -H "X-CENTRIFY-NATIVE-CLIENT: True" \
      -H "Content-Type: application/x-www-form-urlencoded" \
      -d "grant_type=client_credentials&scope=redrock/query" | jq -r '.access_token')
    
    # Test query API access
    curl -v -X POST "${TENANT_URL}/Redrock/query" \
      -H "Authorization: Bearer ${TOKEN}" \
      -H "X-CENTRIFY-NATIVE-CLIENT: True" \
      -H "Content-Type: application/json" \
      -d '{"Script":"Select * from Event where WhenOccurred > datefunc('"'"'now'"'"', '"'"'-1'"'"') ORDER BY WhenOccurred ASC","args":{"PageNumber":1,"PageSize":10,"Limit":10,"Caching":-1}}'
    

如果成功,您应该会看到包含审核事件的 JSON 响应。如果您收到 401 或 403 错误,请验证您的凭据和权限。

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 delinea-centrify-logs-bucket
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 delinea-sso-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect Delinea SSO logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称(例如 delinea-centrify-logs-bucket)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 delinea-sso-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 delinea-sso-logs-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 Delinea SSO API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 delinea-sso-log-export
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题部分,选择 Pub/Sub 主题 delinea-sso-logs-trigger
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 delinea-sso-collector-sa
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET delinea-centrify-logs-bucket GCS 存储分区名称
    GCS_PREFIX centrify-sso-logs 日志文件的前缀
    STATE_KEY centrify-sso-logs/state.json 状态文件路径
    TENANT_URL https://yourtenant.my.centrify.com Delinea 租户网址
    CLIENT_ID your-client-id OAuth 客户端 ID
    CLIENT_SECRET your-client-secret OAuth 客户端密钥
    OAUTH_APP_ID your-oauth-application-id OAuth 应用 ID
    PAGE_SIZE 1000 每页记录数
    MAX_PAGES 10 要提取的最大网页数
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Delinea Centrify SSO audit events and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'centrify-sso-logs')
        state_key = os.environ.get('STATE_KEY', 'centrify-sso-logs/state.json')
    
        # Centrify API credentials
        tenant_url = os.environ.get('TENANT_URL')
        client_id = os.environ.get('CLIENT_ID')
        client_secret = os.environ.get('CLIENT_SECRET')
        oauth_app_id = os.environ.get('OAUTH_APP_ID')
    
        # Optional parameters
        page_size = int(os.environ.get('PAGE_SIZE', '1000'))
        max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
        if not all([bucket_name, tenant_url, client_id, client_secret, oauth_app_id]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            # Load state (last processed timestamp)
            state = load_state(bucket, state_key)
            last_timestamp = state.get('last_timestamp')
    
            print(f'Processing logs since {last_timestamp if last_timestamp else "24 hours ago"}')
    
            # Get OAuth access token
            access_token = get_oauth_token(tenant_url, client_id, client_secret, oauth_app_id)
    
            # Fetch audit events
            events = fetch_audit_events(tenant_url, access_token, last_timestamp, page_size, max_pages)
    
            if events:
                # Write events to GCS
                current_timestamp = datetime.now(timezone.utc)
                blob_name = f"{prefix}/centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
                blob = bucket.blob(blob_name)
    
                # Convert to JSONL format (one JSON object per line)
                jsonl_content = '\n'.join([json.dumps(event, default=str) for event in events])
                blob.upload_from_string(jsonl_content, content_type='application/x-ndjson')
    
                print(f'Wrote {len(events)} events to {blob_name}')
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                save_state(bucket, state_key, {'last_timestamp': latest_timestamp, 'updated_at': current_timestamp.isoformat() + 'Z'})
    
                print(f'Successfully processed {len(events)} events')
            else:
                print('No new events found')
    
        except Exception as e:
            print(f'Error processing Centrify SSO logs: {str(e)}')
            raise
    
    def get_oauth_token(tenant_url, client_id, client_secret, oauth_app_id):
        """Get OAuth access token using client credentials flow."""
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'redrock/query'
        }
    
        response = http.request('POST', token_url, headers=headers, fields=data)
    
        if response.status != 200:
            raise Exception(f"OAuth token request failed: {response.status} {response.data.decode('utf-8')}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url, access_token, last_timestamp, page_size, max_pages):
        """Fetch audit events from Centrify using the Redrock/query API with proper pagination."""
        query_url = f"{tenant_url}/Redrock/query"
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        all_events = []
        page_num = 1
        backoff = 1.0
    
        while page_num <= max_pages:
            payload = {
                "Script": sql_query,
                "args": {
                    "PageNumber": page_num,
                    "PageSize": page_size,
                    "Limit": page_size * max_pages,
                    "Caching": -1
                }
            }
    
            try:
                response = http.request('POST', query_url, headers=headers, body=json.dumps(payload))
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    raise Exception(f"API query failed: {response.status} {response.data.decode('utf-8')}")
    
                response_data = json.loads(response.data.decode('utf-8'))
    
                if not response_data.get('success', False):
                    raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
                # Parse the response
                result = response_data.get('Result', {})
                columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
                raw_results = result.get('Results', [])
    
                if not raw_results:
                    print(f"No more results on page {page_num}")
                    break
    
                print(f"Page {page_num}: Retrieved {len(raw_results)} events")
    
                for raw_event in raw_results:
                    event = {}
                    row_data = raw_event.get('Row', {})
    
                    # Map column names to values
                    for col_name, col_index in columns.items():
                        if col_name in row_data and row_data[col_name] is not None:
                            event[col_name] = row_data[col_name]
    
                    # Add metadata
                    event['_source'] = 'centrify_sso'
                    event['_collected_at'] = datetime.now(timezone.utc).isoformat() + 'Z'
    
                    all_events.append(event)
    
                # Check if we've reached the end
                if len(raw_results) < page_size:
                    print(f"Reached last page (page {page_num} returned {len(raw_results)} < {page_size})")
                    break
    
                page_num += 1
    
            except Exception as e:
                print(f"Error fetching page {page_num}: {e}")
                raise
    
        print(f"Retrieved {len(all_events)} total events from {page_num} pages")
        return all_events
    
    def get_latest_event_timestamp(events):
        """Get the latest timestamp from the events for state tracking."""
        if not events:
            return datetime.now(timezone.utc).isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.now(timezone.utc).isoformat() + 'Z'
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {}
    
    def save_state(bucket, key, state):
        """Save state to GCS."""
        try:
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state),
                content_type='application/json'
            )
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 delinea-sso-log-export-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 delinea-sso-logs-trigger
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 搜索量中等
    每小时 0 * * * * 标准(推荐)
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业(例如 delinea-sso-log-export-hourly)。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 delinea-sso-log-export
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Processing logs since YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X events to centrify-sso-logs/centrify-sso-events_YYYYMMDD_HHMMSS.json
    Successfully processed X events
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称(例如 delinea-centrify-logs-bucket)。

  10. 前往前缀文件夹 centrify-sso-logs/

  11. 验证是否已创建具有当前时间戳的新 .json 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 API 凭据(CLIENT_ID、CLIENT_SECRET、OAUTH_APP_ID)
  • HTTP 403:验证 OAuth 客户端是否具有报告:审核事件:查看权限
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已在 Cloud Run 函数配置中设置所有必需的变量

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Delinea Centrify SSO logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Centrify 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称(例如 delinea-centrify-logs-bucket)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 Delinea (Centrify) SSO 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Delinea Centrify SSO logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Centrify 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://delinea-centrify-logs-bucket/centrify-sso-logs/
      
        • delinea-centrify-logs-bucket:您的 GCS 存储分区名称。
        • centrify-sso-logs:存储日志的可选前缀/文件夹路径(留空表示根目录)。
      • 示例

        • 根存储分区:gs://company-logs/
        • 带前缀:gs://company-logs/centrify-sso-logs/
        • 使用子文件夹:gs://company-logs/delinea/sso/
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
AccountID security_result.detection_fields.value 原始日志中的 AccountID 值会分配给一个 security_result.detection_fields 对象,该对象的键为“Account ID”。
ApplicationName target.application 原始日志中的 ApplicationName 值会分配给 target.application 字段。
AuthorityFQDN target.asset.network_domain 原始日志中的 AuthorityFQDN 值会分配给 target.asset.network_domain 字段。
AuthorityID target.asset.asset_id 原始日志中的 AuthorityID 值会分配给 target.asset.asset_id 字段,并以“AuthorityID:”为前缀。
AzDeploymentId security_result.detection_fields.value 原始日志中的 AzDeploymentId 值会分配给键为 AzDeploymentId 的 security_result.detection_fields 对象。
AzRoleId additional.fields.value.string_value 原始日志中的 AzRoleId 值会分配给一个键为 AzRoleId 的 additional.fields 对象。
AzRoleName target.user.attribute.roles.name 原始日志中的 AzRoleName 值会分配给 target.user.attribute.roles.name 字段。
ComputerFQDN principal.asset.network_domain 原始日志中的 ComputerFQDN 值会分配给 principal.asset.network_domain 字段。
ComputerID principal.asset.asset_id 原始日志中的 ComputerID 值会分配给 principal.asset.asset_id 字段,并带有“ComputerId:”前缀。
ComputerName about.hostname 原始日志中的 ComputerName 值会分配给 about.hostname 字段。
CredentialId security_result.detection_fields.value 原始日志中的 CredentialId 值会分配给一个键为“Credential Id”的 security_result.detection_fields 对象。
DirectoryServiceName security_result.detection_fields.value 原始日志中的 DirectoryServiceName 值会分配给一个键为“Directory Service Name”的 security_result.detection_fields 对象。
DirectoryServiceNameLocalized security_result.detection_fields.value 原始日志中的 DirectoryServiceNameLocalized 值会分配给一个 security_result.detection_fields 对象,该对象的键为“Directory Service Name Localized”。
DirectoryServiceUuid security_result.detection_fields.value 原始日志中的 DirectoryServiceUuid 值会分配给一个键为“Directory Service Uuid”的 security_result.detection_fields 对象。
EventMessage security_result.summary 原始日志中的 EventMessage 值会分配给 security_result.summary 字段。
EventType metadata.product_event_type 原始日志中的 EventType 值会分配给 metadata.product_event_type 字段。它还用于确定 metadata.event_type。
FailReason security_result.summary 如果原始日志中存在 FailReason,则将其值分配给 security_result.summary 字段。
FailUserName target.user.email_addresses 如果原始日志中存在 FailUserName 的值,则会将其分配给 target.user.email_addresses 字段。
FromIPAddress principal.ip 原始日志中的 FromIPAddress 值会分配给 principal.ip 字段。
ID security_result.detection_fields.value 原始日志中的 ID 值会分配给键为“ID”的 security_result.detection_fields 对象。
InternalTrackingID metadata.product_log_id 原始日志中的 InternalTrackingID 值会分配给元数据的 product_log_id 字段。
JumpType additional.fields.value.string_value 原始日志中的 JumpType 值会分配给一个键为“跳转类型”的 additional.fields 对象。
NormalizedUser target.user.email_addresses 原始日志中的 NormalizedUser 值会分配给 target.user.email_addresses 字段。
OperationMode additional.fields.value.string_value 原始日志中的 OperationMode 值会分配给一个键为“Operation Mode”的 additional.fields 对象。
ProxyId security_result.detection_fields.value 原始日志中的 ProxyId 值会分配给一个键为“Proxy Id”的 security_result.detection_fields 对象。
RequestUserAgent network.http.user_agent 原始日志中的 RequestUserAgent 值会分配给 network.http.user_agent 字段。
SessionGuid network.session_id 原始日志中的 SessionGuid 值会分配给 network.session_id 字段。
租户 additional.fields.value.string_value 原始日志中的租户值会分配给一个键为“Tenant”的 additional.fields 对象。
ThreadType additional.fields.value.string_value 原始日志中的 ThreadType 值会分配给一个键为“Thread Type”的 additional.fields 对象。
UserType principal.user.attribute.roles.name 原始日志中的 UserType 值会分配给 principal.user.attribute.roles.name 字段。
WhenOccurred metadata.event_timestamp 系统会解析原始日志中的 WhenOccurred 值,并将其分配给 metadata.event_timestamp 字段。此字段还会填充顶级时间戳字段。
硬编码值 metadata.product_name “SSO”。
硬编码值 metadata.event_type 由 EventType 字段确定。如果未提供 EventType 或提供的 EventType 不符合任何特定条件,则默认为 STATUS_UPDATE。可以是 USER_LOGIN、USER_CREATION、USER_RESOURCE_ACCESS、USER_LOGOUT 或 USER_CHANGE_PASSWORD。
硬编码值 metadata.vendor_name “CENTRIFY_SSO”。
硬编码值 metadata.product_version “SSO”。
硬编码值 metadata.log_type “Centrify”。
从消息字段中提取 network.session_id 如果消息字段包含会话 ID,则系统会提取并使用该 ID。否则,默认值为“1”。
从主机字段中提取 principal.hostname 如果可用,则从 syslog 标头的 host 字段中提取。
从 PID 字段中提取 principal.process.pid 如果可用,则从来自 syslog 标头的 pid 字段中提取。
UserGuid 或从消息中提取 target.user.userid 如果存在 UserGuid,则使用其值。否则,如果消息字段包含用户 ID,系统会提取并使用该 ID。
由 Level 和 FailReason 确定 security_result.action 如果 Level 为“Info”,则设置为“ALLOW”;如果存在 FailReason,则设置为“BLOCK”。
由 FailReason 确定 security_result.category 如果存在 FailReason,则设置为“AUTH_VIOLATION”。
由“级别”字段决定 security_result.severity 由“级别”字段确定。如果 Level 为“Info”,则设置为“INFORMATIONAL”;如果 Level 为“Warning”,则设置为“MEDIUM”;如果 Level 为“Error”,则设置为“ERROR”。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。