收集 Delinea 單一登入記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Delinea (前身為 Centrify) 單一登入 (SSO) 記錄擷取至 Google Security Operations。剖析器會擷取記錄,並處理 JSON 和系統記錄格式。並將鍵/值組合、時間戳記和其他相關欄位對應至 UDM 模型,以及處理登入失敗、使用者代理程式、嚴重程度、驗證機制和各種事件類型的特定邏輯。在失敗事件中,系統會優先使用 FailUserName,而非 NormalizedUser 做為目標電子郵件地址。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體。
  • Delinea (Centrify) SSO 租戶的特殊存取權。
  • AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。

收集 Delinea (Centrify) 單一登入先決條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Delinea 管理入口網站
  2. 依序前往「應用程式」>「新增應用程式」
  3. 搜尋「OAuth2 Client」,然後按一下「Add」
  4. 在「新增網頁應用程式」對話方塊中,按一下「是」
  5. 按一下「新增網頁應用程式」對話方塊中的「關閉」
  6. 在「Application Configuration」頁面中,設定下列項目:
    • 「一般」分頁:
      • 「Application ID」:輸入專屬 ID (例如 secops-oauth-client)
      • 應用程式名稱:輸入描述性名稱 (例如 SecOps Data Export)
      • 應用程式說明:輸入說明 (例如 OAuth client for exporting audit events to SecOps)
    • 「信任」分頁:
      • 應用程式為機密內容:勾選這個選項
      • 用戶端 ID 類型:選取「機密」
      • 已核發的用戶端 ID:複製並儲存這個值
      • 核發的用戶端密鑰:複製並儲存這個值
    • 「符記」分頁:
      • 驗證方法:選取「用戶端憑證」
      • 權杖類型:選取「Jwt RS256」
    • 「範圍」分頁:
      • 新增範圍 siem,並加上「SIEM Integration Access」說明。
      • 新增範圍 redrock/query,並加上「Query API Access」說明。
  7. 按一下「儲存」即可建立 OAuth 用戶端。
  8. 依序前往「核心服務」>「使用者」>「新增使用者」。
  9. 設定服務使用者:
    • 「登入名稱」:輸入步驟 6 中的「用戶端 ID」
    • 電子郵件地址:輸入有效的電子郵件地址 (必填欄位)。
    • 顯示名稱:輸入描述性名稱 (例如 SecOps Service User)。
    • 「密碼」和「確認密碼」:輸入步驟 6 中的「用戶端密鑰」
    • 狀態:選取「Is OAuth confidential client」(OAuth 機密用戶端)。
  10. 按一下「Create User」
  11. 依序前往「存取權」>「角色」,然後將服務使用者指派給具有適當權限的角色,以便查詢稽核事件。
  12. 複製下列詳細資料並存放在安全位置:
    • 租戶網址:您的 Centrify 租戶網址 (例如 https://yourtenant.my.centrify.com)
    • 用戶端 ID:來自步驟 6
    • 用戶端密鑰:來自步驟 6
    • OAuth 應用程式 ID:來自應用程式設定

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 delinea-centrify-logs-bucket)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 .CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」(政策)
  2. 按一下「建立政策」>「JSON」分頁
  3. 複製並貼上下列政策。
  4. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 delinea-centrify-logs-bucket):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. 依序點選「下一步」>「建立政策」

  6. 依序前往「IAM」>「角色」

  7. 依序點選「建立角色」>「AWS 服務」>「Lambda」

  8. 附加新建立的政策和代管政策 AWSLambdaBasicExecutionRole (適用於 CloudWatch 記錄)。

  9. 為角色命名 CentrifySSOLogExportRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 CentrifySSOLogExport
    執行階段 Python 3.13
    架構 x86_64
    執行角色 CentrifySSOLogExportRole
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (CentrifySSOLogExport.py)。

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下表提供的環境變數,並將範例值換成您的值。

    環境變數

    範例值
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 CentrifySSOLogExport
    • 名稱CentrifySSOLogExport-1h
  3. 按一下「建立時間表」

