收集 SailPoint IAM 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 SailPoint Identity and Access Management (IAM) 日志注入到 Google Security Operations。解析器会处理 JSON 和 XML 格式的日志,并将其转换为统一数据模型 (UDM)。它可区分单个 UDM 事件(ProvisioningPlan、AccountRequest、SOAP-ENV)、多个 UDM 事件 (ProvisioningProject) 和 UDM 实体 (Identity),并为每个事件应用特定的解析逻辑和字段映射,包括针对非 XML 数据的通用事件处理。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • SailPoint Identity Security Cloud 的特权访问权限。
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。

收集 SailPoint IAM 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 以管理员身份登录 SailPoint Identity Security Cloud 管理控制台
  2. 依次前往全局 > 安全设置 > API 管理
  3. 点击创建 API 客户端
  4. 选择客户端凭据作为授权类型。
  5. 提供以下配置详细信息:
    • 名称:输入一个描述性名称(例如 Google SecOps Export API)。
    • 说明:输入 API 客户端的说明。
    • 范围:选择 sp:scopes:all
  6. 点击创建,并将生成的 API 凭据保存到安全的位置。
  7. 记录您的 SailPoint 租户基础网址(例如 https://tenant.api.identitynow.com)。
  8. 复制以下详细信息并将其保存在安全的位置:
    • IDN_CLIENT_ID
    • IDN_CLIENT_SECRET
    • IDN_BASE

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 sailpoint-iam-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶名称,请替换 sailpoint-iam-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json"
        }
      ]
    }
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  7. 附加新创建的政策。

  8. 将角色命名为 SailPointIamToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 sailpoint_iam_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 SailPointIamToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (sailpoint_iam_to_s3.py)。

    #!/usr/bin/env python3
    # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3
    # - Uses /v3/search API with pagination for audit events.
    # - Preserves vendor-native JSON format for identity events.
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    
    import os, json, time, uuid, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "sailpoint/iam/")
    STATE_KEY   = os.environ.get("STATE_KEY", "sailpoint/iam/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    IDN_BASE    = os.environ["IDN_BASE"]  # e.g. https://tenant.api.identitynow.com
    CLIENT_ID   = os.environ["IDN_CLIENT_ID"]
    CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"]
    SCOPE       = os.environ.get("IDN_SCOPE", "sp:scopes:all")
    PAGE_SIZE   = int(os.environ.get("PAGE_SIZE", "250"))
    MAX_PAGES   = int(os.environ.get("MAX_PAGES", "20"))
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_oauth_token() -> str:
        """Get OAuth2 access token using Client Credentials flow"""
        token_url = f"{IDN_BASE.rstrip('/')}/oauth/token"
    
        data = urllib.parse.urlencode({
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'scope': SCOPE
        }).encode('utf-8')
    
        req = Request(token_url, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        req.add_header("User-Agent", USER_AGENT)
    
        with urlopen(req, timeout=HTTP_TIMEOUT) as r:
            response = json.loads(r.read())
            return response["access_token"]
    
    def _search_events(access_token: str, created_from: str, search_after: list = None) -> list:
        """Search for audit events using SailPoint's /v3/search API"""
        search_url = f"{IDN_BASE.rstrip('/')}/v3/search"
    
        # Build search query for events created after specified time
        query_str = f'created:">={created_from}"'
    
        payload = {
            "indices": ["events"],
            "query": {"query": query_str},
            "sort": ["created", "+id"],
            "limit": PAGE_SIZE
        }
    
        if search_after:
            payload["searchAfter"] = search_after
    
        attempt = 0
        while True:
            req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            req.add_header("Authorization", f"Bearer {access_token}")
            req.add_header("User-Agent", USER_AGENT)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    response = json.loads(r.read())
                    # Handle different response formats
                    if isinstance(response, list):
                        return response
                    return response.get("results", response.get("data", []))
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str:
        # Create unique S3 key for events data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json"
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), 
            ContentType="application/json",
            Metadata={
                'source': 'sailpoint-iam',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'page_number': str(page_num),
                'events_count': str(len(events))
            }
        )
        return key
    
    def _get_item_id(item: dict) -> str:
        """Extract ID from event item, trying multiple possible fields"""
        for field in ("id", "uuid", "eventId", "_id"):
            if field in item and item[field]:
                return str(item[field])
        return ""
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Get OAuth token
        access_token = _get_oauth_token()
    
        created_from = _iso(from_ts)
        print(f"Fetching SailPoint IAM events from: {created_from}")
    
        # Handle pagination state
        last_created = st.get("last_created")
        last_id = st.get("last_id")
        search_after = [last_created, last_id] if (last_created and last_id) else None
    
        pages = 0
        total_events = 0
        written_keys = []
        newest_created = last_created or created_from
        newest_id = last_id or ""
    
        while pages < MAX_PAGES:
            events = _search_events(access_token, created_from, search_after)
    
            if not events:
                break
    
            # Write page to S3
            key = _put_events_data(events, from_ts, to_ts, pages + 1)
            written_keys.append(key)
            total_events += len(events)
    
            # Update pagination state from last item
            last_event = events[-1]
            last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created")
            last_event_id = _get_item_id(last_event)
    
            if last_event_created:
                newest_created = last_event_created
            if last_event_id:
                newest_id = last_event_id
    
            search_after = [newest_created, newest_id]
            pages += 1
    
            # If we got less than page size, we're done
            if len(events) < PAGE_SIZE:
                break
    
        print(f"Successfully retrieved {total_events} events across {pages} pages")
    
        # Save state for next run
        st["last_to_ts"] = to_ts
        st["last_created"] = newest_created
        st["last_id"] = newest_id
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True,
                "pages": pages,
                "total_events": total_events,
                "s3_keys": written_keys,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "last_created": newest_created,
                "last_id": newest_id
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET sailpoint-iam-logs
    S3_PREFIX sailpoint/iam/
    STATE_KEY sailpoint/iam/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT sailpoint-iam-to-s3/1.0
    IDN_BASE https://tenant.api.identitynow.com
    IDN_CLIENT_ID your-client-id(来自第 2 步)
    IDN_CLIENT_SECRET your-client-secret(来自第 2 步)
    IDN_SCOPE sp:scopes:all
    PAGE_SIZE 250
    MAX_PAGES 20
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 sailpoint_iam_to_s3
    • 名称sailpoint-iam-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs"
        }
      ]
    }
    
  7. 名称 = secops-reader-policy

  8. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  10. 点击创建访问密钥

  11. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 SailPoint IAM 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 SailPoint IAM logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 SailPoint IAM 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://sailpoint-iam-logs/sailpoint/iam/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
