Registros de auditoria do ServiceNow

Compatível com:

Este documento explica como ingerir registros de auditoria do ServiceNow no Google Security Operations usando vários métodos.

Opção A: GCS com função do Cloud Run

Esse método usa uma função do Cloud Run para consultar periodicamente a API REST do ServiceNow em busca de registros de auditoria e armazená-los em um bucket do GCS. Em seguida, o Google Security Operations coleta os registros do bucket do GCS.

Antes de começar

Verifique se você atende os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Acesso privilegiado ao locatário ou à API do ServiceNow com as funções adequadas (normalmente admin ou usuário com acesso de leitura à tabela sys_audit)
  • Um projeto do GCP com a API Cloud Storage ativada
  • Permissões para criar e gerenciar buckets do GCS
  • Permissões para gerenciar políticas do IAM em buckets do GCS
  • Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler

Coletar os pré-requisitos do ServiceNow (IDs, chaves de API, IDs da organização, tokens)

  1. Faça login no ServiceNow Admin Console.
  2. Acesse Segurança do sistema > Usuários e grupos > Usuários.
  3. Crie um usuário ou selecione um usuário com as permissões adequadas para acessar os registros de auditoria.
  4. Copie e salve em um local seguro os seguintes detalhes:

    • Nome de usuário
    • Senha
    • URL da instância (por exemplo, https://instance.service-now.com)

Configurar ACL para usuários que não são administradores

Se você quiser usar uma conta de usuário não administrador, crie uma lista de controle de acesso (ACL) personalizada para conceder acesso de leitura à tabela sys_audit:

  1. Faça login no Admin Console do ServiceNow como administrador.
  2. Acesse Segurança do sistema > Controle de acesso (ACL).
  3. Clique em Novo.
  4. Informe os seguintes detalhes de configuração:
    • Tipo: selecione registro.
    • Operação: selecione ler.
    • Nome: insira sys_audit.
    • Descrição: insira Allow read access to sys_audit table for Chronicle integration.
  5. No campo Requer função, adicione a função atribuída ao usuário da integração (por exemplo, chronicle_reader).
  6. Clique em Enviar.
  7. Verifique se a ACL está ativa e se o usuário pode consultar a tabela sys_audit.

Criar um bucket do Google Cloud Storage

  1. Acesse o Console do Google Cloud.
  2. Selecione seu projeto ou crie um novo.
  3. No menu de navegação, acesse Cloud Storage > Buckets.
  4. Clique em Criar bucket.
  5. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, servicenow-audit-logs.
    Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional)
    Local Selecione o local (por exemplo, us-central1).
    Classe de armazenamento Padrão (recomendado para registros acessados com frequência)
    Controle de acesso Uniforme (recomendado)
    Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção
  6. Clique em Criar.

Criar uma conta de serviço para a função do Cloud Run

A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.

Criar conta de serviço

  1. No Console do GCP, acesse IAM e administrador > Contas de serviço.
  2. Clique em Criar conta de serviço.
  3. Informe os seguintes detalhes de configuração:
    • Nome da conta de serviço: insira servicenow-audit-collector-sa.
    • Descrição da conta de serviço: insira Service account for Cloud Run function to collect ServiceNow audit logs.
  4. Clique em Criar e continuar.
  5. Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
    1. Clique em Selecionar papel.
    2. Pesquise e selecione Administrador de objetos do Storage.
    3. Clique em + Adicionar outro papel.
    4. Pesquise e selecione Invocador do Cloud Run.
    5. Clique em + Adicionar outro papel.
    6. Pesquise e selecione Invocador do Cloud Functions.
  6. Clique em Continuar.
  7. Clique em Concluído.

Esses papéis são necessários para:

  • Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
  • Invocador do Cloud Run: permite que o Pub/Sub invoque a função
  • Invocador do Cloud Functions: permite a invocação de funções

Conceder permissões do IAM no bucket do GCS

