Collecter les journaux Delinea SSO
Ce document explique comment ingérer les journaux d'authentification unique (SSO) Delinea (anciennement Centrify) dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les journaux, en gérant les formats JSON et syslog. Il analyse les paires clé/valeur, les codes temporels et d'autres champs pertinents, en les mappant au modèle UDM, avec une logique spécifique pour gérer les échecs de connexion, les user-agents, les niveaux de gravité, les mécanismes d'authentification et divers types d'événements. Il donne la priorité à FailUserName
sur NormalizedUser
pour les adresses e-mail cibles dans les événements d'échec.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps.
- Accès privilégié au locataire Delinea (Centrify) SSO.
- Accès privilégié à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Collecter les conditions préalables du SSO Delinea (Centrify) (ID, clés API, ID d'organisation, jetons)
- Connectez-vous au portail d'administration Delinea.
- Accédez à Applications > Ajouter des applications.
- Recherchez OAuth2 Client, puis cliquez sur Add (Ajouter).
- Cliquez sur Oui dans la boîte de dialogue Ajouter une application Web.
- Cliquez sur Fermer dans la boîte de dialogue Ajouter des applications Web.
- Sur la page Application Configuration, configurez les éléments suivants :
- Onglet Général :
- ID de l'application : saisissez un identifiant unique (par exemple,
secops-oauth-client
). - Nom de l'application : saisissez un nom descriptif (par exemple,
SecOps Data Export
). - Description de l'application : saisissez une description (par exemple,
OAuth client for exporting audit events to SecOps
).
- ID de l'application : saisissez un identifiant unique (par exemple,
- Onglet Confiance :
- L'application est confidentielle : cochez cette option.
- Type d'ID client : sélectionnez Confidentiel.
- ID client émis : copiez et enregistrez cette valeur.
- Code secret du client émis : copiez et enregistrez cette valeur.
- Onglet Jetons :
- Méthodes d'authentification : sélectionnez Identifiants client.
- Type de jeton : sélectionnez Jwt RS256.
- Onglet Champ d'application :
- Ajoutez le champ d'application siem avec la description SIEM Integration Access (Accès à l'intégration SIEM).
- Ajoutez le champ d'application redrock/query avec la description Accès à l'API Query.
- Onglet Général :
- Cliquez sur Enregistrer pour créer le client OAuth.
- Accédez à Services principaux > Utilisateurs > Ajouter un utilisateur.
- Configurez l'utilisateur du service :
- Nom de connexion : saisissez l'ID client de l'étape 6.
- Adresse e-mail : saisissez une adresse e-mail valide (champ obligatoire).
- Nom à afficher : saisissez un nom descriptif (par exemple,
SecOps Service User
). - Mot de passe et Confirmer le mot de passe : saisissez le code secret client de l'étape 6.
- État : sélectionnez Est un client OAuth confidentiel.
- Cliquez sur Créer un utilisateur.
- Accédez à Accès > Rôles et attribuez au compte de service un rôle disposant des autorisations appropriées pour interroger les événements d'audit.
- Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- URL du locataire : URL de votre locataire Centrify (par exemple,
https://yourtenant.my.centrify.com
) - ID client : à l'étape 6
- Code secret du client : à partir de l'étape 6
- ID d'application OAuth : à partir de la configuration de l'application
- URL du locataire : URL de votre locataire Centrify (par exemple,
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
delinea-centrify-logs-bucket
). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies.
- Cliquez sur Créer une règle > onglet JSON.
- Copiez et collez le règlement suivant.
JSON de la règle (remplacez
delinea-centrify-logs-bucket
si vous avez saisi un autre nom de bucket) :{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles.
Cliquez sur Créer un rôle > Service AWS > Lambda.
Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole (pour la journalisation CloudWatch).
Nommez le rôle
CentrifySSOLogExportRole
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom CentrifySSOLogExport
Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution CentrifySSOLogExportRole
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code suivant (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Accédez à Configuration > Variables d'environnement.
Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement fournies dans le tableau suivant, en remplaçant les exemples de valeurs par les vôtres.
Variables d'environnement
Clé Exemple de valeur S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda
CentrifySSOLogExport
. - Nom :
CentrifySSOLogExport-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
(Facultatif) Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader
. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations.
- Cliquez sur Ajouter des autorisations> Joindre directement des règles.
- Sélectionnez Créer une règle.
JSON :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Nom =
secops-reader-policy
.Cliquez sur Créer une règle > recherchez/sélectionnez > Suivant.
Cliquez sur Ajouter des autorisations.
Créez une clé d'accès pour
secops-reader
: Identifiants de sécurité > Clés d'accès.Cliquez sur Créer une clé d'accès.
Téléchargez le fichier
.CSV
. (Vous collerez ces valeurs dans le flux.)
Configurer un flux dans Google SecOps pour ingérer les journaux SSO Delinea (Centrify)
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Delinea Centrify SSO logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Centrify comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
Champ du journal | Mappage UDM | Logique |
---|---|---|
AccountID |
security_result.detection_fields.value |
La valeur de AccountID du journal brut est attribuée à un objet security_result.detection_fields avec key : Account ID . |
ApplicationName |
target.application |
La valeur de ApplicationName du journal brut est attribuée au champ target.application . |
AuthorityFQDN |
target.asset.network_domain |
La valeur de AuthorityFQDN du journal brut est attribuée au champ target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
La valeur de AuthorityID du journal brut est attribuée au champ target.asset.asset_id , précédée de "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
La valeur de AzDeploymentId du journal brut est attribuée à un objet security_result.detection_fields avec key : AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
La valeur de AzRoleId du journal brut est attribuée à un objet additional.fields avec key : AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
La valeur de AzRoleName du journal brut est attribuée au champ target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
La valeur de ComputerFQDN du journal brut est attribuée au champ principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
La valeur de ComputerID du journal brut est attribuée au champ principal.asset.asset_id , précédée de "ComputerId:". |
ComputerName |
about.hostname |
La valeur de ComputerName du journal brut est attribuée au champ about.hostname . |
CredentialId |
security_result.detection_fields.value |
La valeur de CredentialId du journal brut est attribuée à un objet security_result.detection_fields avec key : Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
La valeur de DirectoryServiceName du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
La valeur de DirectoryServiceNameLocalized du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
La valeur de DirectoryServiceUuid du journal brut est attribuée à un objet security_result.detection_fields avec key : Directory Service Uuid . |
EventMessage |
security_result.summary |
La valeur de EventMessage du journal brut est attribuée au champ security_result.summary . |
EventType |
metadata.product_event_type |
La valeur de EventType du journal brut est attribuée au champ metadata.product_event_type . Il est également utilisé pour déterminer le metadata.event_type . |
FailReason |
security_result.summary |
La valeur de FailReason du journal brut est attribuée au champ security_result.summary , le cas échéant. |
FailUserName |
target.user.email_addresses |
La valeur de FailUserName du journal brut est attribuée au champ target.user.email_addresses , le cas échéant. |
FromIPAddress |
principal.ip |
La valeur de FromIPAddress du journal brut est attribuée au champ principal.ip . |
ID |
security_result.detection_fields.value |
La valeur de ID du journal brut est attribuée à un objet security_result.detection_fields avec key : ID . |
InternalTrackingID |
metadata.product_log_id |
La valeur de InternalTrackingID du journal brut est attribuée au champ metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
La valeur de JumpType du journal brut est attribuée à un objet additional.fields avec key : Jump Type . |
NormalizedUser |
target.user.email_addresses |
La valeur de NormalizedUser du journal brut est attribuée au champ target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
La valeur de OperationMode du journal brut est attribuée à un objet additional.fields avec key : Operation Mode . |
ProxyId |
security_result.detection_fields.value |
La valeur de ProxyId du journal brut est attribuée à un objet security_result.detection_fields avec key : Proxy Id . |
RequestUserAgent |
network.http.user_agent |
La valeur de RequestUserAgent du journal brut est attribuée au champ network.http.user_agent . |
SessionGuid |
network.session_id |
La valeur de SessionGuid du journal brut est attribuée au champ network.session_id . |
Tenant |
additional.fields.value.string_value |
La valeur de Tenant du journal brut est attribuée à un objet additional.fields avec key : Tenant . |
ThreadType |
additional.fields.value.string_value |
La valeur de ThreadType du journal brut est attribuée à un objet additional.fields avec key : Thread Type . |
UserType |
principal.user.attribute.roles.name |
La valeur de UserType du journal brut est attribuée au champ principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
La valeur de WhenOccurred du journal brut est analysée et attribuée au champ metadata.event_timestamp . Ce champ renseigne également le champ de niveau supérieur timestamp . Valeur codée en dur "SSO". Déterminé par le champ EventType . La valeur par défaut est STATUS_UPDATE si EventType est absent ou ne correspond à aucun critère spécifique. Il peut s'agir de USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT ou USER_CHANGE_PASSWORD . Valeur codée en dur "CENTRIFY_SSO". Valeur codée en dur "SSO". Valeur codée en dur "Centrify". Si le champ message contient un ID de session, il est extrait et utilisé. Sinon, la valeur par défaut est "1". Extrait du champ host s'il est disponible, qui provient de l'en-tête syslog. Extrait du champ pid s'il est disponible, qui provient de l'en-tête syslog. Si UserGuid est présent, sa valeur est utilisée. Sinon, si le champ message contient un ID utilisateur, il est extrait et utilisé. Définissez la valeur sur "ALLOW" (AUTORISER) si Level est "Info", et sur "BLOCK" (BLOQUER) si FailReason est présent. Définissez sur "AUTH_VIOLATION" si FailReason est présent. Déterminé par le champ Level . Définissez la valeur sur "INFORMATIONAL" si Level est "Info", sur "MEDIUM" si Level est "Warning" et sur "ERROR" si Level est "Error". |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.