action metadata.description 原始日志中 action 字段的值。
actor.name principal.user.user_display_name 原始日志中 actor.name 字段的值。
attributes.accountName principal.user.group_identifiers 原始日志中 attributes.accountName 字段的值。
attributes.appId target.asset_id “应用 ID:”与原始日志中 attributes.appId 字段的值串联。
attributes.attributeName additional.fields[0].value.string_value 原始日志中 attributes.attributeName 字段的值,放置在 additional.fields 对象中。键设置为“属性名称”。
attributes.attributeValue additional.fields[1].value.string_value 原始日志中 attributes.attributeValue 字段的值,放置在 additional.fields 对象中。键设置为“属性值”。
attributes.cloudAppName target.application 原始日志中 attributes.cloudAppName 字段的值。
attributes.hostName target.hostnametarget.asset.hostname 原始日志中 attributes.hostName 字段的值。
attributes.interface additional.fields[2].value.string_value 原始日志中 attributes.interface 字段的值,放置在 additional.fields 对象中。键设置为“Interface”。
attributes.operation security_result.action_details 原始日志中 attributes.operation 字段的值。
attributes.previousValue additional.fields[3].value.string_value 原始日志中 attributes.previousValue 字段的值,放置在 additional.fields 对象中。相应键设置为“上一个值”。
attributes.provisioningResult security_result.detection_fields.value 原始日志中 attributes.provisioningResult 字段的值,放置在 security_result.detection_fields 对象中。键设置为“Provisioning Result”。
attributes.sourceId principal.labels[0].value 原始日志中 attributes.sourceId 字段的值,放置在 principal.labels 对象中。键设置为“Source Id”。
attributes.sourceName principal.labels[1].value 原始日志中 attributes.sourceName 字段的值,放置在 principal.labels 对象中。键设置为“来源名称”。
auditClassName metadata.product_event_type 原始日志中 auditClassName 字段的值。
created metadata.event_timestamp.secondsmetadata.event_timestamp.nanos 原始日志中 created 字段的值,如果不存在 instant.epochSecond,则转换为时间戳。
id metadata.product_log_id 原始日志中 id 字段的值。
instant.epochSecond metadata.event_timestamp.seconds 原始日志中 instant.epochSecond 字段的值,用于时间戳。
ipAddress principal.asset.ipprincipal.ip 原始日志中 ipAddress 字段的值。
interface additional.fields[0].value.string_value 原始日志中 interface 字段的值,放置在 additional.fields 对象中。键设置为“interface”。
loggerName intermediary.application 原始日志中 loggerName 字段的值。
message metadata.descriptionsecurity_result.description 用于各种用途,包括在元数据和 security_result 中设置说明,以及提取 XML 内容。
name security_result.description 原始日志中 name 字段的值。
operation target.resource.attribute.labels[0].valuemetadata.product_event_type 原始日志中 operation 字段的值,放置在 target.resource.attribute.labels 对象中。键设置为“operation”。还用于 metadata.product_event_type
org principal.administrative_domain 原始日志中 org 字段的值。
pod principal.location.name 原始日志中 pod 字段的值。
referenceClass additional.fields[1].value.string_value 原始日志中 referenceClass 字段的值,放置在 additional.fields 对象中。键设置为“referenceClass”。
referenceId additional.fields[2].value.string_value 原始日志中 referenceId 字段的值,放置在 additional.fields 对象中。键设置为“referenceId”。
sailPointObjectName additional.fields[3].value.string_value 原始日志中 sailPointObjectName 字段的值,放置在 additional.fields 对象中。键设置为“sailPointObjectName”。
serverHost principal.hostnameprincipal.asset.hostname 原始日志中 serverHost 字段的值。
stack additional.fields[4].value.string_value 原始日志中 stack 字段的值,放置在 additional.fields 对象中。键设置为“Stack”。
status security_result.severity_details 原始日志中 status 字段的值。
target additional.fields[4].value.string_value 原始日志中 target 字段的值,放置在 additional.fields 对象中。键设置为“target”。
target.name principal.user.userid 原始日志中 target.name 字段的值。
technicalName security_result.summary 原始日志中 technicalName 字段的值。
thrown.cause.message xml_bodydetailed_message 原始日志中 thrown.cause.message 字段的值,用于提取 XML 内容。
thrown.message xml_bodydetailed_message 原始日志中 thrown.message 字段的值,用于提取 XML 内容。
trackingNumber additional.fields[5].value.string_value 原始日志中 trackingNumber 字段的值,放置在 additional.fields 对象中。键设置为“Tracking Number”。
type metadata.product_event_type 原始日志中 type 字段的值。
_version metadata.product_version 原始日志中 _version 字段的值。
不适用 metadata.event_timestamp 派生自 instant.epochSecondcreated 字段。
不适用 metadata.event_type 由解析器逻辑根据各种字段(包括 has_principal_userhas_target_applicationtechnicalNameaction)确定。默认值为“GENERIC_EVENT”。
不适用 metadata.log_type 设置为“SAILPOINT_IAM”。
不适用 metadata.product_name 设置为 IAM
不适用 metadata.vendor_name 设置为“SAILPOINT”。
不适用 extensions.auth.type 在某些条件下设置为“AUTHTYPE_UNSPECIFIED”。
不适用 target.resource.attribute.labels[0].key 设置为“操作”。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。