Collecter les journaux Delinea SSO

Compatible avec :

Ce document explique comment ingérer les journaux d'authentification unique (SSO) Delinea (anciennement Centrify) dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les journaux, en gérant les formats JSON et syslog. Il analyse les paires clé/valeur, les codes temporels et d'autres champs pertinents, en les mappant au modèle UDM, avec une logique spécifique pour gérer les échecs de connexion, les user-agents, les niveaux de gravité, les mécanismes d'authentification et divers types d'événements. Il donne la priorité à FailUserName sur NormalizedUser pour les adresses e-mail cibles dans les événements d'échec.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Une instance Google SecOps.
  • Accès privilégié au locataire Delinea (Centrify) SSO.
  • Accès privilégié à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Collecter les conditions préalables du SSO Delinea (Centrify) (ID, clés API, ID d'organisation, jetons)

  1. Connectez-vous au portail d'administration Delinea.
  2. Accédez à Applications > Ajouter des applications.
  3. Recherchez OAuth2 Client, puis cliquez sur Add (Ajouter).
  4. Cliquez sur Oui dans la boîte de dialogue Ajouter une application Web.
  5. Cliquez sur Fermer dans la boîte de dialogue Ajouter des applications Web.
  6. Sur la page Application Configuration, configurez les éléments suivants :
    • Onglet Général :
      • ID de l'application : saisissez un identifiant unique (par exemple, secops-oauth-client).
      • Nom de l'application : saisissez un nom descriptif (par exemple, SecOps Data Export).
      • Description de l'application : saisissez une description (par exemple, OAuth client for exporting audit events to SecOps).
    • Onglet Confiance :
      • L'application est confidentielle : cochez cette option.
      • Type d'ID client : sélectionnez Confidentiel.
      • ID client émis : copiez et enregistrez cette valeur.
      • Code secret du client émis : copiez et enregistrez cette valeur.
    • Onglet Jetons :
      • Méthodes d'authentification : sélectionnez Identifiants client.
      • Type de jeton : sélectionnez Jwt RS256.
    • Onglet Champ d'application :
      • Ajoutez le champ d'application siem avec la description SIEM Integration Access (Accès à l'intégration SIEM).
      • Ajoutez le champ d'application redrock/query avec la description Accès à l'API Query.
  7. Cliquez sur Enregistrer pour créer le client OAuth.
  8. Accédez à Services principaux > Utilisateurs > Ajouter un utilisateur.
  9. Configurez l'utilisateur du service :
    • Nom de connexion : saisissez l'ID client de l'étape 6.
    • Adresse e-mail : saisissez une adresse e-mail valide (champ obligatoire).
    • Nom à afficher : saisissez un nom descriptif (par exemple, SecOps Service User).
    • Mot de passe et Confirmer le mot de passe : saisissez le code secret client de l'étape 6.
    • État : sélectionnez Est un client OAuth confidentiel.
  10. Cliquez sur Créer un utilisateur.
  11. Accédez à Accès > Rôles et attribuez au compte de service un rôle disposant des autorisations appropriées pour interroger les événements d'audit.
  12. Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
    • URL du locataire : URL de votre locataire Centrify (par exemple, https://yourtenant.my.centrify.com)
    • ID client : à l'étape 6
    • Code secret du client : à partir de l'étape 6
    • ID d'application OAuth : à partir de la configuration de l'application

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple, delinea-centrify-logs-bucket).
  3. Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : Ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez la règle AmazonS3FullAccess.
  18. Sélectionnez la règle.
  19. Cliquez sur Suivant.
  20. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM > Stratégies.
  2. Cliquez sur Créer une règle > onglet JSON.
  3. Copiez et collez le règlement suivant.
  4. JSON de la règle (remplacez delinea-centrify-logs-bucket si vous avez saisi un autre nom de bucket) :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. Cliquez sur Suivant > Créer une règle.

  6. Accédez à IAM > Rôles.

  7. Cliquez sur Créer un rôle > Service AWS > Lambda.

  8. Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole (pour la journalisation CloudWatch).

  9. Nommez le rôle CentrifySSOLogExportRole, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom CentrifySSOLogExport
    Durée d'exécution Python 3.13
    Architecture x86_64
    Rôle d'exécution CentrifySSOLogExportRole
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code suivant (CentrifySSOLogExport.py).

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. Accédez à Configuration > Variables d'environnement.

  6. Cliquez sur Modifier > Ajouter une variable d'environnement.

  7. Saisissez les variables d'environnement fournies dans le tableau suivant, en remplaçant les exemples de valeurs par les vôtres.

    Variables d'environnement

    Clé Exemple de valeur
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).

  9. Accédez à l'onglet Configuration.

  10. Dans le panneau Configuration générale, cliquez sur Modifier.

  11. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule.
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda CentrifySSOLogExport.
    • Nom : CentrifySSOLogExport-1h.
  3. Cliquez sur Créer la programmation.

(Facultatif) Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM > Utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : saisissez secops-reader.
    • Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations.
  6. Cliquez sur Ajouter des autorisations> Joindre directement des règles.
  7. Sélectionnez Créer une règle.
  8. JSON :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. Nom = secops-reader-policy.

  10. Cliquez sur Créer une règle > recherchez/sélectionnez > Suivant.

  11. Cliquez sur Ajouter des autorisations.

  12. Créez une clé d'accès pour secops-reader : Identifiants de sécurité > Clés d'accès.

  13. Cliquez sur Créer une clé d'accès.

  14. Téléchargez le fichier .CSV. (Vous collerez ces valeurs dans le flux.)

