Recopilar registros de SSO de Delinea
En este documento se explica cómo ingerir registros de inicio de sesión único (SSO) de Delinea (antes Centrify) en Google Security Operations mediante Amazon S3. El analizador extrae los registros y gestiona los formatos JSON y syslog. Analiza pares clave-valor, marcas de tiempo y otros campos relevantes, y los asigna al modelo de datos unificado, con una lógica específica para gestionar los fallos de inicio de sesión, los agentes de usuario, los niveles de gravedad, los mecanismos de autenticación y varios tipos de eventos. Prioriza FailUserName
sobre NormalizedUser
para las direcciones de correo de destino en los eventos de error.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Una instancia de Google SecOps.
- Acceso privilegiado al arrendatario de Delinea (Centrify) SSO.
- Acceso privilegiado a AWS (S3, Gestión de Identidades y Accesos [IAM], Lambda y EventBridge).
Recopilar los requisitos previos de SSO de Delinea (Centrify) (IDs, claves de API, IDs de organización y tokens)
- Inicia sesión en el portal de administración de Delinea.
- Ve a Aplicaciones > Añadir aplicaciones.
- Busca Cliente de OAuth2 y haz clic en Añadir.
- Haz clic en Sí en el cuadro de diálogo Añadir aplicación web.
- Haz clic en Cerrar en el cuadro de diálogo Añadir aplicaciones web.
- En la página Configuración de la aplicación, configure lo siguiente:
- Pestaña General:
- ID de aplicación: introduzca un identificador único (por ejemplo,
secops-oauth-client
). - Nombre de la aplicación: introduce un nombre descriptivo (por ejemplo,
SecOps Data Export
). - Descripción de la aplicación: introduce una descripción (por ejemplo,
OAuth client for exporting audit events to SecOps
).
- ID de aplicación: introduzca un identificador único (por ejemplo,
- Pestaña Confianza:
- La solicitud es confidencial: marca esta opción.
- Tipo de ID de cliente: selecciona Confidencial.
- ID de cliente emitido: copia y guarda este valor.
- Secreto de cliente emitido: copia y guarda este valor.
- Pestaña Tokens:
- Métodos de autenticación: selecciona Credenciales de cliente.
- Tipo de token: selecciona Jwt RS256.
- Pestaña Ámbito:
- Añade el ámbito siem con la descripción SIEM Integration Access (Acceso a la integración de SIEM).
- Añade el ámbito redrock/query con la descripción Acceso a la API Query.
- Pestaña General:
- Haz clic en Guardar para crear el cliente de OAuth.
- Ve a Servicios principales > Usuarios > Añadir usuario.
- Configura el usuario de servicio:
- Nombre de inicio de sesión: introduce el ID de cliente del paso 6.
- Dirección de correo electrónico: introduce una dirección de correo válida (campo obligatorio).
- Nombre visible: introduce un nombre descriptivo (por ejemplo,
SecOps Service User
). - Contraseña y Confirmar contraseña: introduce el valor de Client Secret del paso 6.
- Estado: selecciona Es un cliente confidencial de OAuth.
- Haz clic en Create User (Crear usuario).
- Ve a Acceso > Roles y asigna al usuario de servicio un rol con los permisos adecuados para consultar eventos de auditoría.
- Copia y guarda en un lugar seguro los siguientes detalles:
- URL de arrendatario: la URL de su arrendatario de Centrify (por ejemplo,
https://yourtenant.my.centrify.com
) - ID de cliente: del paso 6
- Secreto de cliente: del paso 6
- ID de aplicación OAuth: en la configuración de la aplicación
- URL de arrendatario: la URL de su arrendatario de Centrify (por ejemplo,
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
delinea-centrify-logs-bucket
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros en Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo .CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca la política AmazonS3FullAccess.
- Selecciona la política.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3
- En la consola de AWS, ve a IAM > Políticas.
- Haz clic en Crear política > pestaña JSON.
- Copia y pega la siguiente política.
JSON de la política (sustituye
delinea-centrify-logs-bucket
si has introducido otro nombre de contenedor):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles.
Haga clic en Crear rol > Servicio de AWS > Lambda.
Adjunta la política que acabas de crear y la política gestionada AWSLambdaBasicExecutionRole (para el registro de CloudWatch).
Dale el nombre
CentrifySSOLogExportRole
al rol y haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
Ajuste Valor Nombre CentrifySSOLogExport
Tiempo de ejecución Python 3.13 Arquitectura x86_64 Rol de ejecución CentrifySSOLogExportRole
Una vez creada la función, abra la pestaña Código, elimine el stub y pegue el siguiente código (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Vaya a Configuración > Variables de entorno.
Haz clic en Editar > Añadir nueva variable de entorno.
Introduce las variables de entorno que se indican en la siguiente tabla y sustituye los valores de ejemplo por los tuyos.
Variables de entorno
Clave Valor de ejemplo S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Seleccione la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
- Proporcione los siguientes detalles de configuración:
- Programación periódica: Precio (
1 hour
). - Destino: tu función Lambda
CentrifySSOLogExport
. - Nombre:
CentrifySSOLogExport-1h
.
- Programación periódica: Precio (
- Haz clic en Crear programación.
(Opcional) Crear un usuario y claves de IAM de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios.
- Haz clic en Add users (Añadir usuarios).
- Proporcione los siguientes detalles de configuración:
- Usuario: introduce
secops-reader
. - Tipo de acceso: selecciona Clave de acceso – Acceso programático.
- Usuario: introduce
- Haz clic en Crear usuario.
- Asigna una política de lectura mínima (personalizada): Usuarios > lector-secops > Permisos.
- Haz clic en Añadir permisos > Adjuntar políticas directamente.
- Selecciona Crear política.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Nombre =
secops-reader-policy
.Haz clic en Crear política > busca o selecciona > Siguiente.
Haz clic en Añadir permisos.
Crea una clave de acceso para
secops-reader
: Credenciales de seguridad > Claves de acceso.Haz clic en Crear clave de acceso.
Descarga la
.CSV
. (Estos valores se pegarán en el feed).
Configurar un feed en Google SecOps para ingerir registros de SSO de Delinea (Centrify)
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Añadir nuevo feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
Delinea Centrify SSO logs
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Seleccione Centrify como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
AccountID |
security_result.detection_fields.value |
El valor de AccountID del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Account ID . |
ApplicationName |
target.application |
El valor de ApplicationName del registro sin procesar se asigna al campo target.application . |
AuthorityFQDN |
target.asset.network_domain |
El valor de AuthorityFQDN del registro sin procesar se asigna al campo target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
El valor de AuthorityID del registro sin procesar se asigna al campo target.asset.asset_id , con el prefijo "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
El valor de AzDeploymentId del registro sin procesar se asigna a un objeto security_result.detection_fields con key :AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
El valor de AzRoleId del registro sin procesar se asigna a un objeto additional.fields con key :AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
El valor de AzRoleName del registro sin procesar se asigna al campo target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
El valor de ComputerFQDN del registro sin procesar se asigna al campo principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
El valor de ComputerID del registro sin procesar se asigna al campo principal.asset.asset_id , con el prefijo "ComputerId:". |
ComputerName |
about.hostname |
El valor de ComputerName del registro sin procesar se asigna al campo about.hostname . |
CredentialId |
security_result.detection_fields.value |
El valor de CredentialId del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
El valor de DirectoryServiceName del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
El valor de DirectoryServiceNameLocalized del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
El valor de DirectoryServiceUuid del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Directory Service Uuid . |
EventMessage |
security_result.summary |
El valor de EventMessage del registro sin procesar se asigna al campo security_result.summary . |
EventType |
metadata.product_event_type |
El valor de EventType del registro sin procesar se asigna al campo metadata.product_event_type . También se usa para determinar el metadata.event_type . |
FailReason |
security_result.summary |
El valor de FailReason del registro sin procesar se asigna al campo security_result.summary cuando está presente. |
FailUserName |
target.user.email_addresses |
El valor de FailUserName del registro sin procesar se asigna al campo target.user.email_addresses cuando está presente. |
FromIPAddress |
principal.ip |
El valor de FromIPAddress del registro sin procesar se asigna al campo principal.ip . |
ID |
security_result.detection_fields.value |
El valor de ID del registro sin procesar se asigna a un objeto security_result.detection_fields con key :ID . |
InternalTrackingID |
metadata.product_log_id |
El valor de InternalTrackingID del registro sin procesar se asigna al campo metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
El valor de JumpType del registro sin procesar se asigna a un objeto additional.fields con key :Jump Type . |
NormalizedUser |
target.user.email_addresses |
El valor de NormalizedUser del registro sin procesar se asigna al campo target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
El valor de OperationMode del registro sin procesar se asigna a un objeto additional.fields con key :Operation Mode . |
ProxyId |
security_result.detection_fields.value |
El valor de ProxyId del registro sin procesar se asigna a un objeto security_result.detection_fields con key :Proxy Id . |
RequestUserAgent |
network.http.user_agent |
El valor de RequestUserAgent del registro sin procesar se asigna al campo network.http.user_agent . |
SessionGuid |
network.session_id |
El valor de SessionGuid del registro sin procesar se asigna al campo network.session_id . |
Tenant |
additional.fields.value.string_value |
El valor de Tenant del registro sin procesar se asigna a un objeto additional.fields con key :Tenant . |
ThreadType |
additional.fields.value.string_value |
El valor de ThreadType del registro sin procesar se asigna a un objeto additional.fields con key :Thread Type . |
UserType |
principal.user.attribute.roles.name |
El valor de UserType del registro sin procesar se asigna al campo principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
El valor de WhenOccurred del registro sin procesar se analiza y se asigna al campo metadata.event_timestamp . Este campo también rellena el campo timestamp de nivel superior. Valor codificado "SSO". Determinado por el campo EventType . El valor predeterminado es STATUS_UPDATE si EventType no está presente o no coincide con ningún criterio específico. Puede ser USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT o USER_CHANGE_PASSWORD . Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Si el campo message contiene un ID de sesión, se extrae y se usa. De lo contrario, el valor predeterminado es "1". Se extrae del campo host , si está disponible, que procede del encabezado syslog. Se extrae del campo pid , si está disponible, que procede del encabezado syslog. Si UserGuid está presente, se usa su valor. De lo contrario, si el campo message contiene un ID de usuario, se extrae y se usa. Se debe asignar el valor "ALLOW" si Level es "Info" y "BLOCK" si FailReason está presente. Se asigna el valor "AUTH_VIOLATION" si FailReason está presente. Determinado por el campo Level . Se asigna el valor "INFORMATIONAL" si Level es "Info", "MEDIUM" si Level es "Warning" y "ERROR" si Level es "Error". |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.