收集 Delinea SSO 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Delinea(以前称为 Centrify)单点登录 (SSO) 日志注入到 Google Security Operations。解析器会提取日志,同时处理 JSON 和 syslog 格式。它会解析键值对、时间戳和其他相关字段,将它们映射到 UDM 模型,并使用特定逻辑来处理登录失败、用户代理、严重程度、身份验证机制和各种事件类型。对于失败事件中的目标电子邮件地址,该属性优先考虑 FailUserName 而不是 NormalizedUser

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • Delinea (Centrify) SSO 租户的特权访问权限。
  • AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。

收集 Delinea (Centrify) SSO 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 登录 Delinea 管理员门户
  2. 依次前往应用 > 添加应用
  3. 搜索 OAuth2 客户端,然后点击添加
  4. 添加 Web 应用对话框中,点击
  5. 添加 Web 应用对话框中,点击关闭
  6. 应用配置页面上,配置以下内容:
    • 常规标签页:
      • 应用 ID:输入唯一标识符(例如 secops-oauth-client
      • 应用名称:输入一个描述性名称(例如 SecOps Data Export
      • 应用说明:输入说明(例如 OAuth client for exporting audit events to SecOps
    • 信任标签页:
      • 申请保密:选中此选项
      • Client ID Type:选择 Confidential
      • 已签发的客户端 ID:复制并保存此值
      • 已签发的客户端密钥:复制并保存此值
    • 令牌标签页:
      • 身份验证方法:选择 Client Creds
      • 令牌类型:选择 Jwt RS256
    • 范围标签页:
      • 添加范围 siem,并添加说明 SIEM 集成访问权限
      • 添加范围 redrock/query,说明为 Query API 访问权限
  7. 点击保存以创建 OAuth 客户端。
  8. 依次前往核心服务 > 用户 > 添加用户
  9. 配置服务用户:
    • 登录名:输入第 6 步中的客户端 ID
    • 电子邮件地址:输入有效的电子邮件地址(必填字段)。
    • 显示名称:输入一个描述性名称(例如 SecOps Service User)。
    • 密码确认密码:输入第 6 步中的客户端密钥
    • 状态:选择 Is OAuth confidential client
  10. 点击 Create User(创建用户)。
  11. 前往访问权限 > 角色,然后将服务用户分配给具有相应权限的角色,以便查询审核事件。
  12. 复制以下详细信息并将其保存在安全的位置:
    • 租户网址:您的 Centrify 租户网址(例如 https://yourtenant.my.centrify.com
    • 客户端 ID:来自第 6 步
    • 客户端密钥:来自第 6 步
    • OAuth 应用 ID:来自应用配置

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 delinea-centrify-logs-bucket)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶名称,请替换 delinea-centrify-logs-bucket):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色

  7. 依次点击创建角色 > AWS 服务 > Lambda

  8. 附加新创建的政策和受管政策 AWSLambdaBasicExecutionRole(用于 CloudWatch 日志记录)。

  9. 将角色命名为 CentrifySSOLogExportRole,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 CentrifySSOLogExport
    运行时 Python 3.13
    架构 x86_64
    执行角色 CentrifySSOLogExportRole
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (CentrifySSOLogExport.py)。

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 CentrifySSOLogExport
    • 名称CentrifySSOLogExport-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限
  6. 依次点击添加权限 > 直接附加政策
  7. 选择创建政策
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. 名称 = secops-reader-policy

  10. 依次点击创建政策 > 搜索/选择 > 下一步

  11. 点击添加权限

  12. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  13. 点击创建访问密钥

  14. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Delinea (Centrify) SSO 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Delinea Centrify SSO logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Centrify 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间资源命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
AccountID security_result.detection_fields.value 原始日志中的 AccountID 值会分配给具有 key:Account IDsecurity_result.detection_fields 对象。
ApplicationName target.application 原始日志中的 ApplicationName 值会分配给 target.application 字段。
AuthorityFQDN target.asset.network_domain 原始日志中的 AuthorityFQDN 值会分配给 target.asset.network_domain 字段。
AuthorityID target.asset.asset_id 原始日志中的 AuthorityID 值会分配给 target.asset.asset_id 字段,并以“AuthorityID:”为前缀。
AzDeploymentId security_result.detection_fields.value 原始日志中的 AzDeploymentId 值会分配给具有 key:AzDeploymentIdsecurity_result.detection_fields 对象。
AzRoleId additional.fields.value.string_value 原始日志中的 AzRoleId 值会分配给具有 key:AzRole Idadditional.fields 对象。
AzRoleName target.user.attribute.roles.name 原始日志中的 AzRoleName 值会分配给 target.user.attribute.roles.name 字段。
ComputerFQDN principal.asset.network_domain 原始日志中的 ComputerFQDN 值会分配给 principal.asset.network_domain 字段。
ComputerID principal.asset.asset_id 原始日志中的 ComputerID 值会分配给 principal.asset.asset_id 字段,并以“ComputerId:”为前缀。
ComputerName about.hostname 原始日志中的 ComputerName 值会分配给 about.hostname 字段。
CredentialId security_result.detection_fields.value 原始日志中的 CredentialId 值会分配给具有 key:Credential Idsecurity_result.detection_fields 对象。
DirectoryServiceName security_result.detection_fields.value 原始日志中的 DirectoryServiceName 值会分配给具有 key:Directory Service Namesecurity_result.detection_fields 对象。
DirectoryServiceNameLocalized security_result.detection_fields.value 原始日志中的 DirectoryServiceNameLocalized 值会分配给具有 key:Directory Service Name Localizedsecurity_result.detection_fields 对象。
DirectoryServiceUuid security_result.detection_fields.value 原始日志中的 DirectoryServiceUuid 值会分配给具有 key:Directory Service Uuidsecurity_result.detection_fields 对象。
EventMessage security_result.summary 原始日志中的 EventMessage 值会分配给 security_result.summary 字段。
EventType metadata.product_event_type 原始日志中的 EventType 值会分配给 metadata.product_event_type 字段。它还用于确定 metadata.event_type
FailReason security_result.summary 如果原始日志中存在 FailReason,则将其值分配给 security_result.summary 字段。
FailUserName target.user.email_addresses 如果原始日志中存在 FailUserName,则将其值分配给 target.user.email_addresses 字段。
FromIPAddress principal.ip 原始日志中的 FromIPAddress 值会分配给 principal.ip 字段。
ID security_result.detection_fields.value 原始日志中的 ID 值会分配给具有 key:IDsecurity_result.detection_fields 对象。
InternalTrackingID metadata.product_log_id 原始日志中的 InternalTrackingID 值会分配给 metadata.product_log_id 字段。
JumpType additional.fields.value.string_value 原始日志中的 JumpType 值会分配给具有 key:Jump Typeadditional.fields 对象。
NormalizedUser target.user.email_addresses 原始日志中的 NormalizedUser 值会分配给 target.user.email_addresses 字段。
OperationMode additional.fields.value.string_value 原始日志中的 OperationMode 值会分配给具有 key:Operation Modeadditional.fields 对象。
ProxyId security_result.detection_fields.value 原始日志中的 ProxyId 值会分配给具有 key:Proxy Idsecurity_result.detection_fields 对象。
RequestUserAgent network.http.user_agent 原始日志中的 RequestUserAgent 值会分配给 network.http.user_agent 字段。
SessionGuid network.session_id 原始日志中的 SessionGuid 值会分配给 network.session_id 字段。
Tenant additional.fields.value.string_value 原始日志中的 Tenant 值会分配给具有 key:Tenantadditional.fields 对象。
ThreadType additional.fields.value.string_value 原始日志中的 ThreadType 值会分配给具有 key:Thread Typeadditional.fields 对象。
UserType principal.user.attribute.roles.name 原始日志中的 UserType 值会分配给 principal.user.attribute.roles.name 字段。
WhenOccurred metadata.event_timestamp 系统会解析原始日志中的 WhenOccurred 值,并将其分配给 metadata.event_timestamp 字段。此字段还会填充顶级 timestamp 字段。硬编码值“SSO”。由 EventType 字段确定。如果不存在 EventTypeEventType 不符合任何特定条件,则默认为 STATUS_UPDATE。可以是 USER_LOGINUSER_CREATIONUSER_RESOURCE_ACCESSUSER_LOGOUTUSER_CHANGE_PASSWORD。硬编码值“CENTRIFY_SSO”。硬编码值“SSO”。硬编码值“Centrify”。如果 message 字段包含会话 ID,则会提取并使用该 ID。否则,默认值为“1”。如果可用,则从 host 字段(来自 syslog 标头)中提取。如果可用,则从 pid 字段(来自 syslog 标头)中提取。如果存在 UserGuid,则使用其值。否则,如果 message 字段包含用户 ID,则会提取并使用该 ID。如果 Level 为“Info”,则设置为“ALLOW”;如果存在 FailReason,则设置为“BLOCK”。如果存在 FailReason,则设置为“AUTH_VIOLATION”。由 Level 字段确定。如果 Level 为“Info”,则设置为“INFORMATIONAL”;如果 Level 为“Warning”,则设置为“MEDIUM”;如果 Level 为“Error”,则设置为“ERROR”。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。