Conceda permissões de gravação à conta de serviço no bucket do GCS:

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket.
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: insira o e-mail da conta de serviço (por exemplo, servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Atribuir papéis: selecione Administrador de objetos do Storage.
  6. Clique em Salvar.

Criar tópico Pub/Sub

Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.

  1. No Console do GCP, acesse Pub/Sub > Tópicos.
  2. Selecione Criar tópico.
  3. Informe os seguintes detalhes de configuração:
    • ID do tópico: insira servicenow-audit-trigger.
    • Não altere as outras configurações.
  4. Clique em Criar.

Criar uma função do Cloud Run para coletar registros

A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API ServiceNow e gravá-los no GCS.

  1. No console do GCP, acesse o Cloud Run.
  2. Clique em Criar serviço.
  3. Selecione Função (use um editor in-line para criar uma função).
  4. Na seção Configurar, forneça os seguintes detalhes de configuração:

    Configuração Valor
    Nome do serviço servicenow-audit-collector
    Região Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).
    Ambiente de execução Selecione Python 3.12 ou uma versão mais recente.
  5. Na seção Acionador (opcional):

    1. Clique em + Adicionar gatilho.
    2. Selecione Cloud Pub/Sub.
    3. Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (servicenow-audit-trigger).
    4. Clique em Salvar.
  6. Na seção Autenticação:

    1. Selecione Exigir autenticação.
    2. Confira o Identity and Access Management (IAM).
  7. Role a tela para baixo e abra Contêineres, rede, segurança.

  8. Acesse a guia Segurança:

    • Conta de serviço: selecione a conta de serviço (servicenow-audit-collector-sa).
  9. Acesse a guia Contêineres:

    1. Clique em Variáveis e secrets.
    2. Clique em + Adicionar variável para cada variável de ambiente:
    Nome da variável Valor de exemplo Descrição
    GCS_BUCKET servicenow-audit-logs Nome do bucket do GCS
    GCS_PREFIX audit-logs Prefixo para arquivos de registro
    STATE_KEY audit-logs/state.json Caminho do arquivo de estado
    API_BASE_URL https://instance.service-now.com URL da instância do ServiceNow
    API_USERNAME your-username Nome de usuário do ServiceNow
    API_PASSWORD your-password Senha do ServiceNow
    PAGE_SIZE 1000 Registros por página
    MAX_PAGES 1000 Número máximo de páginas a serem buscadas
  10. Na seção Variáveis e secrets, role a tela para baixo até Solicitações:

    • Tempo limite da solicitação: insira 600 segundos (10 minutos).
  11. Acesse a guia Configurações:

    • Na seção Recursos:
      • Memória: selecione 512 MiB ou mais.
      • CPU: selecione 1.
  12. Na seção Escalonamento de revisão:

    • Número mínimo de instâncias: insira 0.
    • Número máximo de instâncias: insira 100 ou ajuste com base na carga esperada.
  13. Clique em Criar.

  14. Aguarde a criação do serviço (1 a 2 minutos).

  15. Depois que o serviço é criado, o editor de código inline é aberto automaticamente.

