Raccogliere i log SSO di Delinea
Questo documento spiega come importare i log del servizio Single Sign-On (SSO) di Delinea (in precedenza Centrify) in Google Security Operations utilizzando Amazon S3. Il parser estrae i log, gestendo
i formati JSON e syslog. Analizza coppie chiave-valore, timestamp e altri
campi pertinenti, mappandoli al modello UDM, con una logica specifica per la gestione
di errori di accesso, user agent, livelli di gravità, meccanismi di autenticazione e
vari tipi di eventi. Dà la priorità a FailUserName
rispetto a NormalizedUser
per
gli indirizzi email di destinazione negli eventi di errore.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps.
- Accesso con privilegi al tenant SSO Delinea (Centrify).
- Accesso privilegiato ad AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Raccogli i prerequisiti per l'SSO Delinea (Centrify) (ID, chiavi API, ID organizzazione, token)
- Accedi al portale di amministrazione Delinea.
- Vai ad App > Aggiungi app.
- Cerca OAuth2 Client e fai clic su Aggiungi.
- Fai clic su Sì nella finestra di dialogo Aggiungi app web.
- Fai clic su Chiudi nella finestra di dialogo Aggiungi app web.
- Nella pagina Configurazione applicazione, configura quanto segue:
- Scheda Generale:
- ID applicazione: inserisci un identificatore univoco (ad esempio,
secops-oauth-client
) - Application Name (Nome applicazione): inserisci un nome descrittivo (ad esempio,
SecOps Data Export
) - Descrizione dell'applicazione: inserisci una descrizione (ad esempio,
OAuth client for exporting audit events to SecOps
)
- ID applicazione: inserisci un identificatore univoco (ad esempio,
- Scheda Fiducia:
- La richiesta è riservata: seleziona questa opzione
- Tipo di ID cliente: seleziona Riservato
- ID client emesso: copia e salva questo valore.
- Client secret emesso: copia e salva questo valore
- Scheda Token:
- Auth methods (Metodi di autenticazione): seleziona Client Creds (Credenziali client)
- Tipo di token: seleziona Jwt RS256.
- Scheda Ambito:
- Aggiungi l'ambito siem con la descrizione SIEM Integration Access.
- Aggiungi l'ambito redrock/query con la descrizione Query API Access.
- Scheda Generale:
- Fai clic su Salva per creare il client OAuth.
- Vai a Servizi di base > Utenti > Aggiungi utente.
- Configura l'utente del servizio:
- Nome di accesso: inserisci l'ID client del passaggio 6.
- Indirizzo email: inserisci un indirizzo email valido (campo obbligatorio).
- Nome visualizzato: inserisci un nome descrittivo (ad esempio,
SecOps Service User
). - Password e Conferma password: inserisci il client secret del passaggio 6.
- Stato: seleziona È un client OAuth confidenziale.
- Fai clic su Create User (Crea utente).
- Vai a Accesso > Ruoli e assegna l'utente del servizio a un ruolo con le autorizzazioni appropriate per eseguire query sugli eventi di controllo.
- Copia e salva in una posizione sicura i seguenti dettagli:
- URL tenant: l'URL tenant di Centrify (ad esempio,
https://yourtenant.my.centrify.com
) - ID client: dal passaggio 6
- Client secret: dal passaggio 6
- ID applicazione OAuth: dalla configurazione dell'applicazione
- URL tenant: l'URL tenant di Centrify (ad esempio,
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
delinea-centrify-logs-bucket
). - Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file .CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca i criteri AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy.
- Fai clic su Crea criterio > scheda JSON.
- Copia e incolla i seguenti criteri.
JSON delle policy (sostituisci
delinea-centrify-logs-bucket
se hai inserito un nome del bucket diverso):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli.
Fai clic su Crea ruolo > Servizio AWS > Lambda.
Collega il criterio appena creato e il criterio gestito AWSLambdaBasicExecutionRole (per la registrazione CloudWatch).
Assegna al ruolo il nome
CentrifySSOLogExportRole
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome CentrifySSOLogExport
Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione CentrifySSOLogExportRole
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e incolla il seguente codice (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Vai a Configurazione > Variabili di ambiente.
Fai clic su Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le variabili di ambiente fornite nella tabella seguente, sostituendo i valori di esempio con i tuoi valori.
Variabili di ambiente
Chiave Valore di esempio S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda
CentrifySSOLogExport
. - Nome:
CentrifySSOLogExport-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci
secops-reader
. - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
- Utente: inserisci
- Fai clic su Crea utente.
- Allega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni > Allega norme direttamente.
- Seleziona Crea policy.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Name =
secops-reader-policy
.Fai clic su Crea criterio > cerca/seleziona > Avanti.
Fai clic su Aggiungi autorizzazioni.
Crea la chiave di accesso per
secops-reader
: Credenziali di sicurezza > Chiavi di accesso.Fai clic su Crea chiave di accesso.
Scarica il
.CSV
. Incollerai questi valori nel feed.
Configura un feed in Google SecOps per importare i log SSO di Delinea (Centrify)
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Delinea Centrify SSO logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Centrify come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
AccountID |
security_result.detection_fields.value |
Il valore di AccountID del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Account ID . |
ApplicationName |
target.application |
Il valore di ApplicationName del log non elaborato viene assegnato al campo target.application . |
AuthorityFQDN |
target.asset.network_domain |
Il valore di AuthorityFQDN del log non elaborato viene assegnato al campo target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
Il valore di AuthorityID del log non elaborato viene assegnato al campo target.asset.asset_id , con il prefisso "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
Il valore di AzDeploymentId del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
Il valore di AzRoleId del log non elaborato viene assegnato a un oggetto additional.fields con key :AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
Il valore di AzRoleName del log non elaborato viene assegnato al campo target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
Il valore di ComputerFQDN del log non elaborato viene assegnato al campo principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
Il valore di ComputerID del log non elaborato viene assegnato al campo principal.asset.asset_id , con il prefisso "ComputerId:". |
ComputerName |
about.hostname |
Il valore di ComputerName del log non elaborato viene assegnato al campo about.hostname . |
CredentialId |
security_result.detection_fields.value |
Il valore di CredentialId del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
Il valore di DirectoryServiceName del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
Il valore di DirectoryServiceNameLocalized del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
Il valore di DirectoryServiceUuid del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Directory Service Uuid . |
EventMessage |
security_result.summary |
Il valore di EventMessage del log non elaborato viene assegnato al campo security_result.summary . |
EventType |
metadata.product_event_type |
Il valore di EventType del log non elaborato viene assegnato al campo metadata.product_event_type . Viene utilizzato anche per determinare il metadata.event_type . |
FailReason |
security_result.summary |
Se presente, il valore di FailReason del log non elaborato viene assegnato al campo security_result.summary . |
FailUserName |
target.user.email_addresses |
Se presente, il valore di FailUserName del log non elaborato viene assegnato al campo target.user.email_addresses . |
FromIPAddress |
principal.ip |
Il valore di FromIPAddress del log non elaborato viene assegnato al campo principal.ip . |
ID |
security_result.detection_fields.value |
Il valore di ID del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :ID . |
InternalTrackingID |
metadata.product_log_id |
Il valore di InternalTrackingID del log non elaborato viene assegnato al campo metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
Il valore di JumpType del log non elaborato viene assegnato a un oggetto additional.fields con key :Jump Type . |
NormalizedUser |
target.user.email_addresses |
Il valore di NormalizedUser del log non elaborato viene assegnato al campo target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
Il valore di OperationMode del log non elaborato viene assegnato a un oggetto additional.fields con key :Operation Mode . |
ProxyId |
security_result.detection_fields.value |
Il valore di ProxyId del log non elaborato viene assegnato a un oggetto security_result.detection_fields con key :Proxy Id . |
RequestUserAgent |
network.http.user_agent |
Il valore di RequestUserAgent del log non elaborato viene assegnato al campo network.http.user_agent . |
SessionGuid |
network.session_id |
Il valore di SessionGuid del log non elaborato viene assegnato al campo network.session_id . |
Tenant |
additional.fields.value.string_value |
Il valore di Tenant del log non elaborato viene assegnato a un oggetto additional.fields con key :Tenant . |
ThreadType |
additional.fields.value.string_value |
Il valore di ThreadType del log non elaborato viene assegnato a un oggetto additional.fields con key :Thread Type . |
UserType |
principal.user.attribute.roles.name |
Il valore di UserType del log non elaborato viene assegnato al campo principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
Il valore di WhenOccurred del log non elaborato viene analizzato e assegnato al campo metadata.event_timestamp . Questo campo compila anche il campo timestamp di primo livello. Valore hardcoded "SSO". Determinato dal campo EventType . Il valore predefinito è STATUS_UPDATE se EventType non è presente o non corrisponde a criteri specifici. Può essere USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT o USER_CHANGE_PASSWORD . Valore hardcoded "CENTRIFY_SSO". Valore hardcoded "SSO". Valore hardcoded "Centrify". Se il campo message contiene un ID sessione, questo viene estratto e utilizzato. In caso contrario, il valore predefinito è "1". Estratto dal campo host , se disponibile, che proviene dall'intestazione syslog. Estratto dal campo pid , se disponibile, che proviene dall'intestazione syslog. Se è presente UserGuid , viene utilizzato il relativo valore. In caso contrario, se il campo message contiene un ID utente, questo viene estratto e utilizzato. Imposta "ALLOW" se Level è "Info" e "BLOCK" se è presente FailReason . Impostato su "AUTH_VIOLATION" se è presente FailReason . Determinato dal campo Level . Impostato su "INFORMATIONAL" se Level è "Info", "MEDIUM" se Level è "Warning" e "ERROR" se Level è "Error". |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.