Recopilar registros de SSO de Delinea

Disponible en:

En este documento se explica cómo ingerir registros de inicio de sesión único (SSO) de Delinea (antes Centrify) en Google Security Operations mediante Amazon S3. El analizador extrae los registros y gestiona los formatos JSON y syslog. Analiza pares clave-valor, marcas de tiempo y otros campos relevantes, y los asigna al modelo de datos unificado, con una lógica específica para gestionar los fallos de inicio de sesión, los agentes de usuario, los niveles de gravedad, los mecanismos de autenticación y varios tipos de eventos. Prioriza FailUserName sobre NormalizedUser para las direcciones de correo de destino en los eventos de error.

Antes de empezar

Asegúrate de que cumples los siguientes requisitos previos:

  • Una instancia de Google SecOps.
  • Acceso privilegiado al arrendatario de Delinea (Centrify) SSO.
  • Acceso privilegiado a AWS (S3, Gestión de Identidades y Accesos [IAM], Lambda y EventBridge).

Recopilar los requisitos previos de SSO de Delinea (Centrify) (IDs, claves de API, IDs de organización y tokens)

  1. Inicia sesión en el portal de administración de Delinea.
  2. Ve a Aplicaciones > Añadir aplicaciones.
  3. Busca Cliente de OAuth2 y haz clic en Añadir.
  4. Haz clic en en el cuadro de diálogo Añadir aplicación web.
  5. Haz clic en Cerrar en el cuadro de diálogo Añadir aplicaciones web.
  6. En la página Configuración de la aplicación, configure lo siguiente:
    • Pestaña General:
      • ID de aplicación: introduzca un identificador único (por ejemplo, secops-oauth-client).
      • Nombre de la aplicación: introduce un nombre descriptivo (por ejemplo, SecOps Data Export).
      • Descripción de la aplicación: introduce una descripción (por ejemplo, OAuth client for exporting audit events to SecOps).
    • Pestaña Confianza:
      • La solicitud es confidencial: marca esta opción.
      • Tipo de ID de cliente: selecciona Confidencial.
      • ID de cliente emitido: copia y guarda este valor.
      • Secreto de cliente emitido: copia y guarda este valor.
    • Pestaña Tokens:
      • Métodos de autenticación: selecciona Credenciales de cliente.
      • Tipo de token: selecciona Jwt RS256.
    • Pestaña Ámbito:
      • Añade el ámbito siem con la descripción SIEM Integration Access (Acceso a la integración de SIEM).
      • Añade el ámbito redrock/query con la descripción Acceso a la API Query.
  7. Haz clic en Guardar para crear el cliente de OAuth.
  8. Ve a Servicios principales > Usuarios > Añadir usuario.
  9. Configura el usuario de servicio:
    • Nombre de inicio de sesión: introduce el ID de cliente del paso 6.
    • Dirección de correo electrónico: introduce una dirección de correo válida (campo obligatorio).
    • Nombre visible: introduce un nombre descriptivo (por ejemplo, SecOps Service User).
    • Contraseña y Confirmar contraseña: introduce el valor de Client Secret del paso 6.
    • Estado: selecciona Es un cliente confidencial de OAuth.
  10. Haz clic en Create User (Crear usuario).
  11. Ve a Acceso > Roles y asigna al usuario de servicio un rol con los permisos adecuados para consultar eventos de auditoría.
  12. Copia y guarda en un lugar seguro los siguientes detalles:
    • URL de arrendatario: la URL de su arrendatario de Centrify (por ejemplo, https://yourtenant.my.centrify.com)
    • ID de cliente: del paso 6
    • Secreto de cliente: del paso 6
    • ID de aplicación OAuth: en la configuración de la aplicación

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, delinea-centrify-logs-bucket).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros en Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo .CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca la política AmazonS3FullAccess.
  18. Selecciona la política.
  19. Haz clic en Siguiente.
  20. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. En la consola de AWS, ve a IAM > Políticas.
  2. Haz clic en Crear política > pestaña JSON.
  3. Copia y pega la siguiente política.
  4. JSON de la política (sustituye delinea-centrify-logs-bucket si has introducido otro nombre de contenedor):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. Haz clic en Siguiente > Crear política.

  6. Ve a IAM > Roles.

  7. Haga clic en Crear rol > Servicio de AWS > Lambda.

  8. Adjunta la política que acabas de crear y la política gestionada AWSLambdaBasicExecutionRole (para el registro de CloudWatch).

  9. Dale el nombre CentrifySSOLogExportRole al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Ajuste Valor
    Nombre CentrifySSOLogExport
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución CentrifySSOLogExportRole
  4. Una vez creada la función, abra la pestaña Código, elimine el stub y pegue el siguiente código (CentrifySSOLogExport.py).

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. Vaya a Configuración > Variables de entorno.

  6. Haz clic en Editar > Añadir nueva variable de entorno.

  7. Introduce las variables de entorno que se indican en la siguiente tabla y sustituye los valores de ejemplo por los tuyos.

    Variables de entorno

    Clave Valor de ejemplo
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  9. Seleccione la pestaña Configuración.

  10. En el panel Configuración general, haz clic en Editar.

  11. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda CentrifySSOLogExport.
    • Nombre: CentrifySSOLogExport-1h.
  3. Haz clic en Crear programación.

