收集 SailPoint IAM 日志
本文档介绍了如何使用 Amazon S3 将 SailPoint Identity and Access Management (IAM) 日志注入到 Google Security Operations。解析器会处理 JSON 和 XML 格式的日志,并将其转换为统一数据模型 (UDM)。它可区分单个 UDM 事件(ProvisioningPlan、AccountRequest、SOAP-ENV)、多个 UDM 事件 (ProvisioningProject) 和 UDM 实体 (Identity),并为每个事件应用特定的解析逻辑和字段映射,包括针对非 XML 数据的通用事件处理。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 对 SailPoint Identity Security Cloud 的特权访问权限。
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。
收集 SailPoint IAM 前提条件(ID、API 密钥、组织 ID、令牌)
- 以管理员身份登录 SailPoint Identity Security Cloud 管理控制台。
- 依次前往全局 > 安全设置 > API 管理。
- 点击创建 API 客户端。
- 选择客户端凭据作为授权类型。
- 提供以下配置详细信息:
- 名称:输入一个描述性名称(例如
Google SecOps Export API
)。 - 说明:输入 API 客户端的说明。
- 范围:选择
sp:scopes:all
。
- 名称:输入一个描述性名称(例如
- 点击创建,并将生成的 API 凭据保存到安全的位置。
- 记录您的 SailPoint 租户基础网址(例如
https://tenant.api.identitynow.com
)。 - 复制以下详细信息并将其保存在安全的位置:
- IDN_CLIENT_ID。
- IDN_CLIENT_SECRET。
- IDN_BASE。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
sailpoint-iam-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
- 复制并粘贴以下政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
sailpoint-iam-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
SailPointIamToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 sailpoint_iam_to_s3
运行时 Python 3.13 架构 x86_64 执行角色 SailPointIamToS3Role
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
sailpoint_iam_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3 # - Uses /v3/search API with pagination for audit events. # - Preserves vendor-native JSON format for identity events. # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid, urllib.parse from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "sailpoint/iam/") STATE_KEY = os.environ.get("STATE_KEY", "sailpoint/iam/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) IDN_BASE = os.environ["IDN_BASE"] # e.g. https://tenant.api.identitynow.com CLIENT_ID = os.environ["IDN_CLIENT_ID"] CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"] SCOPE = os.environ.get("IDN_SCOPE", "sp:scopes:all") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "250")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_oauth_token() -> str: """Get OAuth2 access token using Client Credentials flow""" token_url = f"{IDN_BASE.rstrip('/')}/oauth/token" data = urllib.parse.urlencode({ 'grant_type': 'client_credentials', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'scope': SCOPE }).encode('utf-8') req = Request(token_url, data=data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", USER_AGENT) with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) return response["access_token"] def _search_events(access_token: str, created_from: str, search_after: list = None) -> list: """Search for audit events using SailPoint's /v3/search API""" search_url = f"{IDN_BASE.rstrip('/')}/v3/search" # Build search query for events created after specified time query_str = f'created:">={created_from}"' payload = { "indices": ["events"], "query": {"query": query_str}, "sort": ["created", "+id"], "limit": PAGE_SIZE } if search_after: payload["searchAfter"] = search_after attempt = 0 while True: req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") req.add_header("Authorization", f"Bearer {access_token}") req.add_header("User-Agent", USER_AGENT) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) # Handle different response formats if isinstance(response, list): return response return response.get("results", response.get("data", [])) except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str: # Create unique S3 key for events data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), ContentType="application/json", Metadata={ 'source': 'sailpoint-iam', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'page_number': str(page_num), 'events_count': str(len(events)) } ) return key def _get_item_id(item: dict) -> str: """Extract ID from event item, trying multiple possible fields""" for field in ("id", "uuid", "eventId", "_id"): if field in item and item[field]: return str(item[field]) return "" def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Get OAuth token access_token = _get_oauth_token() created_from = _iso(from_ts) print(f"Fetching SailPoint IAM events from: {created_from}") # Handle pagination state last_created = st.get("last_created") last_id = st.get("last_id") search_after = [last_created, last_id] if (last_created and last_id) else None pages = 0 total_events = 0 written_keys = [] newest_created = last_created or created_from newest_id = last_id or "" while pages < MAX_PAGES: events = _search_events(access_token, created_from, search_after) if not events: break # Write page to S3 key = _put_events_data(events, from_ts, to_ts, pages + 1) written_keys.append(key) total_events += len(events) # Update pagination state from last item last_event = events[-1] last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created") last_event_id = _get_item_id(last_event) if last_event_created: newest_created = last_event_created if last_event_id: newest_id = last_event_id search_after = [newest_created, newest_id] pages += 1 # If we got less than page size, we're done if len(events) < PAGE_SIZE: break print(f"Successfully retrieved {total_events} events across {pages} pages") # Save state for next run st["last_to_ts"] = to_ts st["last_created"] = newest_created st["last_id"] = newest_id st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "pages": pages, "total_events": total_events, "s3_keys": written_keys, "from_timestamp": from_ts, "to_timestamp": to_ts, "last_created": newest_created, "last_id": newest_id } } if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入下表中提供的环境变量,并将示例值替换为您的值。
环境变量
键 示例值 S3_BUCKET
sailpoint-iam-logs
S3_PREFIX
sailpoint/iam/
STATE_KEY
sailpoint/iam/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
sailpoint-iam-to-s3/1.0
IDN_BASE
https://tenant.api.identitynow.com
IDN_CLIENT_ID
your-client-id
(来自第 2 步)IDN_CLIENT_SECRET
your-client-secret
(来自第 2 步)IDN_SCOPE
sp:scopes:all
PAGE_SIZE
250
MAX_PAGES
20
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
sailpoint_iam_to_s3
。 - 名称:
sailpoint-iam-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::sailpoint-iam-logs" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥。点击创建访问密钥。
下载
.CSV
。(您需要将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 SailPoint IAM 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
SailPoint IAM logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 SailPoint IAM 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://sailpoint-iam-logs/sailpoint/iam/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
action |
metadata.description |
原始日志中 action 字段的值。 |
actor.name |
principal.user.user_display_name |
原始日志中 actor.name 字段的值。 |
attributes.accountName |
principal.user.group_identifiers |
原始日志中 attributes.accountName 字段的值。 |
attributes.appId |
target.asset_id |
“应用 ID:”与原始日志中 attributes.appId 字段的值串联。 |
attributes.attributeName |
additional.fields[0].value.string_value |
原始日志中 attributes.attributeName 字段的值,放置在 additional.fields 对象中。键设置为“属性名称”。 |
attributes.attributeValue |
additional.fields[1].value.string_value |
原始日志中 attributes.attributeValue 字段的值,放置在 additional.fields 对象中。键设置为“属性值”。 |
attributes.cloudAppName |
target.application |
原始日志中 attributes.cloudAppName 字段的值。 |
attributes.hostName |
target.hostname ,target.asset.hostname |
原始日志中 attributes.hostName 字段的值。 |
attributes.interface |
additional.fields[2].value.string_value |
原始日志中 attributes.interface 字段的值,放置在 additional.fields 对象中。键设置为“Interface”。 |
attributes.operation |
security_result.action_details |
原始日志中 attributes.operation 字段的值。 |
attributes.previousValue |
additional.fields[3].value.string_value |
原始日志中 attributes.previousValue 字段的值,放置在 additional.fields 对象中。相应键设置为“上一个值”。 |
attributes.provisioningResult |
security_result.detection_fields.value |
原始日志中 attributes.provisioningResult 字段的值,放置在 security_result.detection_fields 对象中。键设置为“Provisioning Result”。 |
attributes.sourceId |
principal.labels[0].value |
原始日志中 attributes.sourceId 字段的值,放置在 principal.labels 对象中。键设置为“Source Id”。 |
attributes.sourceName |
principal.labels[1].value |
原始日志中 attributes.sourceName 字段的值,放置在 principal.labels 对象中。键设置为“来源名称”。 |
auditClassName |
metadata.product_event_type |
原始日志中 auditClassName 字段的值。 |
created |
metadata.event_timestamp.seconds ,metadata.event_timestamp.nanos |
原始日志中 created 字段的值,如果不存在 instant.epochSecond ,则转换为时间戳。 |
id |
metadata.product_log_id |
原始日志中 id 字段的值。 |
instant.epochSecond |
metadata.event_timestamp.seconds |
原始日志中 instant.epochSecond 字段的值,用于时间戳。 |
ipAddress |
principal.asset.ip ,principal.ip |
原始日志中 ipAddress 字段的值。 |
interface |
additional.fields[0].value.string_value |
原始日志中 interface 字段的值,放置在 additional.fields 对象中。键设置为“interface”。 |
loggerName |
intermediary.application |
原始日志中 loggerName 字段的值。 |
message |
metadata.description ,security_result.description |
用于各种用途,包括在元数据和 security_result 中设置说明,以及提取 XML 内容。 |
name |
security_result.description |
原始日志中 name 字段的值。 |
operation |
target.resource.attribute.labels[0].value ,metadata.product_event_type |
原始日志中 operation 字段的值,放置在 target.resource.attribute.labels 对象中。键设置为“operation”。还用于 metadata.product_event_type 。 |
org |
principal.administrative_domain |
原始日志中 org 字段的值。 |
pod |
principal.location.name |
原始日志中 pod 字段的值。 |
referenceClass |
additional.fields[1].value.string_value |
原始日志中 referenceClass 字段的值,放置在 additional.fields 对象中。键设置为“referenceClass”。 |
referenceId |
additional.fields[2].value.string_value |
原始日志中 referenceId 字段的值,放置在 additional.fields 对象中。键设置为“referenceId”。 |
sailPointObjectName |
additional.fields[3].value.string_value |
原始日志中 sailPointObjectName 字段的值,放置在 additional.fields 对象中。键设置为“sailPointObjectName”。 |
serverHost |
principal.hostname ,principal.asset.hostname |
原始日志中 serverHost 字段的值。 |
stack |
additional.fields[4].value.string_value |
原始日志中 stack 字段的值,放置在 additional.fields 对象中。键设置为“Stack”。 |
status |
security_result.severity_details |
原始日志中 status 字段的值。 |
target |
additional.fields[4].value.string_value |
原始日志中 target 字段的值,放置在 additional.fields 对象中。键设置为“target”。 |
target.name |
principal.user.userid |
原始日志中 target.name 字段的值。 |
technicalName |
security_result.summary |
原始日志中 technicalName 字段的值。 |
thrown.cause.message |
xml_body ,detailed_message |
原始日志中 thrown.cause.message 字段的值,用于提取 XML 内容。 |
thrown.message |
xml_body ,detailed_message |
原始日志中 thrown.message 字段的值,用于提取 XML 内容。 |
trackingNumber |
additional.fields[5].value.string_value |
原始日志中 trackingNumber 字段的值,放置在 additional.fields 对象中。键设置为“Tracking Number”。 |
type |
metadata.product_event_type |
原始日志中 type 字段的值。 |
_version |
metadata.product_version |
原始日志中 _version 字段的值。 |
不适用 | metadata.event_timestamp |
派生自 instant.epochSecond 或 created 字段。 |
不适用 | metadata.event_type |
由解析器逻辑根据各种字段(包括 has_principal_user 、has_target_application 、technicalName 和 action )确定。默认值为“GENERIC_EVENT”。 |
不适用 | metadata.log_type |
设置为“SAILPOINT_IAM”。 |
不适用 | metadata.product_name |
设置为 IAM 。 |
不适用 | metadata.vendor_name |
设置为“SAILPOINT”。 |
不适用 | extensions.auth.type |
在某些条件下设置为“AUTHTYPE_UNSPECIFIED”。 |
不适用 | target.resource.attribute.labels[0].key |
设置为“操作”。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。