Coletar registros do SSO da Delinea
Este documento explica como ingerir registros de logon único (SSO) da Delinea (antiga Centrify) no Google Security Operations usando o Amazon S3. O analisador extrai os registros, processando os formatos JSON e syslog. Ele analisa pares de chave-valor, carimbos de data/hora e outros campos relevantes, mapeando-os para o modelo da UDM, com uma lógica específica para lidar com falhas de login, user agents, níveis de gravidade, mecanismos de autenticação e vários tipos de eventos. Ele prioriza FailUserName
em vez de NormalizedUser
para endereços de e-mail de destino em eventos de falha.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps.
- Acesso privilegiado ao locatário do SSO da Delinea (Centrify).
- Acesso privilegiado à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Coletar os pré-requisitos do SSO da Delinea (Centrify) (IDs, chaves de API, IDs da organização, tokens)
- Faça login no Portal do administrador da Delinea.
- Acesse Apps > Adicionar apps.
- Pesquise Cliente OAuth2 e clique em Adicionar.
- Clique em Sim na caixa de diálogo Adicionar app da Web.
- Clique em Fechar na caixa de diálogo Adicionar apps da Web.
- Na página Configuração do aplicativo, configure o seguinte:
- Guia Geral:
- ID do aplicativo: insira um identificador exclusivo (por exemplo,
secops-oauth-client
). - Nome do aplicativo: insira um nome descritivo, por exemplo,
SecOps Data Export
. - Descrição do aplicativo: insira uma descrição (por exemplo,
OAuth client for exporting audit events to SecOps
)
- ID do aplicativo: insira um identificador exclusivo (por exemplo,
- Guia Confiança:
- A inscrição é confidencial: marque essa opção
- Tipo de ID do cliente: selecione Confidencial
- ID do cliente emitido: copie e salve esse valor.
- Chave secreta do cliente emitida: copie e salve esse valor.
- Guia Tokens:
- Métodos de autenticação: selecione Credenciais do cliente.
- Tipo de token: selecione Jwt RS256.
- Guia Escopo:
- Adicione o escopo siem com a descrição Acesso à integração do SIEM.
- Adicione o escopo redrock/query com a descrição Acesso à API Query.
- Guia Geral:
- Clique em Salvar para criar o cliente OAuth.
- Acesse Serviços principais > Usuários > Adicionar usuário.
- Configure o usuário de serviço:
- Nome de login: insira o ID do cliente da etapa 6.
- Endereço de e-mail: insira um e-mail válido (campo obrigatório).
- Nome de exibição: insira um nome descritivo, por exemplo,
SecOps Service User
. - Senha e Confirmar senha: insira a Chave secreta do cliente da etapa 6.
- Status: selecione É um cliente OAuth confidencial.
- Clique em Create User.
- Acesse Acesso > Papéis e atribua ao usuário de serviço um papel com as permissões adequadas para consultar eventos de auditoria.
- Copie e salve em um local seguro os seguintes detalhes:
- URL do locatário: o URL do locatário do Centrify (por exemplo,
https://yourtenant.my.centrify.com
) - ID do cliente: da etapa 6
- Chave secreta do cliente: da etapa 6
- ID do aplicativo OAuth: na configuração do aplicativo
- URL do locatário: o URL do locatário do Centrify (por exemplo,
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket.
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
delinea-centrify-logs-bucket
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo .CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas.
- Clique em Criar política > guia JSON.
- Copie e cole a política a seguir.
JSON da política (substitua
delinea-centrify-logs-bucket
se você tiver inserido um nome de bucket diferente):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
Clique em Próxima > Criar política.
Acesse IAM > Papéis.
Clique em Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole (para geração de registros do CloudWatch).
Nomeie a função como
CentrifySSOLogExportRole
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome CentrifySSOLogExport
Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução CentrifySSOLogExportRole
Depois que a função for criada, abra a guia Código, exclua o stub e cole o código a seguir (
CentrifySSOLogExport.py
).import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
Acesse Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Insira as variáveis de ambiente fornecidas na tabela a seguir, substituindo os valores de exemplo pelos seus.
Variáveis de ambiente
Chave Valor de exemplo S3_BUCKET
delinea-centrify-logs-bucket
S3_PREFIX
centrify-sso-logs/
STATE_KEY
centrify-sso-logs/state.json
TENANT_URL
https://yourtenant.my.centrify.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_APP_ID
your-oauth-application-id
OAUTH_SCOPE
siem
PAGE_SIZE
1000
MAX_PAGES
10
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda
CentrifySSOLogExport
. - Nome:
CentrifySSOLogExport-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
(Opcional) Criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário: insira
secops-reader
. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Usuário: insira
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões.
- Clique em Adicionar permissões > Anexar políticas diretamente.
- Selecione Criar política.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
Name =
secops-reader-policy
.Clique em Criar política > pesquise/selecione > Próxima.
Clique em Adicionar permissões
Crie uma chave de acesso para
secops-reader
: Credenciais de segurança > Chaves de acesso.Clique em Criar chave de acesso.
Faça o download do
.CSV
. Cole esses valores no feed.
Configurar um feed no Google SecOps para ingerir registros de SSO da Delinea (Centrify)
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Delinea Centrify SSO logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Centrify como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
AccountID |
security_result.detection_fields.value |
O valor de AccountID do registro bruto é atribuído a um objeto security_result.detection_fields com key :Account ID . |
ApplicationName |
target.application |
O valor de ApplicationName do registro bruto é atribuído ao campo target.application . |
AuthorityFQDN |
target.asset.network_domain |
O valor de AuthorityFQDN do registro bruto é atribuído ao campo target.asset.network_domain . |
AuthorityID |
target.asset.asset_id |
O valor de AuthorityID do registro bruto é atribuído ao campo target.asset.asset_id , com o prefixo "AuthorityID:". |
AzDeploymentId |
security_result.detection_fields.value |
O valor de AzDeploymentId do registro bruto é atribuído a um objeto security_result.detection_fields com key :AzDeploymentId . |
AzRoleId |
additional.fields.value.string_value |
O valor de AzRoleId do registro bruto é atribuído a um objeto additional.fields com key :AzRole Id . |
AzRoleName |
target.user.attribute.roles.name |
O valor de AzRoleName do registro bruto é atribuído ao campo target.user.attribute.roles.name . |
ComputerFQDN |
principal.asset.network_domain |
O valor de ComputerFQDN do registro bruto é atribuído ao campo principal.asset.network_domain . |
ComputerID |
principal.asset.asset_id |
O valor de ComputerID do registro bruto é atribuído ao campo principal.asset.asset_id , com o prefixo "ComputerId:". |
ComputerName |
about.hostname |
O valor de ComputerName do registro bruto é atribuído ao campo about.hostname . |
CredentialId |
security_result.detection_fields.value |
O valor de CredentialId do registro bruto é atribuído a um objeto security_result.detection_fields com key :Credential Id . |
DirectoryServiceName |
security_result.detection_fields.value |
O valor de DirectoryServiceName do registro bruto é atribuído a um objeto security_result.detection_fields com key :Directory Service Name . |
DirectoryServiceNameLocalized |
security_result.detection_fields.value |
O valor de DirectoryServiceNameLocalized do registro bruto é atribuído a um objeto security_result.detection_fields com key :Directory Service Name Localized . |
DirectoryServiceUuid |
security_result.detection_fields.value |
O valor de DirectoryServiceUuid do registro bruto é atribuído a um objeto security_result.detection_fields com key :Directory Service Uuid . |
EventMessage |
security_result.summary |
O valor de EventMessage do registro bruto é atribuído ao campo security_result.summary . |
EventType |
metadata.product_event_type |
O valor de EventType do registro bruto é atribuído ao campo metadata.product_event_type . Ele também é usado para determinar o metadata.event_type . |
FailReason |
security_result.summary |
O valor de FailReason do registro bruto é atribuído ao campo security_result.summary quando presente. |
FailUserName |
target.user.email_addresses |
O valor de FailUserName do registro bruto é atribuído ao campo target.user.email_addresses quando presente. |
FromIPAddress |
principal.ip |
O valor de FromIPAddress do registro bruto é atribuído ao campo principal.ip . |
ID |
security_result.detection_fields.value |
O valor de ID do registro bruto é atribuído a um objeto security_result.detection_fields com key :ID . |
InternalTrackingID |
metadata.product_log_id |
O valor de InternalTrackingID do registro bruto é atribuído ao campo metadata.product_log_id . |
JumpType |
additional.fields.value.string_value |
O valor de JumpType do registro bruto é atribuído a um objeto additional.fields com key :Jump Type . |
NormalizedUser |
target.user.email_addresses |
O valor de NormalizedUser do registro bruto é atribuído ao campo target.user.email_addresses . |
OperationMode |
additional.fields.value.string_value |
O valor de OperationMode do registro bruto é atribuído a um objeto additional.fields com key :Operation Mode . |
ProxyId |
security_result.detection_fields.value |
O valor de ProxyId do registro bruto é atribuído a um objeto security_result.detection_fields com key :Proxy Id . |
RequestUserAgent |
network.http.user_agent |
O valor de RequestUserAgent do registro bruto é atribuído ao campo network.http.user_agent . |
SessionGuid |
network.session_id |
O valor de SessionGuid do registro bruto é atribuído ao campo network.session_id . |
Tenant |
additional.fields.value.string_value |
O valor de Tenant do registro bruto é atribuído a um objeto additional.fields com key :Tenant . |
ThreadType |
additional.fields.value.string_value |
O valor de ThreadType do registro bruto é atribuído a um objeto additional.fields com key :Thread Type . |
UserType |
principal.user.attribute.roles.name |
O valor de UserType do registro bruto é atribuído ao campo principal.user.attribute.roles.name . |
WhenOccurred |
metadata.event_timestamp |
O valor de WhenOccurred do registro bruto é analisado e atribuído ao campo metadata.event_timestamp . Esse campo também preenche o campo timestamp de nível superior. Valor codificado "SSO". Determinado pelo campo EventType . O padrão será STATUS_UPDATE se EventType não estiver presente ou não corresponder a nenhum critério específico. Pode ser USER_LOGIN , USER_CREATION , USER_RESOURCE_ACCESS , USER_LOGOUT ou USER_CHANGE_PASSWORD . Valor codificado "CENTRIFY_SSO". Valor codificado "SSO". Valor codificado "Centrify". Se o campo message contiver um ID de sessão, ele será extraído e usado. Caso contrário, o padrão é "1". Extraído do campo host , se disponível, que vem do cabeçalho do syslog. Extraído do campo pid , se disponível, que vem do cabeçalho do syslog. Se UserGuid estiver presente, o valor dele será usado. Caso contrário, se o campo message contiver um ID de usuário, ele será extraído e usado. Definido como "ALLOW" se Level for "Info" e "BLOCK" se FailReason estiver presente. Definido como "AUTH_VIOLATION" se FailReason estiver presente. Determinado pelo campo Level . Definido como "INFORMATIONAL" se Level for "Info", "MEDIUM" se Level for "Warning" e "ERROR" se Level for "Error". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.