(Opcional) Crear un usuario y claves de IAM de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios.
  2. Haz clic en Add users (Añadir usuarios).
  3. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce secops-reader.
    • Tipo de acceso: selecciona Clave de acceso – Acceso programático.
  4. Haz clic en Crear usuario.
  5. Asigna una política de lectura mínima (personalizada): Usuarios > lector-secops > Permisos.
  6. Haz clic en Añadir permisos > Adjuntar políticas directamente.
  7. Selecciona Crear política.
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. Nombre = secops-reader-policy.

  10. Haz clic en Crear política > busca o selecciona > Siguiente.

  11. Haz clic en Añadir permisos.

  12. Crea una clave de acceso para secops-reader: Credenciales de seguridad > Claves de acceso.

  13. Haz clic en Crear clave de acceso.

  14. Descarga la .CSV. (Estos valores se pegarán en el feed).

Configurar un feed en Google SecOps para ingerir registros de SSO de Delinea (Centrify)

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Añadir nuevo feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, Delinea Centrify SSO logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Seleccione Centrify como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
AccountID security_result.detection_fields.value El valor de AccountID del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Account ID.
ApplicationName target.application El valor de ApplicationName del registro sin procesar se asigna al campo target.application.
AuthorityFQDN target.asset.network_domain El valor de AuthorityFQDN del registro sin procesar se asigna al campo target.asset.network_domain.
AuthorityID target.asset.asset_id El valor de AuthorityID del registro sin procesar se asigna al campo target.asset.asset_id, con el prefijo "AuthorityID:".
AzDeploymentId security_result.detection_fields.value El valor de AzDeploymentId del registro sin procesar se asigna a un objeto security_result.detection_fields con key:AzDeploymentId.
AzRoleId additional.fields.value.string_value El valor de AzRoleId del registro sin procesar se asigna a un objeto additional.fields con key:AzRole Id.
AzRoleName target.user.attribute.roles.name El valor de AzRoleName del registro sin procesar se asigna al campo target.user.attribute.roles.name.
ComputerFQDN principal.asset.network_domain El valor de ComputerFQDN del registro sin procesar se asigna al campo principal.asset.network_domain.
ComputerID principal.asset.asset_id El valor de ComputerID del registro sin procesar se asigna al campo principal.asset.asset_id, con el prefijo "ComputerId:".
ComputerName about.hostname El valor de ComputerName del registro sin procesar se asigna al campo about.hostname.
CredentialId security_result.detection_fields.value El valor de CredentialId del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Credential Id.
DirectoryServiceName security_result.detection_fields.value El valor de DirectoryServiceName del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Directory Service Name.
DirectoryServiceNameLocalized security_result.detection_fields.value El valor de DirectoryServiceNameLocalized del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Directory Service Name Localized.
DirectoryServiceUuid security_result.detection_fields.value El valor de DirectoryServiceUuid del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Directory Service Uuid.
EventMessage security_result.summary El valor de EventMessage del registro sin procesar se asigna al campo security_result.summary.
EventType metadata.product_event_type El valor de EventType del registro sin procesar se asigna al campo metadata.product_event_type. También se usa para determinar el metadata.event_type.
FailReason security_result.summary El valor de FailReason del registro sin procesar se asigna al campo security_result.summary cuando está presente.
FailUserName target.user.email_addresses El valor de FailUserName del registro sin procesar se asigna al campo target.user.email_addresses cuando está presente.
FromIPAddress principal.ip El valor de FromIPAddress del registro sin procesar se asigna al campo principal.ip.
ID security_result.detection_fields.value El valor de ID del registro sin procesar se asigna a un objeto security_result.detection_fields con key:ID.
InternalTrackingID metadata.product_log_id El valor de InternalTrackingID del registro sin procesar se asigna al campo metadata.product_log_id.
JumpType additional.fields.value.string_value El valor de JumpType del registro sin procesar se asigna a un objeto additional.fields con key:Jump Type.
NormalizedUser target.user.email_addresses El valor de NormalizedUser del registro sin procesar se asigna al campo target.user.email_addresses.
OperationMode additional.fields.value.string_value El valor de OperationMode del registro sin procesar se asigna a un objeto additional.fields con key:Operation Mode.
ProxyId security_result.detection_fields.value El valor de ProxyId del registro sin procesar se asigna a un objeto security_result.detection_fields con key:Proxy Id.
RequestUserAgent network.http.user_agent El valor de RequestUserAgent del registro sin procesar se asigna al campo network.http.user_agent.
SessionGuid network.session_id El valor de SessionGuid del registro sin procesar se asigna al campo network.session_id.
Tenant additional.fields.value.string_value El valor de Tenant del registro sin procesar se asigna a un objeto additional.fields con key:Tenant.
ThreadType additional.fields.value.string_value El valor de ThreadType del registro sin procesar se asigna a un objeto additional.fields con key:Thread Type.
UserType principal.user.attribute.roles.name El valor de UserType del registro sin procesar se asigna al campo principal.user.attribute.roles.name.
WhenOccurred metadata.event_timestamp El valor de WhenOccurred del registro sin procesar se analiza y se asigna al campo metadata.event_timestamp. Este campo también rellena el campo timestamp de nivel superior. Valor codificado "SSO". Determinado por el campo EventType. El valor predeterminado es STATUS_UPDATE si EventType no está presente o no coincide con ningún criterio específico. Puede ser USER_LOGIN, USER_CREATION, USER_RESOURCE_ACCESS, USER_LOGOUT o USER_CHANGE_PASSWORD. Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Si el campo message contiene un ID de sesión, se extrae y se usa. De lo contrario, el valor predeterminado es "1". Se extrae del campo host, si está disponible, que procede del encabezado syslog. Se extrae del campo pid, si está disponible, que procede del encabezado syslog. Si UserGuid está presente, se usa su valor. De lo contrario, si el campo message contiene un ID de usuario, se extrae y se usa. Se debe asignar el valor "ALLOW" si Level es "Info" y "BLOCK" si FailReason está presente. Se asigna el valor "AUTH_VIOLATION" si FailReason está presente. Determinado por el campo Level. Se asigna el valor "INFORMATIONAL" si Level es "Info", "MEDIUM" si Level es "Warning" y "ERROR" si Level es "Error".

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.