Delinea SSO ログを収集する
このドキュメントでは、Amazon S3 を使用して Delinea(旧 Centrify)シングル サインオン(SSO)ログを Google Security Operations に取り込む方法について説明します。パーサーはログを抽出し、JSON 形式と syslog 形式の両方を処理します。Key-Value ペア、タイムスタンプ、その他の関連フィールドを解析して UDM モデルにマッピングします。ログイン失敗、ユーザー エージェント、重大度レベル、認証メカニズム、さまざまなイベントタイプを処理する特定のロジックがあります。エラー イベントの宛先メールアドレスでは、NormalizedUser
よりも FailUserName
が優先されます。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス。
- Delinea(Centrify)SSO テナントへの特権アクセス。
- AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。
Delinea(Centrify)SSO の前提条件(ID、API キー、組織 ID、トークン)を収集する
- Delinea 管理ポータルにログインします。
- [アプリ] > [アプリを追加] に移動します。
- [OAuth2 Client] を検索して、[追加] をクリックします。
- [ウェブアプリを追加] ダイアログで [はい] をクリックします。
- [ウェブアプリを追加] ダイアログで [閉じる] をクリックします。
- [アプリケーション構成] ページで、次の項目を構成します。
- [全般] タブ:
- Application ID: 一意の識別子を入力します(例:
secops-oauth-client
)。 - アプリケーション名: わかりやすい名前を入力します(例:
SecOps Data Export
)。 - Application Description(アプリケーションの説明): 説明を入力します(例:
OAuth client for exporting audit events to SecOps
)
- Application ID: 一意の識別子を入力します(例:
- [信頼] タブ:
- Application is Confidential: このオプションをオンにします。
- クライアント ID のタイプ: [Confidential] を選択します。
- 発行されたクライアント ID: この値をコピーして保存します。
- 発行されたクライアント シークレット: この値をコピーして保存します。
- [トークン] タブ:
- Auth methods: [Client Creds] を選択します。
- トークンタイプ: [Jwt RS256] を選択します。
- [Scope] タブ:
- 説明に「SIEM Integration Access」と入力して、スコープ siem を追加します。
- スコープ redrock/query を追加し、説明を Query API Access にします。
- [全般] タブ:
- [保存] をクリックして、OAuth クライアントを作成します。
- [Core Services] > [Users] > [Add User] に移動します。
- サービス ユーザーを構成します。
- ログイン名: ステップ 6 のクライアント ID を入力します。
- メールアドレス: 有効なメールアドレスを入力します(必須項目)。
- 表示名: わかりやすい名前を入力します(例:
SecOps Service User
)。 - [Password] と [Confirm Password]: 手順 6 で取得したクライアント シークレットを入力します。
- ステータス: [OAuth の機密クライアントである] を選択します。
- [Create User] をクリックします。
- [アクセス > ロール] に移動し、監査イベントのクエリに必要な権限を持つロールにサービスユーザーを割り当てます。
- 次の詳細をコピーして安全な場所に保存します。
- テナント URL: Centrify テナントの URL(例:
https://yourtenant.my.centrify.com
) - クライアント ID: 手順 6 で取得したクライアント ID
- クライアント シークレット: ステップ 6 で取得した値
- OAuth Application ID: アプリケーション構成から取得
- テナント URL: Centrify テナントの URL(例:
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
delinea-centrify-logs-bucket
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
- 次のポリシーをコピーして貼り付けます。
ポリシー JSON(別のバケット名を入力した場合は
delinea-centrify-logs-bucket
を置き換えます):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
[次へ] > [ポリシーを作成] をクリックします。
[IAM]> [ロール] に移動します。
[ロールを作成> AWS サービス > Lambda] をクリックします。
新しく作成したポリシーと、マネージド ポリシー AWSLambdaBasicExecutionRole(CloudWatch ロギング用)を関連付けます。
ロールに「
CentrifySSOLogExportRole
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 CentrifySSOLogExport
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール CentrifySSOLogExportRole
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
CentrifySSOLogExport.py
)を貼り付けます。import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
[構成] > [環境変数] に移動します。
[編集>新しい環境変数を追加] をクリックします。
次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。
環境変数
キー 値の例 S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数
CentrifySSOLogExport
。 - 名前:
CentrifySSOLogExport-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [ユーザー] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を適用します: [ユーザー] > [secops-reader] > [権限]。
- [権限を追加> ポリシーを直接アタッチする] をクリックします。
- [ポリシーを作成] を選択します。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
名前 =
secops-reader-policy
。[ポリシーを作成> 検索/選択 > 次へ] をクリックします。
[権限を追加] をクリックします。
secops-reader
のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。[アクセスキーを作成] をクリックします。
.CSV
をダウンロードします。(これらの値はフィードに貼り付けます)。
Delinea(Centrify)SSO ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Delinea Centrify SSO logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Centrify] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
AccountID |
security_result.detection_fields.value |
未加工ログの AccountID の値は、key :Account ID を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
ApplicationName |
target.application |
未加工ログの ApplicationName の値が target.application フィールドに割り当てられます。 |
AuthorityFQDN |
target.asset.network_domain |
未加工ログの AuthorityFQDN の値が target.asset.network_domain フィールドに割り当てられます。 |
AuthorityID |
target.asset.asset_id |
未加工ログの AuthorityID の値が target.asset.asset_id フィールドに割り当てられ、「AuthorityID:」という接頭辞が付加されます。 |
AzDeploymentId |
security_result.detection_fields.value |
未加工ログの AzDeploymentId の値は、key :AzDeploymentId を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
AzRoleId |
additional.fields.value.string_value |
未加工ログの AzRoleId の値は、key :AzRole Id を持つ additional.fields オブジェクトに割り当てられます。 |
AzRoleName |
target.user.attribute.roles.name |
未加工ログの AzRoleName の値が target.user.attribute.roles.name フィールドに割り当てられます。 |
ComputerFQDN |
principal.asset.network_domain |
未加工ログの ComputerFQDN の値が principal.asset.network_domain フィールドに割り当てられます。 |
ComputerID |
principal.asset.asset_id |
未加工ログの ComputerID の値が principal.asset.asset_id フィールドに割り当てられ、「ComputerId:」という接頭辞が付加されます。 |
ComputerName |
about.hostname |
未加工ログの ComputerName の値が about.hostname フィールドに割り当てられます。 |
CredentialId |
security_result.detection_fields.value |
未加工ログの CredentialId の値は、key :Credential Id を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
DirectoryServiceName |
security_result.detection_fields.value |
未加工ログの DirectoryServiceName の値は、key :Directory Service Name を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
未加工ログの DirectoryServiceNameLocalized の値は、key :Directory Service Name Localized を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
DirectoryServiceUuid |
security_result.detection_fields.value |
未加工ログの DirectoryServiceUuid の値は、key :Directory Service Uuid を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
EventMessage |
security_result.summary |
未加工ログの EventMessage の値が security_result.summary フィールドに割り当てられます。 |
EventType |
metadata.product_event_type |
未加工ログの EventType の値が metadata.product_event_type フィールドに割り当てられます。また、metadata.event_type の決定にも使用されます。 |
FailReason |
security_result.summary |
未加工ログの FailReason の値は、存在する場合は security_result.summary フィールドに割り当てられます。 |
FailUserName |
target.user.email_addresses |
未加工ログの FailUserName の値は、存在する場合は target.user.email_addresses フィールドに割り当てられます。 |
FromIPAddress |
principal.ip |
未加工ログの FromIPAddress の値が principal.ip フィールドに割り当てられます。 |
ID |
security_result.detection_fields.value |
未加工ログの ID の値は、key :ID を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
InternalTrackingID |
metadata.product_log_id |
未加工ログの InternalTrackingID の値が metadata.product_log_id フィールドに割り当てられます。 |
JumpType |
additional.fields.value.string_value |
未加工ログの JumpType の値は、key :Jump Type を持つ additional.fields オブジェクトに割り当てられます。 |
NormalizedUser |
target.user.email_addresses |
未加工ログの NormalizedUser の値が target.user.email_addresses フィールドに割り当てられます。 |
OperationMode |
additional.fields.value.string_value |
未加工ログの OperationMode の値は、key :Operation Mode を持つ additional.fields オブジェクトに割り当てられます。 |
ProxyId |
security_result.detection_fields.value |
未加工ログの ProxyId の値は、key :Proxy Id を持つ security_result.detection_fields オブジェクトに割り当てられます。 |
RequestUserAgent |
network.http.user_agent |
未加工ログの RequestUserAgent の値が network.http.user_agent フィールドに割り当てられます。 |
SessionGuid |
network.session_id |
未加工ログの SessionGuid の値が network.session_id フィールドに割り当てられます。 |
Tenant |
additional.fields.value.string_value |
未加工ログの Tenant の値は、key :Tenant を持つ additional.fields オブジェクトに割り当てられます。 |
ThreadType |
additional.fields.value.string_value |
未加工ログの ThreadType の値は、key :Thread Type を持つ additional.fields オブジェクトに割り当てられます。 |
UserType |
principal.user.attribute.roles.name |
未加工ログの UserType の値が principal.user.attribute.roles.name フィールドに割り当てられます。 |
WhenOccurred |
metadata.event_timestamp |
未加工ログの WhenOccurred の値が解析され、metadata.event_timestamp フィールドに割り当てられます。このフィールドには、最上位の timestamp フィールドも入力されます。ハードコードされた値「SSO」。EventType フィールドによって決定されます。EventType が存在しない場合や、特定の条件に一致しない場合は、デフォルトで STATUS_UPDATE になります。USER_LOGIN 、USER_CREATION 、USER_RESOURCE_ACCESS 、USER_LOGOUT 、または USER_CHANGE_PASSWORD のいずれかです。ハードコードされた値「CENTRIFY_SSO」。ハードコードされた値「SSO」。ハードコードされた値「Centrify」。message フィールドにセッション ID が含まれている場合は、抽出されて使用されます。それ以外の場合は、デフォルトで「1」になります。syslog ヘッダーから取得された host フィールドから抽出されます(利用可能な場合)。syslog ヘッダーから取得された pid フィールドから抽出されます(利用可能な場合)。UserGuid が存在する場合は、その値が使用されます。それ以外の場合、message フィールドにユーザー ID が含まれている場合は、抽出されて使用されます。Level が「Info」の場合は「ALLOW」、FailReason が存在する場合は「BLOCK」に設定されます。FailReason が存在する場合は、「AUTH_VIOLATION」に設定されます。Level フィールドによって決定されます。Level が「Info」の場合は「INFORMATIONAL」、Level が「Warning」の場合は「MEDIUM」、Level が「Error」の場合は「ERROR」に設定します。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。