收集 SailPoint IAM 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 SailPoint Identity and Access Management (IAM) 記錄檔擷取至 Google Security Operations。剖析器會處理 JSON 和 XML 格式的記錄,並將其轉換為統一資料模型 (UDM)。這項服務會區分單一 UDM 事件 (ProvisioningPlan、AccountRequest、SOAP-ENV)、多個 UDM 事件 (ProvisioningProject) 和 UDM 實體 (Identity),並為每個事件套用特定的剖析邏輯和欄位對應,包括處理非 XML 資料的通用事件。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體。
  • SailPoint Identity Security Cloud 的特殊存取權。
  • AWS 的特殊權限 (S3、IAM、Lambda、EventBridge)。

收集 SailPoint IAM 必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 以管理員身分登入 SailPoint Identity Security Cloud 管理控制台
  2. 依序前往「全域」>「安全性設定」>「API 管理」
  3. 按一下「建立 API 用戶端」
  4. 選取「用戶端憑證」做為授權類型。
  5. 提供下列設定詳細資料:
    • 名稱:輸入描述性名稱 (例如 Google SecOps Export API)。
    • 說明:輸入 API 用戶端的說明。
    • 「範圍」:選取「sp:scopes:all」。
  6. 按一下「建立」,然後將產生的 API 憑證儲存在安全地點。
  7. 記錄 SailPoint 租戶基準網址 (例如 https://tenant.api.identitynow.com)。
  8. 複製下列詳細資料並存放在安全位置:
    • IDN_CLIENT_ID
    • IDN_CLIENT_SECRET
    • IDN_BASE

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 sailpoint-iam-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」(政策)
  2. 按一下「建立政策」>「JSON」分頁
  3. 複製並貼上下列政策。
  4. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 sailpoint-iam-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json"
        }
      ]
    }
    
  5. 依序點選「下一步」>「建立政策」

  6. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  7. 附加新建立的政策。

  8. 為角色命名 SailPointIamToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 sailpoint_iam_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 SailPointIamToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (sailpoint_iam_to_s3.py)。

    #!/usr/bin/env python3
    # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3
    # - Uses /v3/search API with pagination for audit events.
    # - Preserves vendor-native JSON format for identity events.
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    
    import os, json, time, uuid, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "sailpoint/iam/")
    STATE_KEY   = os.environ.get("STATE_KEY", "sailpoint/iam/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    IDN_BASE    = os.environ["IDN_BASE"]  # e.g. https://tenant.api.identitynow.com
    CLIENT_ID   = os.environ["IDN_CLIENT_ID"]
    CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"]
    SCOPE       = os.environ.get("IDN_SCOPE", "sp:scopes:all")
    PAGE_SIZE   = int(os.environ.get("PAGE_SIZE", "250"))
    MAX_PAGES   = int(os.environ.get("MAX_PAGES", "20"))
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_oauth_token() -> str:
        """Get OAuth2 access token using Client Credentials flow"""
        token_url = f"{IDN_BASE.rstrip('/')}/oauth/token"
    
        data = urllib.parse.urlencode({
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'scope': SCOPE
        }).encode('utf-8')
    
        req = Request(token_url, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        req.add_header("User-Agent", USER_AGENT)
    
        with urlopen(req, timeout=HTTP_TIMEOUT) as r:
            response = json.loads(r.read())
            return response["access_token"]
    
    def _search_events(access_token: str, created_from: str, search_after: list = None) -> list:
        """Search for audit events using SailPoint's /v3/search API"""
        search_url = f"{IDN_BASE.rstrip('/')}/v3/search"
    
        # Build search query for events created after specified time
        query_str = f'created:">={created_from}"'
    
        payload = {
            "indices": ["events"],
            "query": {"query": query_str},
            "sort": ["created", "+id"],
            "limit": PAGE_SIZE
        }
    
        if search_after:
            payload["searchAfter"] = search_after
    
        attempt = 0
        while True:
            req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            req.add_header("Authorization", f"Bearer {access_token}")
            req.add_header("User-Agent", USER_AGENT)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    response = json.loads(r.read())
                    # Handle different response formats
                    if isinstance(response, list):
                        return response
                    return response.get("results", response.get("data", []))
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str:
        # Create unique S3 key for events data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json"
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), 
            ContentType="application/json",
            Metadata={
                'source': 'sailpoint-iam',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'page_number': str(page_num),
                'events_count': str(len(events))
            }
        )
        return key
    
    def _get_item_id(item: dict) -> str:
        """Extract ID from event item, trying multiple possible fields"""
        for field in ("id", "uuid", "eventId", "_id"):
            if field in item and item[field]:
                return str(item[field])
        return ""
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Get OAuth token
        access_token = _get_oauth_token()
    
        created_from = _iso(from_ts)
        print(f"Fetching SailPoint IAM events from: {created_from}")
    
        # Handle pagination state
        last_created = st.get("last_created")
        last_id = st.get("last_id")
        search_after = [last_created, last_id] if (last_created and last_id) else None
    
        pages = 0
        total_events = 0
        written_keys = []
        newest_created = last_created or created_from
        newest_id = last_id or ""
    
        while pages < MAX_PAGES:
            events = _search_events(access_token, created_from, search_after)
    
            if not events:
                break
    
            # Write page to S3
            key = _put_events_data(events, from_ts, to_ts, pages + 1)
            written_keys.append(key)
            total_events += len(events)
    
            # Update pagination state from last item
            last_event = events[-1]
            last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created")
            last_event_id = _get_item_id(last_event)
    
            if last_event_created:
                newest_created = last_event_created
            if last_event_id:
                newest_id = last_event_id
    
            search_after = [newest_created, newest_id]
            pages += 1
    
            # If we got less than page size, we're done
            if len(events) < PAGE_SIZE:
                break
    
        print(f"Successfully retrieved {total_events} events across {pages} pages")
    
        # Save state for next run
        st["last_to_ts"] = to_ts
        st["last_created"] = newest_created
        st["last_id"] = newest_id
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True,
                "pages": pages,
                "total_events": total_events,
                "s3_keys": written_keys,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "last_created": newest_created,
                "last_id": newest_id
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下表提供的環境變數,並將範例值換成您的值。

    環境變數

    範例值
    S3_BUCKET sailpoint-iam-logs
    S3_PREFIX sailpoint/iam/
    STATE_KEY sailpoint/iam/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT sailpoint-iam-to-s3/1.0
    IDN_BASE https://tenant.api.identitynow.com
    IDN_CLIENT_ID your-client-id (步驟 2)
    IDN_CLIENT_SECRET your-client-secret (步驟 2)
    IDN_SCOPE sp:scopes:all
    PAGE_SIZE 250
    MAX_PAGES 20
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 sailpoint_iam_to_s3
    • 名稱sailpoint-iam-1h
  3. 按一下「建立時間表」