Adicionar código da função

  1. Insira main no campo Ponto de entrada.
  2. No editor de código em linha, crie dois arquivos:

    • Primeiro arquivo: main.py::

          import functions_framework
          from google.cloud import storage
          import json
          import os
          import urllib3
          from datetime import datetime, timezone, timedelta
          import time
          import base64
      
          # Initialize HTTP client with timeouts
          http = urllib3.PoolManager(
              timeout=urllib3.Timeout(connect=5.0, read=30.0),
              retries=False,
          )
      
          # Initialize Storage client
          storage_client = storage.Client()
      
          # Environment variables
          GCS_BUCKET = os.environ.get('GCS_BUCKET')
          GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs')
          STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json')
          API_BASE = os.environ.get('API_BASE_URL')
          USERNAME = os.environ.get('API_USERNAME')
          PASSWORD = os.environ.get('API_PASSWORD')
          PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
          MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000'))
      
          def parse_datetime(value: str) -> datetime:
              """Parse ServiceNow datetime string to datetime object."""
              # ServiceNow format: YYYY-MM-DD HH:MM:SS
              try:
                  return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
              except ValueError:
                  # Try ISO format as fallback
                  if value.endswith("Z"):
                      value = value[:-1] + "+00:00"
                  return datetime.fromisoformat(value)
      
          @functions_framework.cloud_event
          def main(cloud_event):
              """
              Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS.
      
              Args:
                  cloud_event: CloudEvent object containing Pub/Sub message
              """
      
              if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]):
                  print('Error: Missing required environment variables')
                  return
      
              try:
                  # Get GCS bucket
                  bucket = storage_client.bucket(GCS_BUCKET)
      
                  # Load state
                  state = load_state(bucket, STATE_KEY)
      
                  # Determine time window
                  now = datetime.now(timezone.utc)
                  last_time = None
      
                  if isinstance(state, dict) and state.get("last_event_time"):
                      try:
                          last_time = parse_datetime(state["last_event_time"])
                          # Overlap by 2 minutes to catch any delayed events
                          last_time = last_time - timedelta(minutes=2)
                      except Exception as e:
                          print(f"Warning: Could not parse last_event_time: {e}")
      
                  if last_time is None:
                      last_time = now - timedelta(hours=24)
      
                  print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}")
      
                  # Fetch logs
                  records, newest_event_time = fetch_logs(
                      api_base=API_BASE,
                      username=USERNAME,
                      password=PASSWORD,
                      start_time=last_time,
                      end_time=now,
                      page_size=PAGE_SIZE,
                      max_pages=MAX_PAGES,
                  )
      
                  if not records:
                      print("No new log records found.")
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
                      return
      
                  # Write to GCS as NDJSON
                  timestamp = now.strftime('%Y%m%d_%H%M%S')
                  object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
                  blob = bucket.blob(object_key)
      
                  ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
                  blob.upload_from_string(ndjson, content_type='application/x-ndjson')
      
                  print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
      
                  # Update state with newest event time
                  if newest_event_time:
                      save_state(bucket, STATE_KEY, newest_event_time)
                  else:
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
      
                  print(f"Successfully processed {len(records)} records")
      
              except Exception as e:
                  print(f'Error processing logs: {str(e)}')
                  raise
      
          def load_state(bucket, key):
              """Load state from GCS."""
              try:
                  blob = bucket.blob(key)
                  if blob.exists():
                      state_data = blob.download_as_text()
                      return json.loads(state_data)
              except Exception as e:
                  print(f"Warning: Could not load state: {e}")
      
              return {}
      
          def save_state(bucket, key, last_event_time: str):
              """Save the last event timestamp to GCS state file."""
              try:
                  state = {'last_event_time': last_event_time}
                  blob = bucket.blob(key)
                  blob.upload_from_string(
                      json.dumps(state, indent=2),
                      content_type='application/json'
                  )
                  print(f"Saved state: last_event_time={last_event_time}")
              except Exception as e:
                  print(f"Warning: Could not save state: {e}")
      
          def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int):
              """
              Fetch logs from ServiceNow sys_audit table with pagination and rate limiting.
      
              Args:
                  api_base: ServiceNow instance URL
                  username: ServiceNow username
                  password: ServiceNow password
                  start_time: Start time for log query
                  end_time: End time for log query
                  page_size: Number of records per page
                  max_pages: Maximum total pages to fetch
      
              Returns:
                  Tuple of (records list, newest_event_time string)
              """
              # Clean up base URL
              base_url = api_base.rstrip('/')
      
              endpoint = f"{base_url}/api/now/table/sys_audit"
      
              # Encode credentials using UTF-8
              auth_string = f"{username}:{password}"
              auth_bytes = auth_string.encode('utf-8')
              auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
      
              headers = {
                  'Authorization': f'Basic {auth_b64}',
                  'Accept': 'application/json',
                  'Content-Type': 'application/json',
                  'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0'
              }
      
              records = []
              newest_time = None
              page_num = 0
              backoff = 1.0
              offset = 0
      
              # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS)
              start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
      
              while True:
                  page_num += 1
      
                  if len(records) >= page_size * max_pages:
                      print(f"Reached max_pages limit ({max_pages})")
                      break
      
                  # Build query parameters
                  # Use >= operator for sys_created_on field (on or after)
                  params = []
                  params.append(f"sysparm_query=sys_created_on>={start_time_str}")
                  params.append(f"sysparm_display_value=true")
                  params.append(f"sysparm_limit={page_size}")
                  params.append(f"sysparm_offset={offset}")
      
                  url = f"{endpoint}?{'&'.join(params)}"
      
                  try:
                      response = http.request('GET', url, headers=headers)
      
                      # Handle rate limiting with exponential backoff
                      if response.status == 429:
                          retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                          print(f"Rate limited (429). Retrying after {retry_after}s...")
                          time.sleep(retry_after)
                          backoff = min(backoff * 2, 30.0)
                          continue
      
                      backoff = 1.0
      
                      if response.status != 200:
                          print(f"HTTP Error: {response.status}")
                          response_text = response.data.decode('utf-8')
                          print(f"Response body: {response_text}")
                          return [], None
      
                      data = json.loads(response.data.decode('utf-8'))
                      page_results = data.get('result', [])
      
                      if not page_results:
                          print(f"No more results (empty page)")
                          break
      
                      print(f"Page {page_num}: Retrieved {len(page_results)} events")
                      records.extend(page_results)
      
                      # Track newest event time
                      for event in page_results:
                          try:
                              event_time = event.get('sys_created_on')
                              if event_time:
                                  if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                      newest_time = event_time
                          except Exception as e:
                              print(f"Warning: Could not parse event time: {e}")
      
                      # Check for more results
                      if len(page_results) < page_size:
                          print(f"Reached last page (size={len(page_results)} < limit={page_size})")
                          break
      
                      # Move to next page
                      offset += page_size
      
                      # Small delay to avoid rate limiting
                      time.sleep(0.1)
      
                  except Exception as e:
                      print(f"Error fetching logs: {e}")
                      return [], None
      
              print(f"Retrieved {len(records)} total records from {page_num} pages")
              return records, newest_time
          ```
      
    • Segundo arquivo: requirements.txt:

      ```
      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      ```
      
  3. Clique em Implantar para salvar e implantar a função.

  4. Aguarde a conclusão da implantação (2 a 3 minutos).

Criar o job do Cloud Scheduler

O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.

  1. No Console do GCP, acesse o Cloud Scheduler.
  2. Clique em Criar job.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome servicenow-audit-collector-hourly
    Região Selecione a mesma região da função do Cloud Run
    Frequência 0 * * * * (a cada hora, na hora)
    Fuso horário Selecione o fuso horário (UTC recomendado)
    Tipo de destino Pub/Sub
    Tópico Selecione o tópico do Pub/Sub (servicenow-audit-trigger).
    Corpo da mensagem {} (objeto JSON vazio)
  4. Clique em Criar.

Opções de frequência de programação

  • Escolha a frequência com base no volume de registros e nos requisitos de latência:

    Frequência Expressão Cron Caso de uso
    A cada 5 minutos */5 * * * * Alto volume e baixa latência
    A cada 15 minutos */15 * * * * Volume médio
    A cada hora 0 * * * * Padrão (recomendado)
    A cada 6 horas 0 */6 * * * Baixo volume, processamento em lote
    Diário 0 0 * * * Coleta de dados históricos

Testar a integração

  1. No console do Cloud Scheduler, encontre seu job.
  2. Clique em Executar à força para acionar o job manualmente.
  3. Aguarde alguns segundos.
  4. Acesse Cloud Run > Serviços.
  5. Clique no nome da função (servicenow-audit-collector).
  6. Clique na guia Registros.
  7. Verifique se a função foi executada com sucesso. Procure o seguinte:

    Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Acesse Cloud Storage > Buckets.

  9. Clique no nome do bucket.

  10. Navegue até a pasta de prefixo (audit-logs/).

  11. Verifique se um novo arquivo .ndjson foi criado com o carimbo de data/hora atual.

Se você encontrar erros nos registros:

  • HTTP 401: verifique as credenciais da API nas variáveis de ambiente
  • HTTP 403: verifique se a conta tem as permissões necessárias (função de administrador ou ACL personalizada para sys_audit)
  • HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
  • Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.

Recuperar a conta de serviço do Google SecOps

O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.

Receber o e-mail da conta de serviço

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, ServiceNow Audit logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione Auditoria do ServiceNow como o Tipo de registro.
  7. Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copie esse endereço de e-mail para usar na próxima etapa.

Conceder permissões do IAM à conta de serviço do Google SecOps

A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket.
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
    • Atribuir papéis: selecione Leitor de objetos do Storage.
  6. Clique em Salvar.

Configurar um feed no Google SecOps para ingerir registros de auditoria do ServiceNow

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, ServiceNow Audit logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione Auditoria do ServiceNow como o Tipo de registro.
  7. Clique em Próxima.
  8. Especifique valores para os seguintes parâmetros de entrada:

    • URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:

      gs://servicenow-audit-logs/audit-logs/
      
      • Substitua:

        • servicenow-audit-logs: o nome do bucket do GCS.
        • audit-logs: prefixo/caminho da pasta onde os registros são armazenados.
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:

      • Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
      • Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
      • Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.

    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.

    • Namespace do recurso: o namespace do recurso.

    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.

  9. Clique em Próxima.

  10. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Opção B: agente do Bindplane com syslog

Esse método usa um agente do Bindplane para coletar registros de auditoria do ServiceNow e encaminhá-los para as Operações de Segurança do Google. Como o ServiceNow não oferece suporte nativo ao syslog para registros de auditoria, vamos usar um script para consultar a API REST do ServiceNow e encaminhar os registros ao agente do Bindplane via syslog.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps
  • Windows Server 2016 ou mais recente ou host Linux com systemd
  • Conectividade de rede entre o agente do Bindplane e o ServiceNow
  • Se você estiver executando por trás de um proxy, verifique se as portas do firewall estão abertas de acordo com os requisitos do agente do Bindplane.
  • Acesso privilegiado ao console de gerenciamento ou appliance do ServiceNow com as funções adequadas (normalmente admin ou usuário com acesso de leitura à tabela sys_audit)

Receber o arquivo de autenticação de ingestão do Google SecOps

  1. Faça login no console do Google SecOps.
  2. Acesse Configurações do SIEM > Agente de coleta.
  3. Clique em Fazer o download para baixar o arquivo de autenticação de ingestão.
  4. Salve o arquivo de forma segura no sistema em que o Bindplane será instalado.

Receber o ID de cliente do Google SecOps

  1. Faça login no console do Google SecOps.
  2. Acesse Configurações do SIEM > Perfil.
  3. Copie e salve o ID do cliente na seção Detalhes da organização.

Instalar o agente do BindPlane

Instale o agente do Bindplane no seu sistema operacional Windows ou Linux de acordo com as instruções a seguir.

Instalação do Windows

  1. Abra o prompt de comando ou o PowerShell como administrador.
  2. Execute este comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. Aguarde a conclusão da instalação.

  4. Execute o seguinte comando para confirmar a instalação:

    sc query observiq-otel-collector
    

O serviço vai aparecer como EM EXECUÇÃO.

Instalação do Linux

  1. Abra um terminal com privilégios de root ou sudo.
  2. Execute este comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. Aguarde a conclusão da instalação.

  4. Execute o seguinte comando para confirmar a instalação:

    sudo systemctl status observiq-otel-collector
    

O serviço vai aparecer como ativo (em execução).

Outros recursos de instalação

Para mais opções de instalação e solução de problemas, consulte o guia de instalação do agente do Bindplane.

Configurar o agente do Bindplane para ingerir syslog e enviar ao Google SecOps

Localizar o arquivo de configuração

Linux:bash sudo nano /etc/bindplane-agent/config.yaml

Windows:cmd notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"

Editar o arquivo de configuração

Substitua todo o conteúdo de config.yaml pela seguinte configuração:

```yaml
receivers:
  udplog:
    listen_address: "0.0.0.0:514"

