收集 Delinea 單一登入記錄
本文說明如何使用 Amazon S3,將 Delinea (前身為 Centrify) 單一登入 (SSO) 記錄擷取至 Google Security Operations。剖析器會擷取記錄,並處理 JSON 和系統記錄格式。並將鍵/值組合、時間戳記和其他相關欄位對應至 UDM 模型,以及處理登入失敗、使用者代理程式、嚴重程度、驗證機制和各種事件類型的特定邏輯。在失敗事件中,系統會優先使用 FailUserName
,而非 NormalizedUser
做為目標電子郵件地址。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- Delinea (Centrify) SSO 租戶的特殊存取權。
- AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。
收集 Delinea (Centrify) 單一登入先決條件 (ID、API 金鑰、機構 ID、權杖)
- 登入 Delinea 管理入口網站。
- 依序前往「應用程式」>「新增應用程式」。
- 搜尋「OAuth2 Client」,然後按一下「Add」。
- 在「新增網頁應用程式」對話方塊中,按一下「是」。
- 按一下「新增網頁應用程式」對話方塊中的「關閉」。
- 在「Application Configuration」頁面中,設定下列項目:
- 「一般」分頁:
- 「Application ID」:輸入專屬 ID (例如
secops-oauth-client
) - 應用程式名稱:輸入描述性名稱 (例如
SecOps Data Export
) - 應用程式說明:輸入說明 (例如
OAuth client for exporting audit events to SecOps
)
- 「Application ID」:輸入專屬 ID (例如
- 「信任」分頁:
- 應用程式為機密內容:勾選這個選項
- 用戶端 ID 類型:選取「機密」
- 已核發的用戶端 ID:複製並儲存這個值
- 核發的用戶端密鑰:複製並儲存這個值
- 「符記」分頁:
- 驗證方法:選取「用戶端憑證」
- 權杖類型:選取「Jwt RS256」
- 「範圍」分頁:
- 新增範圍 siem,並加上「SIEM Integration Access」說明。
- 新增範圍 redrock/query,並加上「Query API Access」說明。
- 「一般」分頁:
- 按一下「儲存」即可建立 OAuth 用戶端。
- 依序前往「核心服務」>「使用者」>「新增使用者」。
- 設定服務使用者:
- 「登入名稱」:輸入步驟 6 中的「用戶端 ID」。
- 電子郵件地址:輸入有效的電子郵件地址 (必填欄位)。
- 顯示名稱:輸入描述性名稱 (例如
SecOps Service User
)。 - 「密碼」和「確認密碼」:輸入步驟 6 中的「用戶端密鑰」
- 狀態:選取「Is OAuth confidential client」(OAuth 機密用戶端)。
- 按一下「Create User」。
- 依序前往「存取權」>「角色」,然後將服務使用者指派給具有適當權限的角色,以便查詢稽核事件。
- 複製下列詳細資料並存放在安全位置:
- 租戶網址:您的 Centrify 租戶網址 (例如
https://yourtenant.my.centrify.com
) - 用戶端 ID:來自步驟 6
- 用戶端密鑰:來自步驟 6
- OAuth 應用程式 ID:來自應用程式設定
- 租戶網址:您的 Centrify 租戶網址 (例如
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket。
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
delinea-centrify-logs-bucket
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 .CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
delinea-centrify-logs-bucket
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「角色」。
依序點選「建立角色」>「AWS 服務」>「Lambda」。
附加新建立的政策和代管政策 AWSLambdaBasicExecutionRole (適用於 CloudWatch 記錄)。
為角色命名
CentrifySSOLogExportRole
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 CentrifySSOLogExport
執行階段 Python 3.13 架構 x86_64 執行角色 CentrifySSOLogExportRole
建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (
CentrifySSOLogExport.py
)。import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
CentrifySSOLogExport
。 - 名稱:
CentrifySSOLogExport-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取政策 (自訂):依序點選「使用者」>「secops-reader」>「權限」。
- 依序點選「新增權限」>「直接附加政策」。
- 選取「建立政策」。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」。
按一下「Add permissions」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載
.CSV
。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態饋給,擷取 Delinea (Centrify) 單一登入記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Delinea Centrify SSO logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Centrify」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
AccountID |
security_result.detection_fields.value |
原始記錄中的 AccountID 值會指派給 security_result.detection_fields 物件,並帶有 key :Account ID 。 |
ApplicationName |
target.application |
原始記錄中的 ApplicationName 值會指派給 target.application 欄位。 |
AuthorityFQDN |
target.asset.network_domain |
原始記錄中的 AuthorityFQDN 值會指派給 target.asset.network_domain 欄位。 |
AuthorityID |
target.asset.asset_id |
原始記錄中的 AuthorityID 值會指派給 target.asset.asset_id 欄位,並加上「AuthorityID:」前置字元。 |
AzDeploymentId |
security_result.detection_fields.value |
原始記錄中的 AzDeploymentId 值會指派給具有 key 的 security_result.detection_fields 物件:AzDeploymentId 。 |
AzRoleId |
additional.fields.value.string_value |
原始記錄中的 AzRoleId 值會指派給具有 key 的 additional.fields 物件:AzRole Id 。 |
AzRoleName |
target.user.attribute.roles.name |
原始記錄中的 AzRoleName 值會指派給 target.user.attribute.roles.name 欄位。 |
ComputerFQDN |
principal.asset.network_domain |
原始記錄中的 ComputerFQDN 值會指派給 principal.asset.network_domain 欄位。 |
ComputerID |
principal.asset.asset_id |
原始記錄中的 ComputerID 值會指派給 principal.asset.asset_id 欄位,並加上「ComputerId:」前置字元。 |
ComputerName |
about.hostname |
原始記錄中的 ComputerName 值會指派給 about.hostname 欄位。 |
CredentialId |
security_result.detection_fields.value |
原始記錄中的 CredentialId 值會指派給具有 key 的 security_result.detection_fields 物件:Credential Id 。 |
DirectoryServiceName |
security_result.detection_fields.value |
原始記錄中的 DirectoryServiceName 值會指派給具有 key 的 security_result.detection_fields 物件:Directory Service Name 。 |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
原始記錄中的 DirectoryServiceNameLocalized 值會指派給具有 key 的 security_result.detection_fields 物件:Directory Service Name Localized 。 |
DirectoryServiceUuid |
security_result.detection_fields.value |
原始記錄中的 DirectoryServiceUuid 值會指派給具有 key 的 security_result.detection_fields 物件:Directory Service Uuid 。 |
EventMessage |
security_result.summary |
原始記錄中的 EventMessage 值會指派給 security_result.summary 欄位。 |
EventType |
metadata.product_event_type |
原始記錄中的 EventType 值會指派給 metadata.product_event_type 欄位。這項資訊也會用於判斷 metadata.event_type 。 |
FailReason |
security_result.summary |
如果原始記錄中存在 FailReason 的值,系統會將該值指派給 security_result.summary 欄位。 |
FailUserName |
target.user.email_addresses |
如果原始記錄中存在 FailUserName 的值,系統會將該值指派給 target.user.email_addresses 欄位。 |
FromIPAddress |
principal.ip |
原始記錄中的 FromIPAddress 值會指派給 principal.ip 欄位。 |
ID |
security_result.detection_fields.value |
原始記錄中的 ID 值會指派給具有 key 的 security_result.detection_fields 物件:ID 。 |
InternalTrackingID |
metadata.product_log_id |
原始記錄中的 InternalTrackingID 值會指派給 metadata.product_log_id 欄位。 |
JumpType |
additional.fields.value.string_value |
原始記錄中的 JumpType 值會指派給具有 key 的 additional.fields 物件:Jump Type 。 |
NormalizedUser |
target.user.email_addresses |
原始記錄中的 NormalizedUser 值會指派給 target.user.email_addresses 欄位。 |
OperationMode |
additional.fields.value.string_value |
原始記錄中的 OperationMode 值會指派給具有 key 的 additional.fields 物件:Operation Mode 。 |
ProxyId |
security_result.detection_fields.value |
原始記錄中的 ProxyId 值會指派給具有 key 的 security_result.detection_fields 物件:Proxy Id 。 |
RequestUserAgent |
network.http.user_agent |
原始記錄中的 RequestUserAgent 值會指派給 network.http.user_agent 欄位。 |
SessionGuid |
network.session_id |
原始記錄中的 SessionGuid 值會指派給 network.session_id 欄位。 |
Tenant |
additional.fields.value.string_value |
原始記錄中的 Tenant 值會指派給具有 key 的 additional.fields 物件:Tenant 。 |
ThreadType |
additional.fields.value.string_value |
原始記錄中的 ThreadType 值會指派給具有 key 的 additional.fields 物件:Thread Type 。 |
UserType |
principal.user.attribute.roles.name |
原始記錄中的 UserType 值會指派給 principal.user.attribute.roles.name 欄位。 |
WhenOccurred |
metadata.event_timestamp |
系統會剖析原始記錄中的 WhenOccurred 值,並指派給 metadata.event_timestamp 欄位。這個欄位也會填入頂層 timestamp 欄位。硬式編碼值「SSO」。由 EventType 欄位決定。如果沒有 EventType 或不符合任何特定條件,則預設為 STATUS_UPDATE 。可以是 USER_LOGIN 、USER_CREATION 、USER_RESOURCE_ACCESS 、USER_LOGOUT 或 USER_CHANGE_PASSWORD 。硬式編碼值「CENTRIFY_SSO」。硬式編碼值「SSO」。硬式編碼值「Centrify」。如果 message 欄位包含工作階段 ID,系統會擷取並使用該 ID。否則預設為「1」。如果有的話,會從 host 欄位擷取,該欄位來自系統記錄檔標頭。如果有的話,會從 pid 欄位擷取,該欄位來自系統記錄檔標頭。如果存在 UserGuid ,系統會使用其值。否則,如果 message 欄位包含使用者 ID,系統會擷取並使用該 ID。如果 Level 為「Info」,請設為「ALLOW」,如果 FailReason 存在,請設為「BLOCK」。如果存在 FailReason ,請設為「AUTH_VIOLATION」。由 Level 欄位決定。如果 Level 為「Info」,則設為「INFORMATIONAL」;如果 Level 為「Warning」,則設為「MEDIUM」;如果 Level 為「Error」,則設為「ERROR」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。