SailPoint IAM 로그 수집
이 문서에서는 Amazon S3를 사용하여 SailPoint Identity and Access Management (IAM) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 JSON 및 XML 형식의 로그를 처리하여 통합 데이터 모델 (UDM)로 변환합니다. 단일 UDM 이벤트 (ProvisioningPlan, AccountRequest, SOAP-ENV), 다중 UDM 이벤트 (ProvisioningProject), UDM 항목 (Identity)을 구분하여 XML이 아닌 데이터의 일반 이벤트 처리를 비롯한 각 항목에 특정 파싱 로직과 필드 매핑을 적용합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스입니다.
- SailPoint Identity Security Cloud에 대한 액세스 권한 관리
- AWS (S3, IAM, Lambda, EventBridge)에 대한 권한 액세스
SailPoint IAM 필수사항 (ID, API 키, 조직 ID, 토큰) 수집
- 관리자로 SailPoint Identity Security Cloud Admin Console에 로그인합니다.
- 전체 > 보안 설정 > API 관리로 이동합니다.
- API 클라이언트 만들기를 클릭합니다.
- 부여 유형으로 클라이언트 사용자 인증 정보를 선택합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름: 설명이 포함된 이름을 입력합니다 (예:
Google SecOps Export API
). - 설명: API 클라이언트의 설명을 입력합니다.
- 범위:
sp:scopes:all
를 선택합니다.
- 이름: 설명이 포함된 이름을 입력합니다 (예:
- 만들기를 클릭하고 생성된 API 사용자 인증 정보를 안전한 위치에 저장합니다.
- SailPoint 테넌트 기본 URL (예:
https://tenant.api.identitynow.com
)을 기록합니다. - 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- IDN_CLIENT_ID.
- IDN_CLIENT_SECRET.
- IDN_BASE
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
sailpoint-iam-logs
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책으로 이동합니다.
- 정책 만들기 > JSON 탭을 클릭합니다.
- 다음 정책을 복사하여 붙여넣습니다.
정책 JSON (다른 버킷 이름을 입력한 경우
sailpoint-iam-logs
대체):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json" } ] }
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책을 연결합니다.
역할 이름을
SailPointIamToS3Role
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 sailpoint_iam_to_s3
런타임 Python 3.13 아키텍처 x86_64 실행 역할 SailPointIamToS3Role
함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드 (
sailpoint_iam_to_s3.py
)를 붙여넣습니다.#!/usr/bin/env python3 # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSON payloads to S3 # - Uses /v3/search API with pagination for audit events. # - Preserves vendor-native JSON format for identity events. # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid, urllib.parse from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "sailpoint/iam/") STATE_KEY = os.environ.get("STATE_KEY", "sailpoint/iam/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) IDN_BASE = os.environ["IDN_BASE"] # e.g. https://tenant.api.identitynow.com CLIENT_ID = os.environ["IDN_CLIENT_ID"] CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"] SCOPE = os.environ.get("IDN_SCOPE", "sp:scopes:all") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "250")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_oauth_token() -> str: """Get OAuth2 access token using Client Credentials flow""" token_url = f"{IDN_BASE.rstrip('/')}/oauth/token" data = urllib.parse.urlencode({ 'grant_type': 'client_credentials', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'scope': SCOPE }).encode('utf-8') req = Request(token_url, data=data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", USER_AGENT) with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) return response["access_token"] def _search_events(access_token: str, created_from: str, search_after: list = None) -> list: """Search for audit events using SailPoint's /v3/search API""" search_url = f"{IDN_BASE.rstrip('/')}/v3/search" # Build search query for events created after specified time query_str = f'created:">={created_from}"' payload = { "indices": ["events"], "query": {"query": query_str}, "sort": ["created", "+id"], "limit": PAGE_SIZE } if search_after: payload["searchAfter"] = search_after attempt = 0 while True: req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") req.add_header("Authorization", f"Bearer {access_token}") req.add_header("User-Agent", USER_AGENT) try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) # Handle different response formats if isinstance(response, list): return response return response.get("results", response.get("data", [])) except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str: # Create unique S3 key for events data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(events, separators=(",", ":")).encode("utf-8"), ContentType="application/json", Metadata={ 'source': 'sailpoint-iam', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)), 'page_number': str(page_num), 'events_count': str(len(events)) } ) return key def _get_item_id(item: dict) -> str: """Extract ID from event item, trying multiple possible fields""" for field in ("id", "uuid", "eventId", "_id"): if field in item and item[field]: return str(item[field]) return "" def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Get OAuth token access_token = _get_oauth_token() created_from = _iso(from_ts) print(f"Fetching SailPoint IAM events from: {created_from}") # Handle pagination state last_created = st.get("last_created") last_id = st.get("last_id") search_after = [last_created, last_id] if (last_created and last_id) else None pages = 0 total_events = 0 written_keys = [] newest_created = last_created or created_from newest_id = last_id or "" while pages < MAX_PAGES: events = _search_events(access_token, created_from, search_after) if not events: break # Write page to S3 key = _put_events_data(events, from_ts, to_ts, pages + 1) written_keys.append(key) total_events += len(events) # Update pagination state from last item last_event = events[-1] last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created") last_event_id = _get_item_id(last_event) if last_event_created: newest_created = last_event_created if last_event_id: newest_id = last_event_id search_after = [newest_created, newest_id] pages += 1 # If we got less than page size, we're done if len(events) < PAGE_SIZE: break print(f"Successfully retrieved {total_events} events across {pages} pages") # Save state for next run st["last_to_ts"] = to_ts st["last_created"] = newest_created st["last_id"] = newest_id st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "pages": pages, "total_events": total_events, "s3_keys": written_keys, "from_timestamp": from_ts, "to_timestamp": to_ts, "last_created": newest_created, "last_id": newest_id } } if __name__ == "__main__": print(lambda_handler())
구성 > 환경 변수로 이동합니다.
수정 > 새 환경 변수 추가를 클릭합니다.
다음 표에 제공된 환경 변수를 입력하고 예시 값을 실제 값으로 바꿉니다.
환경 변수
키 예시 값 S3_BUCKET
sailpoint-iam-logs
S3_PREFIX
sailpoint/iam/
STATE_KEY
sailpoint/iam/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
sailpoint-iam-to-s3/1.0
IDN_BASE
https://tenant.api.identitynow.com
IDN_CLIENT_ID
your-client-id
(2단계에서)IDN_CLIENT_SECRET
your-client-secret
(2단계에서)IDN_SCOPE
sp:scopes:all
PAGE_SIZE
250
MAX_PAGES
20
함수가 생성되면 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요금 (
1 hour
) - 타겟: Lambda 함수
sailpoint_iam_to_s3
- 이름:
sailpoint-iam-1h
.
- 반복 일정: 요금 (
- 일정 만들기를 클릭합니다.
(선택사항) Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔 > IAM > 사용자로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::sailpoint-iam-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::sailpoint-iam-logs" } ] }
이름 =
secops-reader-policy
정책 만들기 > 검색/선택 > 다음 > 권한 추가를 클릭합니다.
secops-reader
의 액세스 키를 만듭니다(보안 사용자 인증 정보 > 액세스 키).액세스 키 만들기를 클릭합니다.
.CSV
을 다운로드합니다. (이 값은 피드에 붙여넣습니다.)
SailPoint IAM 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
SailPoint IAM logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 SailPoint IAM을 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://sailpoint-iam-logs/sailpoint/iam/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
로그 필드 | UDM 매핑 | 논리 |
---|---|---|
action |
metadata.description |
원시 로그의 action 필드 값입니다. |
actor.name |
principal.user.user_display_name |
원시 로그의 actor.name 필드 값입니다. |
attributes.accountName |
principal.user.group_identifiers |
원시 로그의 attributes.accountName 필드 값입니다. |
attributes.appId |
target.asset_id |
원시 로그의 attributes.appId 필드 값과 연결된 '앱 ID: ' |
attributes.attributeName |
additional.fields[0].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 attributes.attributeName 필드 값입니다. 키는 '속성 이름'으로 설정됩니다. |
attributes.attributeValue |
additional.fields[1].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 attributes.attributeValue 필드 값입니다. 키가 '속성 값'으로 설정됩니다. |
attributes.cloudAppName |
target.application |
원시 로그의 attributes.cloudAppName 필드 값입니다. |
attributes.hostName |
target.hostname , target.asset.hostname |
원시 로그의 attributes.hostName 필드 값입니다. |
attributes.interface |
additional.fields[2].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 attributes.interface 필드 값입니다. 키가 'Interface'로 설정됩니다. |
attributes.operation |
security_result.action_details |
원시 로그의 attributes.operation 필드 값입니다. |
attributes.previousValue |
additional.fields[3].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 attributes.previousValue 필드 값입니다. 키가 '이전 값'으로 설정됩니다. |
attributes.provisioningResult |
security_result.detection_fields.value |
security_result.detection_fields 객체 내에 배치된 원시 로그의 attributes.provisioningResult 필드 값입니다. 키는 '프로비저닝 결과'로 설정됩니다. |
attributes.sourceId |
principal.labels[0].value |
principal.labels 객체 내에 배치된 원시 로그의 attributes.sourceId 필드 값입니다. 키가 '소스 ID'로 설정됩니다. |
attributes.sourceName |
principal.labels[1].value |
principal.labels 객체 내에 배치된 원시 로그의 attributes.sourceName 필드 값입니다. 키가 '소스 이름'으로 설정되어 있습니다. |
auditClassName |
metadata.product_event_type |
원시 로그의 auditClassName 필드 값입니다. |
created |
metadata.event_timestamp.seconds , metadata.event_timestamp.nanos |
원시 로그의 created 필드 값입니다. instant.epochSecond 이 없는 경우 타임스탬프로 변환됩니다. |
id |
metadata.product_log_id |
원시 로그의 id 필드 값입니다. |
instant.epochSecond |
metadata.event_timestamp.seconds |
타임스탬프에 사용되는 원시 로그의 instant.epochSecond 필드 값입니다. |
ipAddress |
principal.asset.ip , principal.ip |
원시 로그의 ipAddress 필드 값입니다. |
interface |
additional.fields[0].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 interface 필드 값입니다. 키가 'interface'로 설정됩니다. |
loggerName |
intermediary.application |
원시 로그의 loggerName 필드 값입니다. |
message |
metadata.description , security_result.description |
메타데이터 및 security_result에 설명을 설정하고 XML 콘텐츠를 추출하는 등 다양한 용도로 사용됩니다. |
name |
security_result.description |
원시 로그의 name 필드 값입니다. |
operation |
target.resource.attribute.labels[0].value , metadata.product_event_type |
target.resource.attribute.labels 객체 내에 배치된 원시 로그의 operation 필드 값입니다. 키는 'operation'으로 설정됩니다. metadata.product_event_type 에도 사용됩니다. |
org |
principal.administrative_domain |
원시 로그의 org 필드 값입니다. |
pod |
principal.location.name |
원시 로그의 pod 필드 값입니다. |
referenceClass |
additional.fields[1].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 referenceClass 필드 값입니다. 키는 'referenceClass'로 설정됩니다. |
referenceId |
additional.fields[2].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 referenceId 필드 값입니다. 키는 'referenceId'로 설정됩니다. |
sailPointObjectName |
additional.fields[3].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 sailPointObjectName 필드 값입니다. 키는 'sailPointObjectName'으로 설정됩니다. |
serverHost |
principal.hostname , principal.asset.hostname |
원시 로그의 serverHost 필드 값입니다. |
stack |
additional.fields[4].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 stack 필드 값입니다. 키가 'Stack'으로 설정됩니다. |
status |
security_result.severity_details |
원시 로그의 status 필드 값입니다. |
target |
additional.fields[4].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 target 필드 값입니다. 키가 'target'으로 설정됩니다. |
target.name |
principal.user.userid |
원시 로그의 target.name 필드 값입니다. |
technicalName |
security_result.summary |
원시 로그의 technicalName 필드 값입니다. |
thrown.cause.message |
xml_body , detailed_message |
XML 콘텐츠를 추출하는 데 사용되는 원시 로그의 thrown.cause.message 필드 값입니다. |
thrown.message |
xml_body , detailed_message |
XML 콘텐츠를 추출하는 데 사용되는 원시 로그의 thrown.message 필드 값입니다. |
trackingNumber |
additional.fields[5].value.string_value |
additional.fields 객체 내에 배치된 원시 로그의 trackingNumber 필드 값입니다. 키는 '운송장 번호'로 설정됩니다. |
type |
metadata.product_event_type |
원시 로그의 type 필드 값입니다. |
_version |
metadata.product_version |
원시 로그의 _version 필드 값입니다. |
해당 사항 없음 | metadata.event_timestamp |
instant.epochSecond 또는 created 필드에서 파생됩니다. |
해당 사항 없음 | metadata.event_type |
has_principal_user , has_target_application , technicalName , action 등 다양한 필드를 기반으로 파서 로직에 의해 결정됩니다. 기본값은 'GENERIC_EVENT'입니다. |
해당 사항 없음 | metadata.log_type |
'SAILPOINT_IAM'으로 설정됩니다. |
해당 사항 없음 | metadata.product_name |
IAM 로 설정합니다. |
해당 사항 없음 | metadata.vendor_name |
'SAILPOINT'로 설정합니다. |
해당 사항 없음 | extensions.auth.type |
특정 조건에서 'AUTHTYPE_UNSPECIFIED'로 설정됩니다. |
해당 사항 없음 | target.resource.attribute.labels[0].key |
'operation'으로 설정합니다. |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.