收集 SailPoint IAM 記錄
本文說明如何使用 Amazon S3,將 SailPoint Identity and Access Management (IAM) 記錄檔擷取至 Google Security Operations。剖析器會處理 JSON 和 XML 格式的記錄,並將其轉換為統一資料模型 (UDM)。這項服務會區分單一 UDM 事件 (ProvisioningPlan、AccountRequest、SOAP-ENV)、多個 UDM 事件 (ProvisioningProject) 和 UDM 實體 (Identity),並為每個事件套用特定的剖析邏輯和欄位對應,包括處理非 XML 資料的通用事件。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體。
- SailPoint Identity Security Cloud 的特殊存取權。
- AWS 的特殊權限 (S3、IAM、Lambda、EventBridge)。
收集 SailPoint IAM 必要條件 (ID、API 金鑰、機構 ID、權杖)
- 以管理員身分登入 SailPoint Identity Security Cloud 管理控制台。
- 依序前往「全域」>「安全性設定」>「API 管理」。
- 按一下「建立 API 用戶端」。
- 選取「用戶端憑證」做為授權類型。
- 提供下列設定詳細資料:
- 名稱:輸入描述性名稱 (例如
Google SecOps Export API
)。 - 說明:輸入 API 用戶端的說明。
- 「範圍」:選取「
sp:scopes:all
」。
- 名稱:輸入描述性名稱 (例如
- 按一下「建立」,然後將產生的 API 憑證儲存在安全地點。
- 記錄 SailPoint 租戶基準網址 (例如
https://tenant.api.identitynow.com
)。 - 複製下列詳細資料並存放在安全位置:
- IDN_CLIENT_ID。
- IDN_CLIENT_SECRET。
- IDN_BASE。
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
sailpoint-iam-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」(政策)。
- 按一下「建立政策」>「JSON」分頁。
- 複製並貼上下列政策。
政策 JSON (如果您輸入的 bucket 名稱不同,請替換
sailpoint-iam-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json" } ] }
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
SailPointIamToS3Role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 sailpoint_iam_to_s3
執行階段 Python 3.13 架構 x86_64 執行角色 SailPointIamToS3Role
建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (
sailpoint_iam_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3 # - Uses /v3/search API with pagination for audit events. # - Preserves vendor-native JSON format for identity events. # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid, urllib.parse from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "sailpoint/iam/") STATE_KEY = os.environ.get("STATE_KEY", "sailpoint/iam/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) IDN_BASE = os.environ["IDN_BASE"] # e.g. https://tenant.api.identitynow.com CLIENT_ID = os.environ["IDN_CLIENT_ID"] CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"] SCOPE = os.environ.get("IDN_SCOPE", "sp:scopes:all") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "250")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_oauth_token() -> str: """Get OAuth2 access token using Client Credentials flow""" token_url = f"{IDN_BASE.rstrip('/')}/oauth/token" data = urllib.parse.urlencode({ 'grant_type': 'client_credentials', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'scope': SCOPE }).encode('utf-8') req = Request(token_url, data=data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", USER_AGENT) with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) return response["access_token"] def _search_events(access_token: str, created_from: str, search_after: list = None) -> list: """Search for audit events using SailPoint's /v3/search API""" search_url = f"{IDN_BASE.rstrip('/')}/v3/search" # Build search query for events created after specified time query_str = f'created:">={created_from}"' payload = { "indices": ["events"], "query": {"query": query_str}, "sort": ["created", "+id"], "limit": PAGE_SIZE } if search_after: payload["searchAfter"] = search_after attempt = 0 while True: req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") req.add_header("Authorization", f"Bearer {access_token}") req.add_header("User-Agent", USER_AGENT) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) # Handle different response formats if isinstance(response, list): return response return response.get("results", response.get("data", [])) except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str: # Create unique S3 key for events data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), ContentType="application/json", Metadata={ 'source': 'sailpoint-iam', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'page_number': str(page_num), 'events_count': str(len(events)) } ) return key def _get_item_id(item: dict) -> str: """Extract ID from event item, trying multiple possible fields""" for field in ("id", "uuid", "eventId", "_id"): if field in item and item[field]: return str(item[field]) return "" def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Get OAuth token access_token = _get_oauth_token() created_from = _iso(from_ts) print(f"Fetching SailPoint IAM events from: {created_from}") # Handle pagination state last_created = st.get("last_created") last_id = st.get("last_id") search_after = [last_created, last_id] if (last_created and last_id) else None pages = 0 total_events = 0 written_keys = [] newest_created = last_created or created_from newest_id = last_id or "" while pages < MAX_PAGES: events = _search_events(access_token, created_from, search_after) if not events: break # Write page to S3 key = _put_events_data(events, from_ts, to_ts, pages + 1) written_keys.append(key) total_events += len(events) # Update pagination state from last item last_event = events[-1] last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created") last_event_id = _get_item_id(last_event) if last_event_created: newest_created = last_event_created if last_event_id: newest_id = last_event_id search_after = [newest_created, newest_id] pages += 1 # If we got less than page size, we're done if len(events) < PAGE_SIZE: break print(f"Successfully retrieved {total_events} events across {pages} pages") # Save state for next run st["last_to_ts"] = to_ts st["last_created"] = newest_created st["last_id"] = newest_id st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "pages": pages, "total_events": total_events, "s3_keys": written_keys, "from_timestamp": from_ts, "to_timestamp": to_ts, "last_created": newest_created, "last_id": newest_id } } if __name__ == "__main__": print(lambda_handler())
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下表提供的環境變數,並將範例值換成您的值。
環境變數
鍵 範例值 S3_BUCKET
sailpoint-iam-logs
S3_PREFIX
sailpoint/iam/
STATE_KEY
sailpoint/iam/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
sailpoint-iam-to-s3/1.0
IDN_BASE
https://tenant.api.identitynow.com
IDN_CLIENT_ID
your-client-id
(步驟 2)IDN_CLIENT_SECRET
your-client-secret
(步驟 2)IDN_SCOPE
sp:scopes:all
PAGE_SIZE
250
MAX_PAGES
20
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
sailpoint_iam_to_s3
。 - 名稱:
sailpoint-iam-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 前往 AWS 控制台 > IAM > 使用者。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::sailpoint-iam-logs" } ] }
Name =
secops-reader-policy
。依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
為
secops-reader
建立存取金鑰:依序點選「安全憑證」>「存取金鑰」。按一下「建立存取金鑰」。
下載
.CSV
。(您會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態饋給,擷取 SailPoint IAM 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
SailPoint IAM logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「SailPoint IAM」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://sailpoint-iam-logs/sailpoint/iam/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
action |
metadata.description |
原始記錄中的 action 欄位值。 |
actor.name |
principal.user.user_display_name |
原始記錄中的 actor.name 欄位值。 |
attributes.accountName |
principal.user.group_identifiers |
原始記錄中的 attributes.accountName 欄位值。 |
attributes.appId |
target.asset_id |
「App ID: 」與原始記錄中 attributes.appId 欄位的值串連。 |
attributes.attributeName |
additional.fields[0].value.string_value |
原始記錄中的 attributes.attributeName 欄位值,放在 additional.fields 物件中。索引鍵會設為「屬性名稱」。 |
attributes.attributeValue |
additional.fields[1].value.string_value |
原始記錄中的 attributes.attributeValue 欄位值,放在 additional.fields 物件中。索引鍵設為「屬性值」。 |
attributes.cloudAppName |
target.application |
原始記錄中的 attributes.cloudAppName 欄位值。 |
attributes.hostName |
target.hostname 、target.asset.hostname |
原始記錄中的 attributes.hostName 欄位值。 |
attributes.interface |
additional.fields[2].value.string_value |
原始記錄中的 attributes.interface 欄位值,放在 additional.fields 物件中。金鑰設為「Interface」。 |
attributes.operation |
security_result.action_details |
原始記錄中的 attributes.operation 欄位值。 |
attributes.previousValue |
additional.fields[3].value.string_value |
原始記錄中的 attributes.previousValue 欄位值,放在 additional.fields 物件中。索引鍵設為「Previous Value」。 |
attributes.provisioningResult |
security_result.detection_fields.value |
原始記錄中的 attributes.provisioningResult 欄位值,放置在 security_result.detection_fields 物件中。索引鍵設為「Provisioning Result」。 |
attributes.sourceId |
principal.labels[0].value |
原始記錄中的 attributes.sourceId 欄位值,放置在 principal.labels 物件中。索引鍵設為「來源 ID」。 |
attributes.sourceName |
principal.labels[1].value |
原始記錄中的 attributes.sourceName 欄位值,放置在 principal.labels 物件中。索引鍵設為「來源名稱」。 |
auditClassName |
metadata.product_event_type |
原始記錄中的 auditClassName 欄位值。 |
created |
metadata.event_timestamp.seconds 、metadata.event_timestamp.nanos |
原始記錄中的 created 欄位值,如果沒有 instant.epochSecond ,則會轉換為時間戳記。 |
id |
metadata.product_log_id |
原始記錄中的 id 欄位值。 |
instant.epochSecond |
metadata.event_timestamp.seconds |
原始記錄中的 instant.epochSecond 欄位值,用於時間戳記。 |
ipAddress |
principal.asset.ip 、principal.ip |
原始記錄中的 ipAddress 欄位值。 |
interface |
additional.fields[0].value.string_value |
原始記錄中的 interface 欄位值,放在 additional.fields 物件中。金鑰設為「介面」。 |
loggerName |
intermediary.application |
原始記錄中的 loggerName 欄位值。 |
message |
metadata.description 、security_result.description |
用於各種用途,包括在 metadata 和 security_result 中設定說明,以及擷取 XML 內容。 |
name |
security_result.description |
原始記錄中的 name 欄位值。 |
operation |
target.resource.attribute.labels[0].value 、metadata.product_event_type |
原始記錄中的 operation 欄位值,放置在 target.resource.attribute.labels 物件中。索引鍵設為「operation」。metadata.product_event_type 也使用這筆付款資料。 |
org |
principal.administrative_domain |
原始記錄中的 org 欄位值。 |
pod |
principal.location.name |
原始記錄中的 pod 欄位值。 |
referenceClass |
additional.fields[1].value.string_value |
原始記錄中的 referenceClass 欄位值,放在 additional.fields 物件中。索引鍵設為「referenceClass」。 |
referenceId |
additional.fields[2].value.string_value |
原始記錄中的 referenceId 欄位值,放在 additional.fields 物件中。金鑰設為「referenceId」。 |
sailPointObjectName |
additional.fields[3].value.string_value |
原始記錄中的 sailPointObjectName 欄位值,放在 additional.fields 物件中。索引鍵設為「sailPointObjectName」。 |
serverHost |
principal.hostname 、principal.asset.hostname |
原始記錄中的 serverHost 欄位值。 |
stack |
additional.fields[4].value.string_value |
原始記錄中的 stack 欄位值,放在 additional.fields 物件中。索引鍵設為「Stack」。 |
status |
security_result.severity_details |
原始記錄中的 status 欄位值。 |
target |
additional.fields[4].value.string_value |
原始記錄中的 target 欄位值,放在 additional.fields 物件中。索引鍵設為「target」。 |
target.name |
principal.user.userid |
原始記錄中的 target.name 欄位值。 |
technicalName |
security_result.summary |
原始記錄中的 technicalName 欄位值。 |
thrown.cause.message |
xml_body 、detailed_message |
原始記錄中的 thrown.cause.message 欄位值,用於擷取 XML 內容。 |
thrown.message |
xml_body 、detailed_message |
原始記錄中的 thrown.message 欄位值,用於擷取 XML 內容。 |
trackingNumber |
additional.fields[5].value.string_value |
原始記錄中的 trackingNumber 欄位值,放在 additional.fields 物件中。索引鍵設為「追蹤號碼」。 |
type |
metadata.product_event_type |
原始記錄中的 type 欄位值。 |
_version |
metadata.product_version |
原始記錄中的 _version 欄位值。 |
不適用 | metadata.event_timestamp |
衍生自 instant.epochSecond 或 created 欄位。 |
不適用 | metadata.event_type |
由剖析器邏輯根據各種欄位 (包括 has_principal_user 、has_target_application 、technicalName 和 action ) 判斷。預設值為「GENERIC_EVENT」。 |
不適用 | metadata.log_type |
設為「SAILPOINT_IAM」。 |
不適用 | metadata.product_name |
設為 IAM 。 |
不適用 | metadata.vendor_name |
設為「SAILPOINT」。 |
不適用 | extensions.auth.type |
在特定情況下,請設為「AUTHTYPE_UNSPECIFIED」。 |
不適用 | target.resource.attribute.labels[0].key |
設為「operation」。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。