Configurer un flux dans Google SecOps pour ingérer les journaux SSO Delinea (Centrify)

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Delinea Centrify SSO logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Centrify comme type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ du journal Mappage UDM Logique
AccountID security_result.detection_fields.value La valeur de AccountID du journal brut est attribuée à un objet security_result.detection_fields avec key : Account ID.
ApplicationName target.application La valeur de ApplicationName du journal brut est attribuée au champ target.application.
AuthorityFQDN target.asset.network_domain La valeur de AuthorityFQDN du journal brut est attribuée au champ target.asset.network_domain.
AuthorityID target.asset.asset_id La valeur de AuthorityID du journal brut est attribuée au champ target.asset.asset_id, précédée de "AuthorityID:".
AzDeploymentId security_result.detection_fields.value La valeur de AzDeploymentId du journal brut est attribuée à un objet security_result.detection_fields avec key : AzDeploymentId.
AzRoleId additional.fields.value.string_value La valeur de AzRoleId du journal brut est attribuée à un objet additional.fields avec key : AzRole Id.
AzRoleName target.user.attribute.roles.name La valeur de AzRoleName du journal brut est attribuée au champ target.user.attribute.roles.name.
ComputerFQDN principal.asset.network_domain La valeur de ComputerFQDN du journal brut est attribuée au champ principal.asset.network_domain.
ComputerID principal.asset.asset_id La valeur de ComputerID du journal brut est attribuée au champ principal.asset.asset_id, précédée de "ComputerId:".
ComputerName about.hostname La valeur de ComputerName du journal brut est attribuée au champ about.hostname.
CredentialId security_result.detection_fields.value La valeur de CredentialId du journal brut est attribuée à un objet security_result.detection_fields avec key : Credential Id.
DirectoryServiceName security_result.detection_fields.value La valeur de DirectoryServiceName du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Name.
DirectoryServiceNameLocalized security_result.detection_fields.value La valeur de DirectoryServiceNameLocalized du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Name Localized.
DirectoryServiceUuid security_result.detection_fields.value La valeur de DirectoryServiceUuid du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Uuid.
EventMessage security_result.summary La valeur de EventMessage du journal brut est attribuée au champ security_result.summary.
EventType metadata.product_event_type La valeur de EventType du journal brut est attribuée au champ metadata.product_event_type. Il est également utilisé pour déterminer le metadata.event_type.
FailReason security_result.summary La valeur de FailReason du journal brut est attribuée au champ security_result.summary, le cas échéant.
FailUserName target.user.email_addresses La valeur de FailUserName du journal brut est attribuée au champ target.user.email_addresses, le cas échéant.
FromIPAddress principal.ip La valeur de FromIPAddress du journal brut est attribuée au champ principal.ip.
ID security_result.detection_fields.value La valeur de ID du journal brut est attribuée à un objet security_result.detection_fields avec key : ID.
InternalTrackingID metadata.product_log_id La valeur de InternalTrackingID du journal brut est attribuée au champ metadata.product_log_id.
JumpType additional.fields.value.string_value La valeur de JumpType du journal brut est attribuée à un objet additional.fields avec key : Jump Type.
NormalizedUser target.user.email_addresses La valeur de NormalizedUser du journal brut est attribuée au champ target.user.email_addresses.
OperationMode additional.fields.value.string_value La valeur de OperationMode du journal brut est attribuée à un objet additional.fields avec key : Operation Mode.
ProxyId security_result.detection_fields.value La valeur de ProxyId du journal brut est attribuée à un objet security_result.detection_fields avec key : Proxy Id.
RequestUserAgent network.http.user_agent La valeur de RequestUserAgent du journal brut est attribuée au champ network.http.user_agent.
SessionGuid network.session_id La valeur de SessionGuid du journal brut est attribuée au champ network.session_id.
Tenant additional.fields.value.string_value La valeur de Tenant du journal brut est attribuée à un objet additional.fields avec key : Tenant.
ThreadType additional.fields.value.string_value La valeur de ThreadType du journal brut est attribuée à un objet additional.fields avec key : Thread Type.
UserType principal.user.attribute.roles.name La valeur de UserType du journal brut est attribuée au champ principal.user.attribute.roles.name.
WhenOccurred metadata.event_timestamp La valeur de WhenOccurred du journal brut est analysée et attribuée au champ metadata.event_timestamp. Ce champ renseigne également le champ de niveau supérieur timestamp. Valeur codée en dur "SSO". Déterminé par le champ EventType. La valeur par défaut est STATUS_UPDATE si EventType est absent ou ne correspond à aucun critère spécifique. Il peut s'agir de USER_LOGIN, USER_CREATION, USER_RESOURCE_ACCESS, USER_LOGOUT ou USER_CHANGE_PASSWORD. Valeur codée en dur "CENTRIFY_SSO". Valeur codée en dur "SSO". Valeur codée en dur "Centrify". Si le champ message contient un ID de session, il est extrait et utilisé. Sinon, la valeur par défaut est "1". Extrait du champ host s'il est disponible, qui provient de l'en-tête syslog. Extrait du champ pid s'il est disponible, qui provient de l'en-tête syslog. Si UserGuid est présent, sa valeur est utilisée. Sinon, si le champ message contient un ID utilisateur, il est extrait et utilisé. Définissez la valeur sur "ALLOW" (AUTORISER) si Level est "Info", et sur "BLOCK" (BLOQUER) si FailReason est présent. Définissez sur "AUTH_VIOLATION" si FailReason est présent. Déterminé par le champ Level. Définissez la valeur sur "INFORMATIONAL" si Level est "Info", sur "MEDIUM" si Level est "Warning" et sur "ERROR" si Level est "Error".

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.