(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取政策 (自訂):依序點選「使用者」>「secops-reader」>「權限」
  6. 依序點選「新增權限」>「直接附加政策」
  7. 選取「建立政策」
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. Name = secops-reader-policy

  10. 依序點選「建立政策」> 搜尋/選取 >「下一步」

  11. 按一下「Add permissions」。

  12. secops-reader 建立存取金鑰:依序點選「安全憑證」>「存取金鑰」

  13. 按一下「建立存取金鑰」

  14. 下載 .CSV。(您會將這些值貼到動態饋給中)。

在 Google SecOps 中設定動態饋給,擷取 Delinea (Centrify) 單一登入記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Delinea Centrify SSO logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Centrify」做為「記錄類型」。
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
AccountID security_result.detection_fields.value 原始記錄中的 AccountID 值會指派給 security_result.detection_fields 物件,並帶有 key:Account ID
ApplicationName target.application 原始記錄中的 ApplicationName 值會指派給 target.application 欄位。
AuthorityFQDN target.asset.network_domain 原始記錄中的 AuthorityFQDN 值會指派給 target.asset.network_domain 欄位。
AuthorityID target.asset.asset_id 原始記錄中的 AuthorityID 值會指派給 target.asset.asset_id 欄位,並加上「AuthorityID:」前置字元。
AzDeploymentId security_result.detection_fields.value 原始記錄中的 AzDeploymentId 值會指派給具有 keysecurity_result.detection_fields 物件:AzDeploymentId
AzRoleId additional.fields.value.string_value 原始記錄中的 AzRoleId 值會指派給具有 keyadditional.fields 物件:AzRole Id
AzRoleName target.user.attribute.roles.name 原始記錄中的 AzRoleName 值會指派給 target.user.attribute.roles.name 欄位。
ComputerFQDN principal.asset.network_domain 原始記錄中的 ComputerFQDN 值會指派給 principal.asset.network_domain 欄位。
ComputerID principal.asset.asset_id 原始記錄中的 ComputerID 值會指派給 principal.asset.asset_id 欄位,並加上「ComputerId:」前置字元。
ComputerName about.hostname 原始記錄中的 ComputerName 值會指派給 about.hostname 欄位。
CredentialId security_result.detection_fields.value 原始記錄中的 CredentialId 值會指派給具有 keysecurity_result.detection_fields 物件:Credential Id
DirectoryServiceName security_result.detection_fields.value 原始記錄中的 DirectoryServiceName 值會指派給具有 keysecurity_result.detection_fields 物件:Directory Service Name
DirectoryServiceNameLocalized security_result.detection_fields.value 原始記錄中的 DirectoryServiceNameLocalized 值會指派給具有 keysecurity_result.detection_fields 物件:Directory Service Name Localized
DirectoryServiceUuid security_result.detection_fields.value 原始記錄中的 DirectoryServiceUuid 值會指派給具有 keysecurity_result.detection_fields 物件:Directory Service Uuid
EventMessage security_result.summary 原始記錄中的 EventMessage 值會指派給 security_result.summary 欄位。
EventType metadata.product_event_type 原始記錄中的 EventType 值會指派給 metadata.product_event_type 欄位。這項資訊也會用於判斷 metadata.event_type
FailReason security_result.summary 如果原始記錄中存在 FailReason 的值,系統會將該值指派給 security_result.summary 欄位。
FailUserName target.user.email_addresses 如果原始記錄中存在 FailUserName 的值,系統會將該值指派給 target.user.email_addresses 欄位。
FromIPAddress principal.ip 原始記錄中的 FromIPAddress 值會指派給 principal.ip 欄位。
ID security_result.detection_fields.value 原始記錄中的 ID 值會指派給具有 keysecurity_result.detection_fields 物件:ID
InternalTrackingID metadata.product_log_id 原始記錄中的 InternalTrackingID 值會指派給 metadata.product_log_id 欄位。
JumpType additional.fields.value.string_value 原始記錄中的 JumpType 值會指派給具有 keyadditional.fields 物件:Jump Type
NormalizedUser target.user.email_addresses 原始記錄中的 NormalizedUser 值會指派給 target.user.email_addresses 欄位。
OperationMode additional.fields.value.string_value 原始記錄中的 OperationMode 值會指派給具有 keyadditional.fields 物件:Operation Mode
ProxyId security_result.detection_fields.value 原始記錄中的 ProxyId 值會指派給具有 keysecurity_result.detection_fields 物件:Proxy Id
RequestUserAgent network.http.user_agent 原始記錄中的 RequestUserAgent 值會指派給 network.http.user_agent 欄位。
SessionGuid network.session_id 原始記錄中的 SessionGuid 值會指派給 network.session_id 欄位。
Tenant additional.fields.value.string_value 原始記錄中的 Tenant 值會指派給具有 keyadditional.fields 物件:Tenant
ThreadType additional.fields.value.string_value 原始記錄中的 ThreadType 值會指派給具有 keyadditional.fields 物件:Thread Type
UserType principal.user.attribute.roles.name 原始記錄中的 UserType 值會指派給 principal.user.attribute.roles.name 欄位。
WhenOccurred metadata.event_timestamp 系統會剖析原始記錄中的 WhenOccurred 值,並指派給 metadata.event_timestamp 欄位。這個欄位也會填入頂層 timestamp 欄位。硬式編碼值「SSO」。由 EventType 欄位決定。如果沒有 EventType 或不符合任何特定條件,則預設為 STATUS_UPDATE。可以是 USER_LOGINUSER_CREATIONUSER_RESOURCE_ACCESSUSER_LOGOUTUSER_CHANGE_PASSWORD。硬式編碼值「CENTRIFY_SSO」。硬式編碼值「SSO」。硬式編碼值「Centrify」。如果 message 欄位包含工作階段 ID,系統會擷取並使用該 ID。否則預設為「1」。如果有的話,會從 host 欄位擷取,該欄位來自系統記錄檔標頭。如果有的話,會從 pid 欄位擷取,該欄位來自系統記錄檔標頭。如果存在 UserGuid,系統會使用其值。否則,如果 message 欄位包含使用者 ID,系統會擷取並使用該 ID。如果 Level 為「Info」,請設為「ALLOW」,如果 FailReason 存在,請設為「BLOCK」。如果存在 FailReason,請設為「AUTH_VIOLATION」。由 Level 欄位決定。如果 Level 為「Info」,則設為「INFORMATIONAL」;如果 Level 為「Warning」,則設為「MEDIUM」;如果 Level 為「Error」,則設為「ERROR」。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。