(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 前往 AWS 控制台 > IAM > 使用者
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy

  8. 依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. secops-reader 建立存取金鑰:依序點選「安全憑證」>「存取金鑰」

  10. 按一下「建立存取金鑰」

  11. 下載 .CSV。(您會將這些值貼到動態饋給中)。

在 Google SecOps 中設定動態饋給,擷取 SailPoint IAM 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 SailPoint IAM logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「SailPoint IAM」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://sailpoint-iam-logs/sailpoint/iam/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
action metadata.description 原始記錄中的 action 欄位值。
actor.name principal.user.user_display_name 原始記錄中的 actor.name 欄位值。
attributes.accountName principal.user.group_identifiers 原始記錄中的 attributes.accountName 欄位值。
attributes.appId target.asset_id 「App ID: 」與原始記錄中 attributes.appId 欄位的值串連。
attributes.attributeName additional.fields[0].value.string_value 原始記錄中的 attributes.attributeName 欄位值,放在 additional.fields 物件中。索引鍵會設為「屬性名稱」。
attributes.attributeValue additional.fields[1].value.string_value 原始記錄中的 attributes.attributeValue 欄位值,放在 additional.fields 物件中。索引鍵設為「屬性值」。
attributes.cloudAppName target.application 原始記錄中的 attributes.cloudAppName 欄位值。
attributes.hostName target.hostnametarget.asset.hostname 原始記錄中的 attributes.hostName 欄位值。
attributes.interface additional.fields[2].value.string_value 原始記錄中的 attributes.interface 欄位值,放在 additional.fields 物件中。金鑰設為「Interface」。
attributes.operation security_result.action_details 原始記錄中的 attributes.operation 欄位值。
attributes.previousValue additional.fields[3].value.string_value 原始記錄中的 attributes.previousValue 欄位值,放在 additional.fields 物件中。索引鍵設為「Previous Value」。
attributes.provisioningResult security_result.detection_fields.value 原始記錄中的 attributes.provisioningResult 欄位值,放置在 security_result.detection_fields 物件中。索引鍵設為「Provisioning Result」。
attributes.sourceId principal.labels[0].value 原始記錄中的 attributes.sourceId 欄位值,放置在 principal.labels 物件中。索引鍵設為「來源 ID」。
attributes.sourceName principal.labels[1].value 原始記錄中的 attributes.sourceName 欄位值,放置在 principal.labels 物件中。索引鍵設為「來源名稱」。
auditClassName metadata.product_event_type 原始記錄中的 auditClassName 欄位值。
created metadata.event_timestamp.secondsmetadata.event_timestamp.nanos 原始記錄中的 created 欄位值,如果沒有 instant.epochSecond,則會轉換為時間戳記。
id metadata.product_log_id 原始記錄中的 id 欄位值。
instant.epochSecond metadata.event_timestamp.seconds 原始記錄中的 instant.epochSecond 欄位值,用於時間戳記。
ipAddress principal.asset.ipprincipal.ip 原始記錄中的 ipAddress 欄位值。
interface additional.fields[0].value.string_value 原始記錄中的 interface 欄位值,放在 additional.fields 物件中。金鑰設為「介面」。
loggerName intermediary.application 原始記錄中的 loggerName 欄位值。
message metadata.descriptionsecurity_result.description 用於各種用途,包括在 metadata 和 security_result 中設定說明,以及擷取 XML 內容。
name security_result.description 原始記錄中的 name 欄位值。
operation target.resource.attribute.labels[0].valuemetadata.product_event_type 原始記錄中的 operation 欄位值,放置在 target.resource.attribute.labels 物件中。索引鍵設為「operation」。metadata.product_event_type 也使用這筆付款資料。
org principal.administrative_domain 原始記錄中的 org 欄位值。
pod principal.location.name 原始記錄中的 pod 欄位值。
referenceClass additional.fields[1].value.string_value 原始記錄中的 referenceClass 欄位值,放在 additional.fields 物件中。索引鍵設為「referenceClass」。
referenceId additional.fields[2].value.string_value 原始記錄中的 referenceId 欄位值,放在 additional.fields 物件中。金鑰設為「referenceId」。
sailPointObjectName additional.fields[3].value.string_value 原始記錄中的 sailPointObjectName 欄位值,放在 additional.fields 物件中。索引鍵設為「sailPointObjectName」。
serverHost principal.hostnameprincipal.asset.hostname 原始記錄中的 serverHost 欄位值。
stack additional.fields[4].value.string_value 原始記錄中的 stack 欄位值,放在 additional.fields 物件中。索引鍵設為「Stack」。
status security_result.severity_details 原始記錄中的 status 欄位值。
target additional.fields[4].value.string_value 原始記錄中的 target 欄位值,放在 additional.fields 物件中。索引鍵設為「target」。
target.name principal.user.userid 原始記錄中的 target.name 欄位值。
technicalName security_result.summary 原始記錄中的 technicalName 欄位值。
thrown.cause.message xml_bodydetailed_message 原始記錄中的 thrown.cause.message 欄位值,用於擷取 XML 內容。
thrown.message xml_bodydetailed_message 原始記錄中的 thrown.message 欄位值,用於擷取 XML 內容。
trackingNumber additional.fields[5].value.string_value 原始記錄中的 trackingNumber 欄位值,放在 additional.fields 物件中。索引鍵設為「追蹤號碼」。
type metadata.product_event_type 原始記錄中的 type 欄位值。
_version metadata.product_version 原始記錄中的 _version 欄位值。
不適用 metadata.event_timestamp 衍生自 instant.epochSecondcreated 欄位。
不適用 metadata.event_type 由剖析器邏輯根據各種欄位 (包括 has_principal_userhas_target_applicationtechnicalNameaction) 判斷。預設值為「GENERIC_EVENT」。
不適用 metadata.log_type 設為「SAILPOINT_IAM」。
不適用 metadata.product_name 設為 IAM
不適用 metadata.vendor_name 設為「SAILPOINT」。
不適用 extensions.auth.type 在特定情況下,請設為「AUTHTYPE_UNSPECIFIED」。
不適用 target.resource.attribute.labels[0].key 設為「operation」。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。