Collecter les journaux SailPoint IAM
Ce document explique comment ingérer des journaux SailPoint IAM dans Google Security Operations à l'aide d'Amazon S3.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps
- Accès privilégié au locataire ou à l'API SailPoint Identity Security Cloud
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Collecter les prérequis SailPoint IAM (ID, clés API, ID d'organisation, jetons)
- Connectez-vous à la console d'administration SailPoint Identity Security Cloud en tant qu'administrateur.
- Accédez à Global > Paramètres de sécurité > Gestion des API.
- Cliquez sur Créer un client API.
- Choisissez Client Credentials (Identifiants client) comme type d'attribution.
- Fournissez les informations de configuration suivantes :
- Nom : saisissez un nom descriptif (par exemple,
Chronicle Export API). - Description : saisissez une description pour le client API.
- Scopes : sélectionnez
sp:scopes:all(ou les niveaux d'accès en lecture appropriés pour les événements d'audit).
- Nom : saisissez un nom descriptif (par exemple,
- Cliquez sur Créer et copiez les identifiants API générés de manière sécurisée.
- Notez l'URL de base de votre locataire SailPoint (par exemple,
https://tenant.api.identitynow.com). - Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- IDN_CLIENT_ID
- IDN_CLIENT_SECRET
- IDN_BASE
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
sailpoint-iam-logs). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > onglet JSON.
Copiez et collez le règlement suivant :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json" } ] }- Remplacez
sailpoint-iam-logssi vous avez saisi un autre nom de bucket :
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
SailPointIamToS3Role, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom sailpoint_iam_to_s3Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution SailPointIamToS3RoleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
sailpoint_iam_to_s3.py) :#!/usr/bin/env python3 # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSONL payloads to S3 # - Uses /v3/search API with pagination for audit events. # - Preserves vendor-native JSON format for identity events. # - Retries with exponential backoff; unique S3 keys to avoid overwrites. # - Outputs JSONL format (one event per line) for optimal Chronicle ingestion. import os, json, time, uuid, urllib.parse from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "sailpoint/iam/") STATE_KEY = os.environ.get("STATE_KEY", "sailpoint/iam/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) IDN_BASE = os.environ["IDN_BASE"] # e.g. https://tenant.api.identitynow.com CLIENT_ID = os.environ["IDN_CLIENT_ID"] CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"] SCOPE = os.environ.get("IDN_SCOPE", "sp:scopes:all") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "250")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_oauth_token() -> str: """Get OAuth2 access token using Client Credentials flow""" token_url = f"{IDN_BASE.rstrip('/')}/oauth/token" data = urllib.parse.urlencode({ 'grant_type': 'client_credentials', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'scope': SCOPE }).encode('utf-8') req = Request(token_url, data=data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", USER_AGENT) with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) return response["access_token"] def _search_events(access_token: str, created_from: str, search_after: list = None) -> list: """Search for audit events using SailPoint's /v3/search API IMPORTANT: SailPoint requires colons in ISO8601 timestamps to be escaped with backslashes. Example: 2024-01-15T10:30:00Z must be sent as 2024-01-15T10\:30\:00Z Reference: https://developer.sailpoint.com/discuss/t/datetime-searches/6609 """ search_url = f"{IDN_BASE.rstrip('/')}/v3/search" # Escape colons in timestamp for SailPoint search query # SailPoint requires: created:>=2024-01-15T10\:30\:00Z (colons must be escaped) escaped_timestamp = created_from.replace(":", "\\:") query_str = f'created:>={escaped_timestamp}' payload = { "indices": ["events"], "query": {"query": query_str}, "sort": ["created", "+id"], "limit": PAGE_SIZE } if search_after: payload["searchAfter"] = search_after attempt = 0 while True: req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") req.add_header("Authorization", f"Bearer {access_token}") req.add_header("User-Agent", USER_AGENT) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) # Handle different response formats if isinstance(response, list): return response return response.get("results", response.get("data", [])) except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str: """Write events to S3 in JSONL format (one JSON object per line) JSONL format is preferred for Chronicle ingestion as it allows: - Line-by-line processing - Better error recovery - Lower memory footprint """ # Create unique S3 key for events data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.jsonl" # Convert events list to JSONL format (one JSON object per line) jsonl_lines = [json.dumps(event, separators=(",", ":")) for event in events] jsonl_content = "\n".join(jsonl_lines) s3.put_object( Bucket=S3_BUCKET, Key=key, Body=jsonl_content.encode("utf-8"), ContentType="application/x-ndjson", # JSONL MIME type Metadata={ 'source': 'sailpoint-iam', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'page_number': str(page_num), 'events_count': str(len(events)), 'format': 'jsonl' } ) return key def _get_item_id(item: dict) -> str: """Extract ID from event item, trying multiple possible fields""" for field in ("id", "uuid", "eventId", "_id"): if field in item and item[field]: return str(item[field]) return "" def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Get OAuth token access_token = _get_oauth_token() created_from = _iso(from_ts) print(f"Fetching SailPoint IAM events from: {created_from}") # Handle pagination state last_created = st.get("last_created") last_id = st.get("last_id") search_after = [last_created, last_id] if (last_created and last_id) else None pages = 0 total_events = 0 written_keys = [] newest_created = last_created or created_from newest_id = last_id or "" while pages < MAX_PAGES: events = _search_events(access_token, created_from, search_after) if not events: break # Write page to S3 in JSONL format key = _put_events_data(events, from_ts, to_ts, pages + 1) written_keys.append(key) total_events += len(events) # Update pagination state from last item last_event = events[-1] last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created") last_event_id = _get_item_id(last_event) if last_event_created: newest_created = last_event_created if last_event_id: newest_id = last_event_id search_after = [newest_created, newest_id] pages += 1 # If we got less than page size, we're done if len(events) < PAGE_SIZE: break print(f"Successfully retrieved {total_events} events across {pages} pages") # Save state for next run st["last_to_ts"] = to_ts st["last_created"] = newest_created st["last_id"] = newest_id st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "pages": pages, "total_events": total_events, "s3_keys": written_keys, "from_timestamp": from_ts, "to_timestamp": to_ts, "last_created": newest_created, "last_id": newest_id, "format": "jsonl" } } if __name__ == "__main__": print(lambda_handler())Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant par vos valeurs.
Variables d'environnement
Clé Exemple de valeur S3_BUCKETsailpoint-iam-logsS3_PREFIXsailpoint/iam/STATE_KEYsailpoint/iam/state.jsonWINDOW_SECONDS3600HTTP_TIMEOUT60MAX_RETRIES3USER_AGENTsailpoint-iam-to-s3/1.0IDN_BASEhttps://tenant.api.identitynow.comIDN_CLIENT_IDyour-client-id(à partir de l'étape 2)IDN_CLIENT_SECRETyour-client-secret(à partir de l'étape 2)IDN_SCOPEsp:scopes:allPAGE_SIZE250MAX_PAGES20Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez Délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour). - Cible : votre fonction Lambda
sailpoint_iam_to_s3. - Nom :
sailpoint-iam-1h.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::sailpoint-iam-logs" } ] }Définissez le nom sur
secops-reader-policy.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux SailPoint IAM
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Sur la page suivante, cliquez sur Configurer un seul flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
SailPoint IAM logs). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez SailPoint IAM comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://sailpoint-iam-logs/sailpoint/iam/ - Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : 180 jours par défaut.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
action |
metadata.description |
Valeur du champ action du journal brut. |
actor.name |
principal.user.user_display_name |
Valeur du champ actor.name du journal brut. |
attributes.accountName |
principal.user.group_identifiers |
Valeur du champ attributes.accountName du journal brut. |
attributes.appId |
target.asset_id |
"ID de l'application : " concaténé avec la valeur du champ attributes.appId du journal brut. |
attributes.attributeName |
additional.fields[0].value.string_value |
Valeur du champ attributes.attributeName du journal brut, placée dans un objet additional.fields. La clé est définie sur "Nom de l'attribut". |
attributes.attributeValue |
additional.fields[1].value.string_value |
Valeur du champ attributes.attributeValue du journal brut, placée dans un objet additional.fields. La clé est définie sur "Attribute Value" (Valeur de l'attribut). |
attributes.cloudAppName |
target.application |
Valeur du champ attributes.cloudAppName du journal brut. |
attributes.hostName |
target.hostname, target.asset.hostname |
Valeur du champ attributes.hostName du journal brut. |
attributes.interface |
additional.fields[2].value.string_value |
Valeur du champ attributes.interface du journal brut, placée dans un objet additional.fields. La clé est définie sur "Interface". |
attributes.operation |
security_result.action_details |
Valeur du champ attributes.operation du journal brut. |
attributes.previousValue |
additional.fields[3].value.string_value |
Valeur du champ attributes.previousValue du journal brut, placée dans un objet additional.fields. La clé est définie sur "Previous Value" (Valeur précédente). |
attributes.provisioningResult |
security_result.detection_fields.value |
Valeur du champ attributes.provisioningResult du journal brut, placée dans un objet security_result.detection_fields. La clé est définie sur "Provisioning Result". |
attributes.sourceId |
principal.labels[0].value |
Valeur du champ attributes.sourceId du journal brut, placée dans un objet principal.labels. La clé est définie sur "ID de la source". |
attributes.sourceName |
principal.labels[1].value |
Valeur du champ attributes.sourceName du journal brut, placée dans un objet principal.labels. La clé est définie sur "Nom de la source". |
auditClassName |
metadata.product_event_type |
Valeur du champ auditClassName du journal brut. |
created |
metadata.event_timestamp.seconds, metadata.event_timestamp.nanos |
Valeur du champ created du journal brut, convertie en code temporel si instant.epochSecond n'est pas présent. |
id |
metadata.product_log_id |
Valeur du champ id du journal brut. |
instant.epochSecond |
metadata.event_timestamp.seconds |
Valeur du champ instant.epochSecond du journal brut, utilisée pour l'horodatage. |
ipAddress |
principal.asset.ip, principal.ip |
Valeur du champ ipAddress du journal brut. |
interface |
additional.fields[0].value.string_value |
Valeur du champ interface du journal brut, placée dans un objet additional.fields. La clé est définie sur "interface". |
loggerName |
intermediary.application |
Valeur du champ loggerName du journal brut. |
message |
metadata.description, security_result.description |
Utilisé à diverses fins, y compris pour définir la description dans les métadonnées et security_result, et pour extraire le contenu XML. |
name |
security_result.description |
Valeur du champ name du journal brut. |
operation |
target.resource.attribute.labels[0].value, metadata.product_event_type |
Valeur du champ operation du journal brut, placée dans un objet target.resource.attribute.labels. La clé est définie sur "operation". Également utilisé pour metadata.product_event_type. |
org |
principal.administrative_domain |
Valeur du champ org du journal brut. |
pod |
principal.location.name |
Valeur du champ pod du journal brut. |
referenceClass |
additional.fields[1].value.string_value |
Valeur du champ referenceClass du journal brut, placée dans un objet additional.fields. La clé est définie sur "referenceClass". |
referenceId |
additional.fields[2].value.string_value |
Valeur du champ referenceId du journal brut, placée dans un objet additional.fields. La clé est définie sur "referenceId". |
sailPointObjectName |
additional.fields[3].value.string_value |
Valeur du champ sailPointObjectName du journal brut, placée dans un objet additional.fields. La clé est définie sur "sailPointObjectName". |
serverHost |
principal.hostname, principal.asset.hostname |
Valeur du champ serverHost du journal brut. |
stack |
additional.fields[4].value.string_value |
Valeur du champ stack du journal brut, placée dans un objet additional.fields. La clé est définie sur "Stack". |
status |
security_result.severity_details |
Valeur du champ status du journal brut. |
target |
additional.fields[4].value.string_value |
Valeur du champ target du journal brut, placée dans un objet additional.fields. La clé est définie sur "target". |
target.name |
principal.user.userid |
Valeur du champ target.name du journal brut. |
technicalName |
security_result.summary |
Valeur du champ technicalName du journal brut. |
thrown.cause.message |
xml_body, detailed_message |
Valeur du champ thrown.cause.message du journal brut, utilisée pour extraire le contenu XML. |
thrown.message |
xml_body, detailed_message |
Valeur du champ thrown.message du journal brut, utilisée pour extraire le contenu XML. |
trackingNumber |
additional.fields[5].value.string_value |
Valeur du champ trackingNumber du journal brut, placée dans un objet additional.fields. La clé est définie sur "Tracking Number" (Numéro de suivi). |
type |
metadata.product_event_type |
Valeur du champ type du journal brut. |
_version |
metadata.product_version |
Valeur du champ _version du journal brut. |
| N/A | metadata.event_timestamp |
Dérivé des champs instant.epochSecond ou created. |
| N/A | metadata.event_type |
Déterminé par la logique de l'analyseur en fonction de différents champs, y compris has_principal_user, has_target_application, technicalName et action. La valeur par défaut est "GENERIC_EVENT". |
| N/A | metadata.log_type |
Défini sur "SAILPOINT_IAM". |
| N/A | metadata.product_name |
Défini sur "IAM". |
| N/A | metadata.vendor_name |
Défini sur "SAILPOINT". |
| N/A | extensions.auth.type |
Défini sur "AUTHTYPE_UNSPECIFIED" dans certaines conditions. |
| N/A | target.resource.attribute.labels[0].key |
Définissez-le sur "operation". |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.