exporters:
  chronicle/servicenow_audit:
    compression: gzip
    creds_file_path: '/path/to/ingestion-authentication-file.json'
    customer_id: '<YOUR_CUSTOMER_ID>'
    endpoint: <CUSTOMER_REGION_ENDPOINT>
    log_type: 'SERVICENOW_AUDIT'
    raw_log_field: body
    ingestion_labels:
      service: servicenow

service:
  pipelines:
    logs/servicenow_to_chronicle:
      receivers:
        - udplog
      exporters:
        - chronicle/servicenow_audit
```

Parâmetros de configuração

Substitua os seguintes marcadores de posição:

  • listen_address: endereço IP e porta a serem detectados. Use 0.0.0.0:514 para detectar todas as interfaces na porta 514.
  • creds_file_path: caminho completo para o arquivo de autenticação de ingestão:
    • Linux: /etc/bindplane-agent/ingestion-auth.json
    • Windows: C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
  • <YOUR_CUSTOMER_ID>: ID do cliente da etapa anterior.
  • <CUSTOMER_REGION_ENDPOINT>: URL do endpoint regional:
    • EUA: malachiteingestion-pa.googleapis.com
    • Europa: europe-malachiteingestion-pa.googleapis.com
    • Ásia: asia-southeast1-malachiteingestion-pa.googleapis.com
    • Consulte a lista completa em Endpoints regionais.

Salvar o arquivo de configuração

Depois de editar, salve o arquivo: * Linux: pressione Ctrl+O, Enter e Ctrl+X. * Windows: clique em Arquivo > Salvar.

Reinicie o agente do Bindplane para aplicar as mudanças

  • Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:

    sudo systemctl restart observiq-otel-collector
    
    1. Verifique se o serviço está em execução:

      sudo systemctl status observiq-otel-collector
      
    2. Verifique se há erros nos registros:

      sudo journalctl -u observiq-otel-collector -f
      
  • Para reiniciar o agente do Bindplane em Windows, escolha uma das seguintes opções:

    • Usando o prompt de comando ou o PowerShell como administrador:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • Usando o console do Services:

    1. Pressione Win+R, digite services.msc e pressione Enter.
    2. Localize o Coletor do OpenTelemetry da observIQ.
    3. Clique com o botão direito do mouse e selecione Reiniciar.

    4. Verifique se o serviço está em execução:

      sc query observiq-otel-collector
      
    5. Verifique se há erros nos registros:

      type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
      

Criar um script para encaminhar registros de auditoria do ServiceNow para o syslog

Como o ServiceNow não oferece suporte nativo ao syslog para registros de auditoria, vamos criar um script que consulta a API REST do ServiceNow e encaminha os registros para o syslog. É possível programar a execução periódica desse script.

Exemplo de script do Python (Linux)

  • Crie um arquivo chamado servicenow_audit_to_syslog.py com o conteúdo a seguir:

    import urllib3
    import json
    import datetime
    import base64
    import socket
    import time
    import os
    
    # ServiceNow API details
    BASE_URL = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    USERNAME = 'admin'  # Replace with your ServiceNow username
    PASSWORD = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    SYSLOG_SERVER = '127.0.0.1'  # Replace with your Bindplane agent IP
    SYSLOG_PORT = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    STATE_FILE = '/tmp/servicenow_audit_last_run.txt'
    
    # Pagination settings
    PAGE_SIZE = 1000
    MAX_PAGES = 1000
    
    def get_last_run_timestamp():
        try:
            with open(STATE_FILE, 'r') as f:
                return f.read().strip()
        except:
            return '1970-01-01 00:00:00'
    
    def update_state_file(timestamp):
        with open(STATE_FILE, 'w') as f:
            f.write(timestamp)
    
    def send_to_syslog(message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT))
        sock.close()
    
    def get_audit_logs(last_run_timestamp):
        """
        Query ServiceNow sys_audit table with proper pagination.
        Uses sys_created_on field for timestamp filtering.
        """
        # Encode credentials using UTF-8
        auth_string = f"{USERNAME}:{PASSWORD}"
        auth_bytes = auth_string.encode('utf-8')
        auth_encoded = base64.b64encode(auth_bytes).decode('utf-8')
    
        # Setup HTTP client
        http = urllib3.PoolManager()
        headers = {
            'Authorization': f'Basic {auth_encoded}',
            'Accept': 'application/json'
        }
    
        results = []
        offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if 'T' in last_run_timestamp:
            last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0]
    
        for page in range(MAX_PAGES):
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            query_params = (
                f"sysparm_query=sys_created_on>={last_run_timestamp}"
                f"&sysparm_display_value=true"
                f"&sysparm_limit={PAGE_SIZE}"
                f"&sysparm_offset={offset}"
            )
    
            url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    data = json.loads(response.data.decode('utf-8'))
                    chunk = data.get('result', [])
                    results.extend(chunk)
    
                    # Stop if we got fewer records than PAGE_SIZE (last page)
                    if len(chunk) < PAGE_SIZE:
                        break
    
                    # Move to next page
                    offset += PAGE_SIZE
                else:
                    print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}")
                    break
    
            except Exception as e:
                print(f"Exception querying ServiceNow API: {str(e)}")
                break
    
        return results
    
    def main():
        # Get last run timestamp
        last_run_timestamp = get_last_run_timestamp()
    
        # Current timestamp for this run
        current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Query ServiceNow API for audit logs
        audit_logs = get_audit_logs(last_run_timestamp)
    
        if audit_logs:
            # Send each log to syslog
            for log in audit_logs:
                # Format the log as JSON
                log_json = json.dumps(log)
    
                # Send to syslog
                send_to_syslog(log_json)
    
                # Sleep briefly to avoid flooding
                time.sleep(0.01)
    
            # Update state file
            update_state_file(current_timestamp)
    
            print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog")
        else:
            print("No new audit logs to forward")
    
    if __name__ == "__main__":
        main()
    

Configurar a execução programada (Linux)

  • Torne o script executável:

    chmod +x servicenow_audit_to_syslog.py
    
  • Crie um cron job para executar o script a cada hora:

    crontab -e
    
  • Adicione a linha abaixo:

    0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
    

Exemplo de script do PowerShell (Windows)

  • Crie um arquivo chamado ServiceNow-Audit-To-Syslog.ps1 com o conteúdo a seguir:

    # ServiceNow API details
    $BaseUrl = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    $Username = 'admin'  # Replace with your ServiceNow username
    $Password = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    $SyslogServer = '127.0.0.1'  # Replace with your Bindplane agent IP
    $SyslogPort = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt"
    
    # Pagination settings
    $PageSize = 1000
    $MaxPages = 1000
    
    function Get-LastRunTimestamp {
        try {
            if (Test-Path $StateFile) {
                return Get-Content $StateFile
            } else {
                return '1970-01-01 00:00:00'
            }
        } catch {
            return '1970-01-01 00:00:00'
        }
    }
    
    function Update-StateFile {
        param([string]$Timestamp)
        Set-Content -Path $StateFile -Value $Timestamp
    }
    
    function Send-ToSyslog {
        param([string]$Message)
        $UdpClient = New-Object System.Net.Sockets.UdpClient
        $UdpClient.Connect($SyslogServer, $SyslogPort)
        $Encoding = [System.Text.Encoding]::ASCII
        $Bytes = $Encoding.GetBytes($Message)
        $UdpClient.Send($Bytes, $Bytes.Length)
        $UdpClient.Close()
    }
    
    function Get-AuditLogs {
        param([string]$LastRunTimestamp)
    
        # Create auth header using UTF-8 encoding
        $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}"))
        $Headers = @{
            Authorization = "Basic ${Auth}"
            Accept = 'application/json'
        }
    
        $Results = @()
        $Offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if ($LastRunTimestamp -match 'T') {
            $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' '
            $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', ''
        }
    
        for ($page = 0; $page -lt $MaxPages; $page++) {
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}"
            $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}"
    
            try {
                $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get
                $Chunk = $Response.result
                $Results += $Chunk
    
                # Stop if we got fewer records than PageSize (last page)
                if ($Chunk.Count -lt $PageSize) {
                    break
                }
    
                # Move to next page
                $Offset += $PageSize
            } catch {
                Write-Error "Error querying ServiceNow API: $_"
                break
            }
        }
    
        return $Results
    }
    
    # Main execution
    $LastRunTimestamp = Get-LastRunTimestamp
    $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss')
    
    $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp
    
    if ($AuditLogs -and $AuditLogs.Count -gt 0) {
        # Send each log to syslog
        foreach ($Log in $AuditLogs) {
            # Format the log as JSON
            $LogJson = $Log | ConvertTo-Json -Compress
    
            # Send to syslog
            Send-ToSyslog -Message $LogJson
    
            # Sleep briefly to avoid flooding
            Start-Sleep -Milliseconds 10
        }
    
        # Update state file
        Update-StateFile -Timestamp $CurrentTimestamp
    
        Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog"
    } else {
        Write-Output "No new audit logs to forward"
    }
    

Configurar a execução programada (Windows)

  1. Abra o Agendador de tarefas.
  2. Clique em Criar tarefa.
  3. Forneça a seguinte configuração:
    • Nome: ServiceNowAuditToSyslog
    • Opções de segurança: executar se o usuário estiver conectado ou não
  4. Acesse a guia Gatilhos.
  5. Clique em Novo e defina para execução a cada hora.
  6. Acesse a guia Ações.
  7. Clique em Novo e defina:
    • Ação: iniciar um programa
    • Programa/script: powershell.exe
    • Arguments: -ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
  8. Clique em OK para salvar a tarefa.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.