Coletar registros do serviço Citrix Monitor
Este documento explica como ingerir registros do Citrix Monitor Service no Google Security Operations usando o Google Cloud Storage. O analisador transforma registros brutos formatados em JSON em um formato estruturado de acordo com a UDM do Google SecOps. Ele extrai campos relevantes do registro bruto, mapeia-os para os campos correspondentes da UDM e enriquece os dados com contexto adicional, como informações do usuário, detalhes da máquina e atividade de rede.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar funções do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
- Permissões para criar contas de serviço e gerenciar papéis do IAM
- Acesso privilegiado ao locatário do Citrix Cloud
- Credenciais da API Citrix Cloud (ID do cliente, chave secreta do cliente, ID do cliente)
Coletar os pré-requisitos do serviço Citrix Monitor
- Faça login no Citrix Cloud Console (em inglês).
- Acesse Identity and Access Management > Acesso à API.
- Clique em Criar cliente.
Copie e salve em um local seguro os seguintes detalhes:
- Client-ID
- Client Secret
- ID do cliente (visível no console do Citrix Cloud)
- URL base da API:
- EUA/UE/AP-S:
https://api.cloud.com - Japão:
https://api.citrixcloud.jp
- EUA/UE/AP-S:
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, citrix-monitor-logs.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
citrix-monitor-collector-sa. - Descrição da conta de serviço: insira
Service account for Cloud Run function to collect Citrix Monitor Service logs.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceda a essa conta de serviço acesso ao projeto:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (
citrix-monitor-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
citrix-monitor-trigger. - Não altere as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar registros
A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API Citrix Monitor Service e gravá-los no GCS.
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço citrix-monitor-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico (
citrix-monitor-trigger). - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e abra Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione a conta de serviço (
citrix-monitor-collector-sa).
- Conta de serviço: selecione a conta de serviço (
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
- Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo GCS_BUCKETcitrix-monitor-logsGCS_PREFIXcitrix_monitorSTATE_KEYcitrix_monitor/state.jsonCITRIX_CLIENT_IDyour-client-idCITRIX_CLIENT_SECRETyour-client-secretCITRIX_CUSTOMER_IDyour-customer-idAPI_BASEhttps://api.cloud.comENTITIESMachines,Sessions,Connections,Applications,UsersPAGE_SIZE1000LOOKBACK_MINUTES75USE_TIME_FILTERtrueRole a tela para baixo na guia Variáveis e secrets até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos).
- Tempo limite da solicitação: insira
Acesse a guia Configurações em Contêineres:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1.
- Clique em Concluído.
- Na seção Recursos:
Role até Ambiente de execução:
- Selecione Padrão (recomendado).
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0. - Número máximo de instâncias: insira
100ou ajuste com base na carga esperada.
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço é criado, o editor de código inline é aberto automaticamente.
Adicionar código da função
- Insira main em Ponto de entrada da função.
No editor de código em linha, crie dois arquivos:
- Primeiro arquivo: main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timedelta, timezone import uuid # Citrix Cloud OAuth2 endpoint template TOKEN_URL_TMPL = "{api_base}/cctrustoauth2/{customerid}/tokens/clients" DEFAULT_API_BASE = "https://api.cloud.com" MONITOR_BASE_PATH = "/monitorodata" # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() def http_post_form(url, data_dict): """POST form data to get authentication token.""" encoded_data = urllib3.request.urlencode(data_dict) response = http.request( 'POST', url, body=encoded_data, headers={ 'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded' } ) return json.loads(response.data.decode('utf-8')) def http_get_json(url, headers): """GET JSON data from API endpoint.""" response = http.request('GET', url, headers=headers) return json.loads(response.data.decode('utf-8')) def get_citrix_token(api_base, customer_id, client_id, client_secret): """Get Citrix Cloud authentication token.""" url = TOKEN_URL_TMPL.format( api_base=api_base.rstrip('/'), customerid=customer_id ) payload = { 'grant_type': 'client_credentials', 'client_id': client_id, 'client_secret': client_secret } response = http_post_form(url, payload) return response['access_token'] def build_entity_url(api_base, entity, filter_query=None, top=None): """Build OData URL with optional filter and pagination.""" base = api_base.rstrip('/') + MONITOR_BASE_PATH + '/' + entity params = [] if filter_query: # Encode filter query with safe characters for OData encoded_filter = urllib3.request.urlencode({'$filter': filter_query})[9:] # Remove '$filter=' params.append('$filter=' + encoded_filter) if top: params.append('$top=' + str(top)) return base + ('?' + '&'.join(params) if params else '') def fetch_entity_rows(entity, start_iso=None, end_iso=None, page_size=1000, headers=None, api_base=DEFAULT_API_BASE): """Fetch entity data with optional time filtering and pagination.""" first_url = None if start_iso and end_iso: filter_query = f"(ModifiedDate ge {start_iso} and ModifiedDate lt {end_iso})" first_url = build_entity_url(api_base, entity, filter_query, page_size) else: first_url = build_entity_url(api_base, entity, None, page_size) url = first_url while url: try: data = http_get_json(url, headers) items = data.get('value', []) for item in items: yield item url = data.get('@odata.nextLink') except Exception as e: # If ModifiedDate filtering fails, fall back to unfiltered query if 'Bad Request' in str(e) and start_iso and end_iso: print(f"ModifiedDate filter not supported for {entity}, falling back to unfiltered query") url = build_entity_url(api_base, entity, None, page_size) continue else: raise def load_state(bucket, key): """Read the last processed timestamp from GCS state file.""" try: blob = bucket.blob(key) if blob.exists(): content = blob.download_as_text() state = json.loads(content) timestamp_str = state.get('last_hour_utc') if timestamp_str: return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')).replace(tzinfo=None) except Exception as e: print(f"Warning: Could not load state: {str(e)}") return None def save_state(bucket, key, dt_utc): """Write the current processed timestamp to GCS state file.""" state = {'last_hour_utc': dt_utc.isoformat() + 'Z'} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, separators=(',', ':')), content_type='application/json' ) def write_ndjson_to_gcs(bucket, key, rows): """Write rows as NDJSON to GCS.""" body_lines = [] for row in rows: json_line = json.dumps(row, separators=(',', ':'), ensure_ascii=False) body_lines.append(json_line) body = '\n'.join(body_lines) + '\n' blob = bucket.blob(key) blob.upload_from_string(body, content_type='application/x-ndjson') @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Citrix Monitor Service logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'citrix_monitor').strip('/') state_key = os.environ.get('STATE_KEY') or f"{prefix}/state.json" customer_id = os.environ.get('CITRIX_CUSTOMER_ID') client_id = os.environ.get('CITRIX_CLIENT_ID') client_secret = os.environ.get('CITRIX_CLIENT_SECRET') api_base = os.environ.get('API_BASE', DEFAULT_API_BASE) entities = [e.strip() for e in os.environ.get('ENTITIES', 'Machines,Sessions,Connections,Applications,Users').split(',') if e.strip()] page_size = int(os.environ.get('PAGE_SIZE', '1000')) lookback_minutes = int(os.environ.get('LOOKBACK_MINUTES', '75')) use_time_filter = os.environ.get('USE_TIME_FILTER', 'true').lower() == 'true' if not all([bucket_name, customer_id, client_id, client_secret]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Time window calculation now = datetime.utcnow() fallback_hour = (now - timedelta(minutes=lookback_minutes)).replace(minute=0, second=0, microsecond=0) last_processed = load_state(bucket, state_key) target_hour = (last_processed + timedelta(hours=1)) if last_processed else fallback_hour start_iso = target_hour.isoformat() + 'Z' end_iso = (target_hour + timedelta(hours=1)).isoformat() + 'Z' # Authentication token = get_citrix_token(api_base, customer_id, client_id, client_secret) headers = { 'Authorization': f'CWSAuth bearer={token}', 'Citrix-CustomerId': customer_id, 'Accept': 'application/json', 'Accept-Encoding': 'gzip, deflate, br', 'User-Agent': 'citrix-monitor-gcs-collector/1.0' } total_records = 0 # Process each entity type for entity in entities: rows_batch = [] try: entity_generator = fetch_entity_rows( entity=entity, start_iso=start_iso if use_time_filter else None, end_iso=end_iso if use_time_filter else None, page_size=page_size, headers=headers, api_base=api_base ) for row in entity_generator: # Store raw Citrix data directly for proper parser recognition rows_batch.append(row) # Write in batches to avoid memory issues if len(rows_batch) >= 1000: gcs_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_gcs(bucket, gcs_key, rows_batch) total_records += len(rows_batch) rows_batch = [] except Exception as ex: print(f"Error processing entity {entity}: {str(ex)}") continue # Write remaining records if rows_batch: gcs_key = f"{prefix}/{entity}/year={target_hour.year:04d}/month={target_hour.month:02d}/day={target_hour.day:02d}/hour={target_hour.hour:02d}/part-{uuid.uuid4().hex}.ndjson" write_ndjson_to_gcs(bucket, gcs_key, rows_batch) total_records += len(rows_batch) # Update state file save_state(bucket, state_key, target_hour) print(f"Successfully processed {total_records} records for hour {start_iso}") print(f"Entities processed: {', '.join(entities)}") except Exception as e: print(f'Error processing Citrix Monitor logs: {str(e)}') raise- Segundo arquivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome citrix-monitor-collector-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecione o assunto ( citrix-monitor-trigger)Corpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Testar o job do programador
- No console do Cloud Scheduler, encontre seu job.
- Clique em Forçar execução para acionar manualmente.
- Aguarde alguns segundos e acesse Cloud Run > Serviços > citrix-monitor-collector > Registros.
- Verifique se a função foi executada com sucesso.
- Verifique o bucket do GCS para confirmar se os registros foram gravados.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Citrix Monitor Service logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Citrix Monitor como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço será exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir registros do Citrix Monitor Service
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Citrix Monitor Service logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Citrix Monitor como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://citrix-monitor-logs/citrix_monitor/Substitua:
citrix-monitor-logs: o nome do bucket do GCS.citrix_monitor: prefixo/caminho da pasta opcional onde os registros são armazenados (deixe em branco para a raiz).
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso: o namespace do recurso.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.