Registros de auditoria do ServiceNow
Este documento explica como ingerir registros de auditoria do ServiceNow no Google Security Operations usando vários métodos.
Opção A: GCS com função do Cloud Run
Esse método usa uma função do Cloud Run para consultar periodicamente a API REST do ServiceNow em busca de registros de auditoria e armazená-los em um bucket do GCS. Em seguida, o Google Security Operations coleta os registros do bucket do GCS.
Antes de começar
Verifique se você atende os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Acesso privilegiado ao locatário ou à API do ServiceNow com as funções adequadas (normalmente
adminou usuário com acesso de leitura à tabela sys_audit) - Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
Coletar os pré-requisitos do ServiceNow (IDs, chaves de API, IDs da organização, tokens)
- Faça login no ServiceNow Admin Console.
- Acesse Segurança do sistema > Usuários e grupos > Usuários.
- Crie um usuário ou selecione um usuário com as permissões adequadas para acessar os registros de auditoria.
Copie e salve em um local seguro os seguintes detalhes:
- Nome de usuário
- Senha
- URL da instância (por exemplo,
https://instance.service-now.com)
Configurar ACL para usuários que não são administradores
Se você quiser usar uma conta de usuário não administrador, crie uma lista de controle de acesso (ACL) personalizada para conceder acesso de leitura à tabela sys_audit:
- Faça login no Admin Console do ServiceNow como administrador.
- Acesse Segurança do sistema > Controle de acesso (ACL).
- Clique em Novo.
- Informe os seguintes detalhes de configuração:
- Tipo: selecione registro.
- Operação: selecione ler.
- Nome: insira
sys_audit. - Descrição: insira
Allow read access to sys_audit table for Chronicle integration.
- No campo Requer função, adicione a função atribuída ao usuário da integração (por exemplo,
chronicle_reader). - Clique em Enviar.
- Verifique se a ACL está ativa e se o usuário pode consultar a tabela sys_audit.
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, servicenow-audit-logs.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
servicenow-audit-collector-sa. - Descrição da conta de serviço: insira
Service account for Cloud Run function to collect ServiceNow audit logs.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
servicenow-audit-trigger. - Não altere as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar registros
A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API ServiceNow e gravá-los no GCS.
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço servicenow-audit-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (
servicenow-audit-trigger). - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e abra Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione a conta de serviço (
servicenow-audit-collector-sa).
- Conta de serviço: selecione a conta de serviço (
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
- Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo Descrição GCS_BUCKETservicenow-audit-logsNome do bucket do GCS GCS_PREFIXaudit-logsPrefixo para arquivos de registro STATE_KEYaudit-logs/state.jsonCaminho do arquivo de estado API_BASE_URLhttps://instance.service-now.comURL da instância do ServiceNow API_USERNAMEyour-usernameNome de usuário do ServiceNow API_PASSWORDyour-passwordSenha do ServiceNow PAGE_SIZE1000Registros por página MAX_PAGES1000Número máximo de páginas a serem buscadas Na seção Variáveis e secrets, role a tela para baixo até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos).
- Tempo limite da solicitação: insira
Acesse a guia Configurações:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1.
- Na seção Recursos:
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0. - Número máximo de instâncias: insira
100ou ajuste com base na carga esperada.
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço é criado, o editor de código inline é aberto automaticamente.
Adicionar código da função
- Insira main no campo Ponto de entrada.
No editor de código em linha, crie dois arquivos:
Primeiro arquivo: main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs') STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json') API_BASE = os.environ.get('API_BASE_URL') USERNAME = os.environ.get('API_USERNAME') PASSWORD = os.environ.get('API_PASSWORD') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000')) def parse_datetime(value: str) -> datetime: """Parse ServiceNow datetime string to datetime object.""" # ServiceNow format: YYYY-MM-DD HH:MM:SS try: return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) except ValueError: # Try ISO format as fallback if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=24) print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}") # Fetch logs records, newest_event_time = fetch_logs( api_base=API_BASE, username=USERNAME, password=PASSWORD, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_pages=MAX_PAGES, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int): """ Fetch logs from ServiceNow sys_audit table with pagination and rate limiting. Args: api_base: ServiceNow instance URL username: ServiceNow username password: ServiceNow password start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_pages: Maximum total pages to fetch Returns: Tuple of (records list, newest_event_time string) """ # Clean up base URL base_url = api_base.rstrip('/') endpoint = f"{base_url}/api/now/table/sys_audit" # Encode credentials using UTF-8 auth_string = f"{username}:{password}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 offset = 0 # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS) start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S') while True: page_num += 1 if len(records) >= page_size * max_pages: print(f"Reached max_pages limit ({max_pages})") break # Build query parameters # Use >= operator for sys_created_on field (on or after) params = [] params.append(f"sysparm_query=sys_created_on>={start_time_str}") params.append(f"sysparm_display_value=true") params.append(f"sysparm_limit={page_size}") params.append(f"sysparm_offset={offset}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('result', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_time = event.get('sys_created_on') if event_time: if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results if len(page_results) < page_size: print(f"Reached last page (size={len(page_results)} < limit={page_size})") break # Move to next page offset += page_size # Small delay to avoid rate limiting time.sleep(0.1) except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time ```
Segundo arquivo: requirements.txt:
``` functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0 ```
Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome servicenow-audit-collector-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecione o tópico do Pub/Sub ( servicenow-audit-trigger).Corpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Opções de frequência de programação
Escolha a frequência com base no volume de registros e nos requisitos de latência:
Frequência Expressão Cron Caso de uso A cada 5 minutos */5 * * * *Alto volume e baixa latência A cada 15 minutos */15 * * * *Volume médio A cada hora 0 * * * *Padrão (recomendado) A cada 6 horas 0 */6 * * *Baixo volume, processamento em lote Diário 0 0 * * *Coleta de dados históricos
Testar a integração
- No console do Cloud Scheduler, encontre seu job.
- Clique em Executar à força para acionar o job manualmente.
- Aguarde alguns segundos.
- Acesse Cloud Run > Serviços.
- Clique no nome da função (
servicenow-audit-collector). - Clique na guia Registros.
Verifique se a função foi executada com sucesso. Procure o seguinte:
Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS Page 1: Retrieved X events Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsAcesse Cloud Storage > Buckets.
Clique no nome do bucket.
Navegue até a pasta de prefixo (
audit-logs/).Verifique se um novo arquivo
.ndjsonfoi criado com o carimbo de data/hora atual.
Se você encontrar erros nos registros:
- HTTP 401: verifique as credenciais da API nas variáveis de ambiente
- HTTP 403: verifique se a conta tem as permissões necessárias (função de administrador ou ACL personalizada para sys_audit)
- HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
- Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
ServiceNow Audit logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Auditoria do ServiceNow como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir registros de auditoria do ServiceNow
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
ServiceNow Audit logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Auditoria do ServiceNow como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://servicenow-audit-logs/audit-logs/Substitua:
servicenow-audit-logs: o nome do bucket do GCS.audit-logs: prefixo/caminho da pasta onde os registros são armazenados.
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso: o namespace do recurso.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Opção B: agente do Bindplane com syslog
Esse método usa um agente do Bindplane para coletar registros de auditoria do ServiceNow e encaminhá-los para as Operações de Segurança do Google. Como o ServiceNow não oferece suporte nativo ao syslog para registros de auditoria, vamos usar um script para consultar a API REST do ServiceNow e encaminhar os registros ao agente do Bindplane via syslog.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Windows Server 2016 ou mais recente ou host Linux com
systemd - Conectividade de rede entre o agente do Bindplane e o ServiceNow
- Se você estiver executando por trás de um proxy, verifique se as portas do firewall estão abertas de acordo com os requisitos do agente do Bindplane.
- Acesso privilegiado ao console de gerenciamento ou appliance do ServiceNow com as funções adequadas (normalmente
adminou usuário com acesso de leitura à tabela sys_audit)
Receber o arquivo de autenticação de ingestão do Google SecOps
- Faça login no console do Google SecOps.
- Acesse Configurações do SIEM > Agente de coleta.
- Clique em Fazer o download para baixar o arquivo de autenticação de ingestão.
Salve o arquivo de forma segura no sistema em que o Bindplane será instalado.
Receber o ID de cliente do Google SecOps
- Faça login no console do Google SecOps.
- Acesse Configurações do SIEM > Perfil.
Copie e salve o ID do cliente na seção Detalhes da organização.
Instalar o agente do BindPlane
Instale o agente do Bindplane no seu sistema operacional Windows ou Linux de acordo com as instruções a seguir.
Instalação do Windows
- Abra o prompt de comando ou o PowerShell como administrador.
Execute este comando:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quietAguarde a conclusão da instalação.
Execute o seguinte comando para confirmar a instalação:
sc query observiq-otel-collector
O serviço vai aparecer como EM EXECUÇÃO.
Instalação do Linux
- Abra um terminal com privilégios de root ou sudo.
Execute este comando:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.shAguarde a conclusão da instalação.
Execute o seguinte comando para confirmar a instalação:
sudo systemctl status observiq-otel-collector
O serviço vai aparecer como ativo (em execução).
Outros recursos de instalação
Para mais opções de instalação e solução de problemas, consulte o guia de instalação do agente do Bindplane.
Configurar o agente do Bindplane para ingerir syslog e enviar ao Google SecOps
Localizar o arquivo de configuração
Linux:bash
sudo nano /etc/bindplane-agent/config.yaml
Windows:cmd
notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
Editar o arquivo de configuração
Substitua todo o conteúdo de config.yaml pela seguinte configuração:
```yaml
receivers:
udplog:
listen_address: "0.0.0.0:514"
exporters:
chronicle/servicenow_audit:
compression: gzip
creds_file_path: '/path/to/ingestion-authentication-file.json'
customer_id: '<YOUR_CUSTOMER_ID>'
endpoint: <CUSTOMER_REGION_ENDPOINT>
log_type: 'SERVICENOW_AUDIT'
raw_log_field: body
ingestion_labels:
service: servicenow
service:
pipelines:
logs/servicenow_to_chronicle:
receivers:
- udplog
exporters:
- chronicle/servicenow_audit
```
Parâmetros de configuração
Substitua os seguintes marcadores de posição:
listen_address: endereço IP e porta a serem detectados. Use0.0.0.0:514para detectar todas as interfaces na porta 514.creds_file_path: caminho completo para o arquivo de autenticação de ingestão:- Linux:
/etc/bindplane-agent/ingestion-auth.json - Windows:
C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
- Linux:
<YOUR_CUSTOMER_ID>: ID do cliente da etapa anterior.<CUSTOMER_REGION_ENDPOINT>: URL do endpoint regional:- EUA:
malachiteingestion-pa.googleapis.com - Europa:
europe-malachiteingestion-pa.googleapis.com - Ásia:
asia-southeast1-malachiteingestion-pa.googleapis.com - Consulte a lista completa em Endpoints regionais.
- EUA:
Salvar o arquivo de configuração
Depois de editar, salve o arquivo:
* Linux: pressione Ctrl+O, Enter e Ctrl+X.
* Windows: clique em Arquivo > Salvar.
Reinicie o agente do Bindplane para aplicar as mudanças
Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:
sudo systemctl restart observiq-otel-collectorVerifique se o serviço está em execução:
sudo systemctl status observiq-otel-collectorVerifique se há erros nos registros:
sudo journalctl -u observiq-otel-collector -f
Para reiniciar o agente do Bindplane em Windows, escolha uma das seguintes opções:
Usando o prompt de comando ou o PowerShell como administrador:
net stop observiq-otel-collector && net start observiq-otel-collectorUsando o console do Services:
- Pressione
Win+R, digiteservices.msce pressione Enter. - Localize o Coletor do OpenTelemetry da observIQ.
Clique com o botão direito do mouse e selecione Reiniciar.
Verifique se o serviço está em execução:
sc query observiq-otel-collectorVerifique se há erros nos registros:
type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
Criar um script para encaminhar registros de auditoria do ServiceNow para o syslog
Como o ServiceNow não oferece suporte nativo ao syslog para registros de auditoria, vamos criar um script que consulta a API REST do ServiceNow e encaminha os registros para o syslog. É possível programar a execução periódica desse script.
Exemplo de script do Python (Linux)
Crie um arquivo chamado
servicenow_audit_to_syslog.pycom o conteúdo a seguir:import urllib3 import json import datetime import base64 import socket import time import os # ServiceNow API details BASE_URL = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL USERNAME = 'admin' # Replace with your ServiceNow username PASSWORD = 'password' # Replace with your ServiceNow password # Syslog details SYSLOG_SERVER = '127.0.0.1' # Replace with your Bindplane agent IP SYSLOG_PORT = 514 # Replace with your Bindplane agent port # State file to keep track of last run STATE_FILE = '/tmp/servicenow_audit_last_run.txt' # Pagination settings PAGE_SIZE = 1000 MAX_PAGES = 1000 def get_last_run_timestamp(): try: with open(STATE_FILE, 'r') as f: return f.read().strip() except: return '1970-01-01 00:00:00' def update_state_file(timestamp): with open(STATE_FILE, 'w') as f: f.write(timestamp) def send_to_syslog(message): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT)) sock.close() def get_audit_logs(last_run_timestamp): """ Query ServiceNow sys_audit table with proper pagination. Uses sys_created_on field for timestamp filtering. """ # Encode credentials using UTF-8 auth_string = f"{USERNAME}:{PASSWORD}" auth_bytes = auth_string.encode('utf-8') auth_encoded = base64.b64encode(auth_bytes).decode('utf-8') # Setup HTTP client http = urllib3.PoolManager() headers = { 'Authorization': f'Basic {auth_encoded}', 'Accept': 'application/json' } results = [] offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if 'T' in last_run_timestamp: last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0] for page in range(MAX_PAGES): # Build query with pagination # Use >= operator for sys_created_on field (on or after) query_params = ( f"sysparm_query=sys_created_on>={last_run_timestamp}" f"&sysparm_display_value=true" f"&sysparm_limit={PAGE_SIZE}" f"&sysparm_offset={offset}" ) url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}" try: response = http.request('GET', url, headers=headers) if response.status == 200: data = json.loads(response.data.decode('utf-8')) chunk = data.get('result', []) results.extend(chunk) # Stop if we got fewer records than PAGE_SIZE (last page) if len(chunk) < PAGE_SIZE: break # Move to next page offset += PAGE_SIZE else: print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}") break except Exception as e: print(f"Exception querying ServiceNow API: {str(e)}") break return results def main(): # Get last run timestamp last_run_timestamp = get_last_run_timestamp() # Current timestamp for this run current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Query ServiceNow API for audit logs audit_logs = get_audit_logs(last_run_timestamp) if audit_logs: # Send each log to syslog for log in audit_logs: # Format the log as JSON log_json = json.dumps(log) # Send to syslog send_to_syslog(log_json) # Sleep briefly to avoid flooding time.sleep(0.01) # Update state file update_state_file(current_timestamp) print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog") else: print("No new audit logs to forward") if __name__ == "__main__": main()
Configurar a execução programada (Linux)
Torne o script executável:
chmod +x servicenow_audit_to_syslog.pyCrie um cron job para executar o script a cada hora:
crontab -eAdicione a linha abaixo:
0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
Exemplo de script do PowerShell (Windows)
Crie um arquivo chamado
ServiceNow-Audit-To-Syslog.ps1com o conteúdo a seguir:# ServiceNow API details $BaseUrl = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL $Username = 'admin' # Replace with your ServiceNow username $Password = 'password' # Replace with your ServiceNow password # Syslog details $SyslogServer = '127.0.0.1' # Replace with your Bindplane agent IP $SyslogPort = 514 # Replace with your Bindplane agent port # State file to keep track of last run $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt" # Pagination settings $PageSize = 1000 $MaxPages = 1000 function Get-LastRunTimestamp { try { if (Test-Path $StateFile) { return Get-Content $StateFile } else { return '1970-01-01 00:00:00' } } catch { return '1970-01-01 00:00:00' } } function Update-StateFile { param([string]$Timestamp) Set-Content -Path $StateFile -Value $Timestamp } function Send-ToSyslog { param([string]$Message) $UdpClient = New-Object System.Net.Sockets.UdpClient $UdpClient.Connect($SyslogServer, $SyslogPort) $Encoding = [System.Text.Encoding]::ASCII $Bytes = $Encoding.GetBytes($Message) $UdpClient.Send($Bytes, $Bytes.Length) $UdpClient.Close() } function Get-AuditLogs { param([string]$LastRunTimestamp) # Create auth header using UTF-8 encoding $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}")) $Headers = @{ Authorization = "Basic ${Auth}" Accept = 'application/json' } $Results = @() $Offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if ($LastRunTimestamp -match 'T') { $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' ' $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', '' } for ($page = 0; $page -lt $MaxPages; $page++) { # Build query with pagination # Use >= operator for sys_created_on field (on or after) $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}" $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}" try { $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get $Chunk = $Response.result $Results += $Chunk # Stop if we got fewer records than PageSize (last page) if ($Chunk.Count -lt $PageSize) { break } # Move to next page $Offset += $PageSize } catch { Write-Error "Error querying ServiceNow API: $_" break } } return $Results } # Main execution $LastRunTimestamp = Get-LastRunTimestamp $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss') $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp if ($AuditLogs -and $AuditLogs.Count -gt 0) { # Send each log to syslog foreach ($Log in $AuditLogs) { # Format the log as JSON $LogJson = $Log | ConvertTo-Json -Compress # Send to syslog Send-ToSyslog -Message $LogJson # Sleep briefly to avoid flooding Start-Sleep -Milliseconds 10 } # Update state file Update-StateFile -Timestamp $CurrentTimestamp Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog" } else { Write-Output "No new audit logs to forward" }
Configurar a execução programada (Windows)
- Abra o Agendador de tarefas.
- Clique em Criar tarefa.
- Forneça a seguinte configuração:
- Nome:
ServiceNowAuditToSyslog - Opções de segurança: executar se o usuário estiver conectado ou não
- Nome:
- Acesse a guia Gatilhos.
- Clique em Novo e defina para execução a cada hora.
- Acesse a guia Ações.
- Clique em Novo e defina:
- Ação: iniciar um programa
- Programa/script:
powershell.exe - Arguments:
-ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
- Clique em OK para salvar a tarefa.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.