Mengumpulkan log SSO Delinea
Dokumen ini menjelaskan cara menyerap log Single Sign-On (SSO) Delinea (sebelumnya Centrify)
ke Google Security Operations menggunakan Amazon S3. Parser mengekstrak log, menangani format JSON dan syslog. Proses ini mengurai pasangan nilai kunci, stempel waktu, dan kolom relevan lainnya, memetakannya ke model UDM, dengan logika khusus untuk menangani kegagalan login, agen pengguna, tingkat keparahan, mekanisme autentikasi, dan berbagai jenis peristiwa. FailUserName
diprioritaskan daripada NormalizedUser
untuk
alamat email target dalam peristiwa kegagalan.
Sebelum memulai
Pastikan Anda memenuhi prasyarat berikut:
- Instance Google SecOps.
- Akses istimewa ke tenant SSO Delinea (Centrify).
- Akses istimewa ke AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Mengumpulkan prasyarat SSO Delinea (Centrify) (ID, kunci API, ID organisasi, token)
- Login ke Delinea Admin Portal.
- Buka Aplikasi > Tambahkan Aplikasi.
- Telusuri OAuth2 Client, lalu klik Add.
- Klik Ya dalam dialog Tambahkan Aplikasi Web.
- Klik Tutup di dialog Tambahkan Aplikasi Web.
- Di halaman Application Configuration, konfigurasikan hal berikut:
- Tab General:
- ID Aplikasi: Masukkan ID unik (misalnya,
secops-oauth-client
) - Nama Aplikasi: Masukkan nama deskriptif (misalnya,
SecOps Data Export
) - Deskripsi Aplikasi: Masukkan deskripsi (misalnya,
OAuth client for exporting audit events to SecOps
)
- ID Aplikasi: Masukkan ID unik (misalnya,
- Tab Kepercayaan:
- Aplikasi bersifat Rahasia: Centang opsi ini
- Jenis Client ID: Pilih Rahasia
- Issued Client ID: Salin dan simpan nilai ini
- Issued Client Secret: Salin dan simpan nilai ini
- Tab Token:
- Metode autentikasi: Pilih Kredensial Klien
- Token Type: Pilih Jwt RS256
- Tab Cakupan:
- Tambahkan cakupan siem dengan deskripsi SIEM Integration Access.
- Tambahkan cakupan redrock/query dengan deskripsi Akses Query API.
- Tab General:
- Klik Simpan untuk membuat klien OAuth.
- Buka Layanan Inti > Pengguna > Tambahkan Pengguna.
- Konfigurasi pengguna layanan:
- Nama Login: Masukkan ID Klien dari langkah 6.
- Alamat Email: Masukkan email yang valid (kolom wajib diisi).
- Nama Tampilan: Masukkan nama deskriptif (misalnya,
SecOps Service User
). - Sandi dan Konfirmasi Sandi: Masukkan Rahasia Klien dari langkah 6
- Status: Pilih Is OAuth confidential client.
- Klik Create User.
- Buka Akses > Peran dan tetapkan pengguna layanan ke peran dengan izin yang sesuai untuk membuat kueri peristiwa audit.
- Salin dan simpan detail berikut di lokasi yang aman:
- URL Tenant: URL tenant Centrify Anda (misalnya,
https://yourtenant.my.centrify.com
) - Client ID: Dari langkah 6
- Rahasia Klien: Dari langkah 6
- ID Aplikasi OAuth: Dari Konfigurasi Aplikasi
- URL Tenant: URL tenant Centrify Anda (misalnya,
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
delinea-centrify-logs-bucket
). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download file .CSV untuk menyimpan Kunci Akses dan Kunci Akses Rahasia untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Permissions.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies.
- Klik Buat kebijakan > tab JSON.
- Salin dan tempel kebijakan berikut.
Policy JSON (ganti
delinea-centrify-logs-bucket
jika Anda memasukkan nama bucket yang berbeda):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles.
Klik Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat dan kebijakan terkelola AWSLambdaBasicExecutionRole (untuk logging CloudWatch).
Beri nama peran
CentrifySSOLogExportRole
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama CentrifySSOLogExport
Runtime Python 3.13 Arsitektur x86_64 Peran eksekusi CentrifySSOLogExportRole
Setelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode berikut (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Buka Configuration > Environment variables.
Klik Edit > Tambahkan variabel lingkungan baru.
Masukkan variabel lingkungan yang diberikan dalam tabel berikut, dengan mengganti nilai contoh dengan nilai Anda.
Variabel lingkungan
Kunci Nilai contoh S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
). - Target: fungsi Lambda Anda
CentrifySSOLogExport
. - Name:
CentrifySSOLogExport-1h
.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
(Opsional) Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Di AWS Console, buka IAM > Users.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader
. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin.
- Klik Add permissions > Attach policies directly.
- Pilih Create Policy.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Nama =
secops-reader-policy
.Klik Buat kebijakan > cari/pilih > Berikutnya.
Klik Add permissions.
Buat kunci akses untuk
secops-reader
: Kredensial keamanan > Kunci akses.Klik Create access key.
Download
.CSV
. (Anda akan menempelkan nilai ini ke feed).
Mengonfigurasi feed di Google SecOps untuk menyerap log SSO Delinea (Centrify)
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Delinea Centrify SSO logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Centrify sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: namespace aset.
- Label penyerapan: label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel pemetaan UDM
Kolom log | Pemetaan UDM | Logika |
---|---|---|
AccountID |
security_result.detection_fields.value |
Nilai AccountID dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Account ID . |
ApplicationName |
target.application |
Nilai ApplicationName dari log mentah ditetapkan ke kolom target.application . |
AuthorityFQDN |
target.asset.network_domain |
Nilai AuthorityFQDN dari log mentah ditetapkan ke kolom target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
Nilai AuthorityID dari log mentah ditetapkan ke kolom target.asset.asset_id , dengan awalan "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
Nilai AzDeploymentId dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
Nilai AzRoleId dari log mentah ditetapkan ke objek additional.fields dengan key :AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
Nilai AzRoleName dari log mentah ditetapkan ke kolom target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
Nilai ComputerFQDN dari log mentah ditetapkan ke kolom principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
Nilai ComputerID dari log mentah ditetapkan ke kolom principal.asset.asset_id , dengan awalan "ComputerId:". |
ComputerName |
about.hostname |
Nilai ComputerName dari log mentah ditetapkan ke kolom about.hostname . |
CredentialId |
security_result.detection_fields.value |
Nilai CredentialId dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
Nilai DirectoryServiceName dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
Nilai DirectoryServiceNameLocalized dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
Nilai DirectoryServiceUuid dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Directory Service Uuid . |
EventMessage |
security_result.summary |
Nilai EventMessage dari log mentah ditetapkan ke kolom security_result.summary . |
EventType |
metadata.product_event_type |
Nilai EventType dari log mentah ditetapkan ke kolom metadata.product_event_type . Nilai ini juga digunakan untuk menentukan metadata.event_type . |
FailReason |
security_result.summary |
Nilai FailReason dari log mentah ditetapkan ke kolom security_result.summary jika ada. |
FailUserName |
target.user.email_addresses |
Nilai FailUserName dari log mentah ditetapkan ke kolom target.user.email_addresses jika ada. |
FromIPAddress |
principal.ip |
Nilai FromIPAddress dari log mentah ditetapkan ke kolom principal.ip . |
ID |
security_result.detection_fields.value |
Nilai ID dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :ID . |
InternalTrackingID |
metadata.product_log_id |
Nilai InternalTrackingID dari log mentah ditetapkan ke kolom metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
Nilai JumpType dari log mentah ditetapkan ke objek additional.fields dengan key :Jump Type . |
NormalizedUser |
target.user.email_addresses |
Nilai NormalizedUser dari log mentah ditetapkan ke kolom target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
Nilai OperationMode dari log mentah ditetapkan ke objek additional.fields dengan key :Operation Mode . |
ProxyId |
security_result.detection_fields.value |
Nilai ProxyId dari log mentah ditetapkan ke objek security_result.detection_fields dengan key :Proxy Id . |
RequestUserAgent |
network.http.user_agent |
Nilai RequestUserAgent dari log mentah ditetapkan ke kolom network.http.user_agent . |
SessionGuid |
network.session_id |
Nilai SessionGuid dari log mentah ditetapkan ke kolom network.session_id . |
Tenant |
additional.fields.value.string_value |
Nilai Tenant dari log mentah ditetapkan ke objek additional.fields dengan key :Tenant . |
ThreadType |
additional.fields.value.string_value |
Nilai ThreadType dari log mentah ditetapkan ke objek additional.fields dengan key :Thread Type . |
UserType |
principal.user.attribute.roles.name |
Nilai UserType dari log mentah ditetapkan ke kolom principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
Nilai WhenOccurred dari log mentah diuraikan dan ditetapkan ke kolom metadata.event_timestamp . Kolom ini juga mengisi kolom timestamp tingkat teratas. Nilai yang dikodekan secara permanen "SSO". Ditentukan oleh kolom EventType . Nilai defaultnya adalah STATUS_UPDATE jika EventType tidak ada atau tidak cocok dengan kriteria tertentu. Dapat berupa USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT , atau USER_CHANGE_PASSWORD . Nilai yang di-hardcode "CENTRIFY_SSO". Nilai yang dikodekan secara permanen "SSO". Nilai yang dikodekan secara permanen "Centrify". Jika kolom message berisi ID sesi, ID tersebut akan diekstrak dan digunakan. Jika tidak, defaultnya adalah "1". Diekstrak dari kolom host jika tersedia, yang berasal dari header syslog. Diekstrak dari kolom pid jika tersedia, yang berasal dari header syslog. Jika UserGuid ada, nilainya akan digunakan. Jika tidak, jika kolom message berisi ID pengguna, ID tersebut akan diekstrak dan digunakan. Tetapkan ke "ALLOW" jika Level adalah "Info", dan "BLOCK" jika FailReason ada. Ditetapkan ke "AUTH_VIOLATION" jika FailReason ada. Ditentukan oleh kolom Level . Ditetapkan ke "INFORMATIONAL" jika Level adalah "Info", "MEDIUM" jika Level adalah "Warning", dan "ERROR" jika Level adalah "Error". |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.