收集 Delinea SSO 日志
本文档介绍了如何使用 Amazon S3 将 Delinea(以前称为 Centrify)单点登录 (SSO) 日志注入到 Google Security Operations。解析器会提取日志,同时处理 JSON 和 syslog 格式。它会解析键值对、时间戳和其他相关字段,将它们映射到 UDM 模型,并使用特定逻辑来处理登录失败、用户代理、严重程度、身份验证机制和各种事件类型。对于失败事件中的目标电子邮件地址,该属性优先考虑 FailUserName
而不是 NormalizedUser
。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 对 Delinea (Centrify) SSO 租户的特权访问权限。
- 对 AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。
收集 Delinea (Centrify) SSO 前提条件(ID、API 密钥、组织 ID、令牌)
- 登录 Delinea 管理员门户。
- 依次前往应用 > 添加应用。
- 搜索 OAuth2 客户端,然后点击添加。
- 在添加 Web 应用对话框中,点击是。
- 在添加 Web 应用对话框中,点击关闭。
- 在应用配置页面上,配置以下内容:
- 常规标签页:
- 应用 ID:输入唯一标识符(例如
secops-oauth-client
) - 应用名称:输入一个描述性名称(例如
SecOps Data Export
) - 应用说明:输入说明(例如
OAuth client for exporting audit events to SecOps
)
- 应用 ID:输入唯一标识符(例如
- 信任标签页:
- 申请保密:选中此选项
- Client ID Type:选择 Confidential
- 已签发的客户端 ID:复制并保存此值
- 已签发的客户端密钥:复制并保存此值
- 令牌标签页:
- 身份验证方法:选择 Client Creds
- 令牌类型:选择 Jwt RS256
- 范围标签页:
- 添加范围 siem,并添加说明 SIEM 集成访问权限。
- 添加范围 redrock/query,说明为 Query API 访问权限。
- 常规标签页:
- 点击保存以创建 OAuth 客户端。
- 依次前往核心服务 > 用户 > 添加用户。
- 配置服务用户:
- 登录名:输入第 6 步中的客户端 ID。
- 电子邮件地址:输入有效的电子邮件地址(必填字段)。
- 显示名称:输入一个描述性名称(例如
SecOps Service User
)。 - 密码和确认密码:输入第 6 步中的客户端密钥
- 状态:选择 Is OAuth confidential client。
- 点击 Create User(创建用户)。
- 前往访问权限 > 角色,然后将服务用户分配给具有相应权限的角色,以便查询审核事件。
- 复制以下详细信息并将其保存在安全的位置:
- 租户网址:您的 Centrify 租户网址(例如
https://yourtenant.my.centrify.com
) - 客户端 ID:来自第 6 步
- 客户端密钥:来自第 6 步
- OAuth 应用 ID:来自应用配置
- 租户网址:您的 Centrify 租户网址(例如
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以供日后参考(例如
delinea-centrify-logs-bucket
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 .CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
- 复制并粘贴以下政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
delinea-centrify-logs-bucket
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色。
依次点击创建角色 > AWS 服务 > Lambda。
附加新创建的政策和受管政策 AWSLambdaBasicExecutionRole(用于 CloudWatch 日志记录)。
将角色命名为
CentrifySSOLogExportRole
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 CentrifySSOLogExport
运行时 Python 3.13 架构 x86_64 执行角色 CentrifySSOLogExportRole
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
CentrifySSOLogExport.py
)。import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入下表中提供的环境变量,并将示例值替换为您的值。
环境变量
键 示例值 S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
CentrifySSOLogExport
。 - 名称:
CentrifySSOLogExport-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > 用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限。
- 依次点击添加权限 > 直接附加政策。
- 选择创建政策。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步。
点击添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥。点击创建访问密钥。
下载
.CSV
。(您需要将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Delinea (Centrify) SSO 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Delinea Centrify SSO logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Centrify 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资源命名空间:资源命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
AccountID |
security_result.detection_fields.value |
原始日志中的 AccountID 值会分配给具有 key :Account ID 的 security_result.detection_fields 对象。 |
ApplicationName |
target.application |
原始日志中的 ApplicationName 值会分配给 target.application 字段。 |
AuthorityFQDN |
target.asset.network_domain |
原始日志中的 AuthorityFQDN 值会分配给 target.asset.network_domain 字段。 |
AuthorityID |
target.asset.asset_id |
原始日志中的 AuthorityID 值会分配给 target.asset.asset_id 字段,并以“AuthorityID:”为前缀。 |
AzDeploymentId |
security_result.detection_fields.value |
原始日志中的 AzDeploymentId 值会分配给具有 key :AzDeploymentId 的 security_result.detection_fields 对象。 |
AzRoleId |
additional.fields.value.string_value |
原始日志中的 AzRoleId 值会分配给具有 key :AzRole Id 的 additional.fields 对象。 |
AzRoleName |
target.user.attribute.roles.name |
原始日志中的 AzRoleName 值会分配给 target.user.attribute.roles.name 字段。 |
ComputerFQDN |
principal.asset.network_domain |
原始日志中的 ComputerFQDN 值会分配给 principal.asset.network_domain 字段。 |
ComputerID |
principal.asset.asset_id |
原始日志中的 ComputerID 值会分配给 principal.asset.asset_id 字段,并以“ComputerId:”为前缀。 |
ComputerName |
about.hostname |
原始日志中的 ComputerName 值会分配给 about.hostname 字段。 |
CredentialId |
security_result.detection_fields.value |
原始日志中的 CredentialId 值会分配给具有 key :Credential Id 的 security_result.detection_fields 对象。 |
DirectoryServiceName |
security_result.detection_fields.value |
原始日志中的 DirectoryServiceName 值会分配给具有 key :Directory Service Name 的 security_result.detection_fields 对象。 |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
原始日志中的 DirectoryServiceNameLocalized 值会分配给具有 key :Directory Service Name Localized 的 security_result.detection_fields 对象。 |
DirectoryServiceUuid |
security_result.detection_fields.value |
原始日志中的 DirectoryServiceUuid 值会分配给具有 key :Directory Service Uuid 的 security_result.detection_fields 对象。 |
EventMessage |
security_result.summary |
原始日志中的 EventMessage 值会分配给 security_result.summary 字段。 |
EventType |
metadata.product_event_type |
原始日志中的 EventType 值会分配给 metadata.product_event_type 字段。它还用于确定 metadata.event_type 。 |
FailReason |
security_result.summary |
如果原始日志中存在 FailReason ,则将其值分配给 security_result.summary 字段。 |
FailUserName |
target.user.email_addresses |
如果原始日志中存在 FailUserName ,则将其值分配给 target.user.email_addresses 字段。 |
FromIPAddress |
principal.ip |
原始日志中的 FromIPAddress 值会分配给 principal.ip 字段。 |
ID |
security_result.detection_fields.value |
原始日志中的 ID 值会分配给具有 key :ID 的 security_result.detection_fields 对象。 |
InternalTrackingID |
metadata.product_log_id |
原始日志中的 InternalTrackingID 值会分配给 metadata.product_log_id 字段。 |
JumpType |
additional.fields.value.string_value |
原始日志中的 JumpType 值会分配给具有 key :Jump Type 的 additional.fields 对象。 |
NormalizedUser |
target.user.email_addresses |
原始日志中的 NormalizedUser 值会分配给 target.user.email_addresses 字段。 |
OperationMode |
additional.fields.value.string_value |
原始日志中的 OperationMode 值会分配给具有 key :Operation Mode 的 additional.fields 对象。 |
ProxyId |
security_result.detection_fields.value |
原始日志中的 ProxyId 值会分配给具有 key :Proxy Id 的 security_result.detection_fields 对象。 |
RequestUserAgent |
network.http.user_agent |
原始日志中的 RequestUserAgent 值会分配给 network.http.user_agent 字段。 |
SessionGuid |
network.session_id |
原始日志中的 SessionGuid 值会分配给 network.session_id 字段。 |
Tenant |
additional.fields.value.string_value |
原始日志中的 Tenant 值会分配给具有 key :Tenant 的 additional.fields 对象。 |
ThreadType |
additional.fields.value.string_value |
原始日志中的 ThreadType 值会分配给具有 key :Thread Type 的 additional.fields 对象。 |
UserType |
principal.user.attribute.roles.name |
原始日志中的 UserType 值会分配给 principal.user.attribute.roles.name 字段。 |
WhenOccurred |
metadata.event_timestamp |
系统会解析原始日志中的 WhenOccurred 值,并将其分配给 metadata.event_timestamp 字段。此字段还会填充顶级 timestamp 字段。硬编码值“SSO”。由 EventType 字段确定。如果不存在 EventType 或 EventType 不符合任何特定条件,则默认为 STATUS_UPDATE 。可以是 USER_LOGIN 、USER_CREATION 、USER_RESOURCE_ACCESS 、USER_LOGOUT 或 USER_CHANGE_PASSWORD 。硬编码值“CENTRIFY_SSO”。硬编码值“SSO”。硬编码值“Centrify”。如果 message 字段包含会话 ID,则会提取并使用该 ID。否则,默认值为“1”。如果可用,则从 host 字段(来自 syslog 标头)中提取。如果可用,则从 pid 字段(来自 syslog 标头)中提取。如果存在 UserGuid ,则使用其值。否则,如果 message 字段包含用户 ID,则会提取并使用该 ID。如果 Level 为“Info”,则设置为“ALLOW”;如果存在 FailReason ,则设置为“BLOCK”。如果存在 FailReason ,则设置为“AUTH_VIOLATION”。由 Level 字段确定。如果 Level 为“Info”,则设置为“INFORMATIONAL”;如果 Level 为“Warning”,则设置为“MEDIUM”;如果 Level 为“Error”,则设置为“ERROR”。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。