Delinea SSO 로그 수집
이 문서에서는 Amazon S3를 사용하여 Delinea (이전 Centrify) SSO (Single Sign-On) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 JSON 및 syslog 형식을 모두 처리하여 로그를 추출합니다. 키-값 쌍, 타임스탬프, 기타 관련 필드를 파싱하여 UDM 모델에 매핑하고 로그인 실패, 사용자 에이전트, 심각도 수준, 인증 메커니즘, 다양한 이벤트 유형을 처리하는 특정 로직을 사용합니다. 실패 이벤트의 타겟 이메일 주소에 대해 NormalizedUser
보다 FailUserName
에 우선순위를 부여합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스입니다.
- Delinea (Centrify) SSO 테넌트에 대한 액세스 권한 관리
- AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge)에 대한 권한 있는 액세스
Delinea (Centrify) SSO 필수 요건 (ID, API 키, 조직 ID, 토큰) 수집
- Delinea 관리 포털에 로그인합니다.
- 앱 > 앱 추가로 이동합니다.
- OAuth2 클라이언트를 검색하고 추가를 클릭합니다.
- 웹 앱 추가 대화상자에서 예를 클릭합니다.
- 웹 앱 추가 대화상자에서 닫기를 클릭합니다.
- 애플리케이션 구성 페이지에서 다음을 구성합니다.
- 일반 탭:
- 애플리케이션 ID: 고유 식별자 (예:
secops-oauth-client
)를 입력합니다. - 애플리케이션 이름: 설명이 포함된 이름을 입력합니다 (예:
SecOps Data Export
). - 애플리케이션 설명: 설명을 입력합니다 (예:
OAuth client for exporting audit events to SecOps
).
- 애플리케이션 ID: 고유 식별자 (예:
- 신뢰 탭:
- 애플리케이션이 기밀임: 이 옵션을 선택합니다.
- 클라이언트 ID 유형: 기밀을 선택합니다.
- 발급된 클라이언트 ID: 이 값을 복사하여 저장합니다.
- 발급된 클라이언트 보안 비밀번호: 이 값을 복사하여 저장합니다.
- 토큰 탭:
- 인증 방법: 클라이언트 사용자 인증 정보를 선택합니다.
- 토큰 유형: Jwt RS256을 선택합니다.
- 범위 탭:
- SIEM 통합 액세스라는 설명이 있는 범위 siem을 추가합니다.
- Query API 액세스라는 설명과 함께 redrock/query 범위를 추가합니다.
- 일반 탭:
- 저장을 클릭하여 OAuth 클라이언트를 만듭니다.
- 핵심 서비스 > 사용자 > 사용자 추가로 이동합니다.
- 서비스 사용자를 구성합니다.
- 로그인 이름: 6단계의 클라이언트 ID를 입력합니다.
- 이메일 주소: 올바른 이메일을 입력합니다 (필수 입력란).
- 표시 이름: 설명이 포함된 이름 (예:
SecOps Service User
)을 입력합니다. - 비밀번호 및 비밀번호 확인: 6단계의 클라이언트 보안 비밀번호를 입력합니다.
- 상태: OAuth 컨피덴셜 클라이언트를 선택합니다.
- Create User(사용자 만들기)를 클릭합니다.
- 액세스 > 역할로 이동하여 감사 이벤트를 쿼리할 적절한 권한이 있는 역할에 서비스 사용자를 할당합니다.
- 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- 테넌트 URL: Centrify 테넌트 URL (예:
https://yourtenant.my.centrify.com
) - 클라이언트 ID: 6단계에서 확인
- 클라이언트 보안 비밀번호: 6단계에서 확인한 값
- OAuth 애플리케이션 ID: 애플리케이션 구성에서
- 테넌트 URL: Centrify 테넌트 URL (예:
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
delinea-centrify-logs-bucket
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- .CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책으로 이동합니다.
- 정책 만들기 > JSON 탭을 클릭합니다.
- 다음 정책을 복사하여 붙여넣습니다.
정책 JSON (다른 버킷 이름을 입력한 경우
delinea-centrify-logs-bucket
대체):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
다음 > 정책 만들기를 클릭합니다.
IAM > 역할로 이동합니다.
역할 만들기 > AWS 서비스 > Lambda를 클릭합니다.
새로 만든 정책과 관리형 정책 AWSLambdaBasicExecutionRole (CloudWatch 로깅용)을 연결합니다.
역할 이름을
CentrifySSOLogExportRole
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 CentrifySSOLogExport
런타임 Python 3.13 아키텍처 x86_64 실행 역할 CentrifySSOLogExportRole
함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드 (
CentrifySSOLogExport.py
)를 붙여넣습니다.import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
구성 > 환경 변수로 이동합니다.
수정 > 새 환경 변수 추가를 클릭합니다.
다음 표에 제공된 환경 변수를 입력하고 예시 값을 실제 값으로 바꿉니다.
환경 변수
키 예시 값 S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
함수가 생성되면 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요금 (
1 hour
) - 타겟: Lambda 함수
CentrifySSOLogExport
- 이름:
CentrifySSOLogExport-1h
.
- 반복 일정: 요금 (
- 일정 만들기를 클릭합니다.
(선택사항) Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔에서 IAM > 사용자로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한
- 권한 추가 > 정책 직접 연결을 클릭합니다.
- 정책 만들기를 선택합니다.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
이름 =
secops-reader-policy
정책 만들기 > 검색/선택 > 다음을 클릭합니다.
권한 추가를 클릭합니다.
secops-reader
의 액세스 키를 만듭니다(보안 사용자 인증 정보 > 액세스 키).액세스 키 만들기를 클릭합니다.
.CSV
을 다운로드합니다. (이 값은 피드에 붙여넣습니다.)
Delinea (Centrify) SSO 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
Delinea Centrify SSO logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Centrify를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
로그 필드 | UDM 매핑 | 논리 |
---|---|---|
AccountID |
security_result.detection_fields.value |
원시 로그의 AccountID 값이 key :Account ID 인 security_result.detection_fields 객체에 할당됩니다. |
ApplicationName |
target.application |
원시 로그의 ApplicationName 값이 target.application 필드에 할당됩니다. |
AuthorityFQDN |
target.asset.network_domain |
원시 로그의 AuthorityFQDN 값이 target.asset.network_domain 필드에 할당됩니다. |
AuthorityID |
target.asset.asset_id |
원시 로그의 AuthorityID 값이 target.asset.asset_id 필드에 할당되며, 'AuthorityID:'가 앞에 붙습니다. |
AzDeploymentId |
security_result.detection_fields.value |
원시 로그의 AzDeploymentId 값이 key :AzDeploymentId 인 security_result.detection_fields 객체에 할당됩니다. |
AzRoleId |
additional.fields.value.string_value |
원시 로그의 AzRoleId 값이 key :AzRole Id 인 additional.fields 객체에 할당됩니다. |
AzRoleName |
target.user.attribute.roles.name |
원시 로그의 AzRoleName 값이 target.user.attribute.roles.name 필드에 할당됩니다. |
ComputerFQDN |
principal.asset.network_domain |
원시 로그의 ComputerFQDN 값이 principal.asset.network_domain 필드에 할당됩니다. |
ComputerID |
principal.asset.asset_id |
원시 로그의 ComputerID 값이 principal.asset.asset_id 필드에 할당되며, 'ComputerId:'가 앞에 붙습니다. |
ComputerName |
about.hostname |
원시 로그의 ComputerName 값이 about.hostname 필드에 할당됩니다. |
CredentialId |
security_result.detection_fields.value |
원시 로그의 CredentialId 값이 key :Credential Id 인 security_result.detection_fields 객체에 할당됩니다. |
DirectoryServiceName |
security_result.detection_fields.value |
원시 로그의 DirectoryServiceName 값이 key :Directory Service Name 인 security_result.detection_fields 객체에 할당됩니다. |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
원시 로그의 DirectoryServiceNameLocalized 값이 key :Directory Service Name Localized 인 security_result.detection_fields 객체에 할당됩니다. |
DirectoryServiceUuid |
security_result.detection_fields.value |
원시 로그의 DirectoryServiceUuid 값이 key :Directory Service Uuid 인 security_result.detection_fields 객체에 할당됩니다. |
EventMessage |
security_result.summary |
원시 로그의 EventMessage 값이 security_result.summary 필드에 할당됩니다. |
EventType |
metadata.product_event_type |
원시 로그의 EventType 값이 metadata.product_event_type 필드에 할당됩니다. metadata.event_type 를 결정하는 데도 사용됩니다. |
FailReason |
security_result.summary |
원시 로그의 FailReason 값은 있는 경우 security_result.summary 필드에 할당됩니다. |
FailUserName |
target.user.email_addresses |
원시 로그의 FailUserName 값은 있는 경우 target.user.email_addresses 필드에 할당됩니다. |
FromIPAddress |
principal.ip |
원시 로그의 FromIPAddress 값이 principal.ip 필드에 할당됩니다. |
ID |
security_result.detection_fields.value |
원시 로그의 ID 값이 key :ID 인 security_result.detection_fields 객체에 할당됩니다. |
InternalTrackingID |
metadata.product_log_id |
원시 로그의 InternalTrackingID 값이 metadata.product_log_id 필드에 할당됩니다. |
JumpType |
additional.fields.value.string_value |
원시 로그의 JumpType 값이 key :Jump Type 인 additional.fields 객체에 할당됩니다. |
NormalizedUser |
target.user.email_addresses |
원시 로그의 NormalizedUser 값이 target.user.email_addresses 필드에 할당됩니다. |
OperationMode |
additional.fields.value.string_value |
원시 로그의 OperationMode 값이 key :Operation Mode 인 additional.fields 객체에 할당됩니다. |
ProxyId |
security_result.detection_fields.value |
원시 로그의 ProxyId 값이 key :Proxy Id 인 security_result.detection_fields 객체에 할당됩니다. |
RequestUserAgent |
network.http.user_agent |
원시 로그의 RequestUserAgent 값이 network.http.user_agent 필드에 할당됩니다. |
SessionGuid |
network.session_id |
원시 로그의 SessionGuid 값이 network.session_id 필드에 할당됩니다. |
Tenant |
additional.fields.value.string_value |
원시 로그의 Tenant 값이 key :Tenant 인 additional.fields 객체에 할당됩니다. |
ThreadType |
additional.fields.value.string_value |
원시 로그의 ThreadType 값이 key :Thread Type 인 additional.fields 객체에 할당됩니다. |
UserType |
principal.user.attribute.roles.name |
원시 로그의 UserType 값이 principal.user.attribute.roles.name 필드에 할당됩니다. |
WhenOccurred |
metadata.event_timestamp |
원시 로그의 WhenOccurred 값이 파싱되어 metadata.event_timestamp 필드에 할당됩니다. 이 필드는 최상위 timestamp 필드도 채웁니다. 하드코딩된 값 'SSO'입니다. EventType 필드에 따라 결정됩니다. EventType 이 없거나 특정 기준과 일치하지 않으면 기본값은 STATUS_UPDATE 입니다. USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT , USER_CHANGE_PASSWORD 일 수 있습니다. 하드코딩된 값 'CENTRIFY_SSO'입니다. 하드코딩된 값 'SSO'입니다. 하드코딩된 값 'Centrify' message 필드에 세션 ID가 포함된 경우 추출되어 사용됩니다. 그렇지 않으면 기본값은 '1'입니다. syslog 헤더에서 가져온 host 필드에서 추출합니다(사용 가능한 경우). syslog 헤더에서 가져온 pid 필드에서 추출합니다(사용 가능한 경우). UserGuid 이 있으면 해당 값이 사용됩니다. 그렇지 않고 message 필드에 사용자 ID가 포함된 경우 추출되어 사용됩니다. Level 이 '정보'인 경우 '허용'으로 설정하고 FailReason 이 있는 경우 '차단'으로 설정합니다. FailReason 이 있는 경우 'AUTH_VIOLATION'으로 설정됩니다. Level 필드에 따라 결정됩니다. Level 이 '정보'인 경우 'INFORMATIONAL', Level 이 '경고'인 경우 'MEDIUM', Level 이 '오류'인 경우 'ERROR'로 설정됩니다. |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.