Journaux d'audit ServiceNow

Compatible avec :

Ce document explique comment ingérer des journaux d'audit ServiceNow dans Google Security Operations à l'aide de plusieurs méthodes.

Option A : GCS avec une fonction Cloud Run

Cette méthode utilise une fonction Cloud Run pour interroger régulièrement l'API REST ServiceNow afin d'obtenir les journaux d'audit et de les stocker dans un bucket GCS. Google Security Operations collecte ensuite les journaux du bucket GCS.

Avant de commencer

Assurez-vous de remplir les conditions préalables suivantes :

  • Une instance Google SecOps
  • Accès privilégié au locataire ou à l'API ServiceNow avec les rôles appropriés (généralement admin ou utilisateur avec accès en lecture à la table sys_audit)
  • Un projet GCP avec l'API Cloud Storage activée
  • Autorisations pour créer et gérer des buckets GCS
  • Autorisations permettant de gérer les stratégies IAM sur les buckets GCS
  • Autorisations permettant de créer des services Cloud Run, des sujets Pub/Sub et des tâches Cloud Scheduler

Collecter les prérequis ServiceNow (ID, clés API, ID d'organisation, jetons)

  1. Connectez-vous à la console d'administration ServiceNow.
  2. Accédez à Sécurité système > Utilisateurs et groupes > Utilisateurs.
  3. Créez un utilisateur ou sélectionnez-en un existant disposant des autorisations appropriées pour accéder aux journaux d'audit.
  4. Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :

    • Nom d'utilisateur
    • Mot de passe
    • URL de l'instance (par exemple, https://instance.service-now.com)

Configurer les LCA pour les utilisateurs non administrateurs

Si vous souhaitez utiliser un compte utilisateur non administrateur, vous devez créer une liste de contrôle d'accès (ACL) personnalisée pour accorder l'accès en lecture à la table sys_audit :

  1. Connectez-vous à la console d'administration ServiceNow en tant qu'administrateur.
  2. Accédez à System Security> Access Control (ACL) (Sécurité système > Contrôle des accès [LCA]).
  3. Cliquez sur Nouveau.
  4. Fournissez les informations de configuration suivantes :
    • Type : sélectionnez record.
    • Operation (Opération) : sélectionnez read (lire).
    • Nom : saisissez sys_audit.
    • Description : saisissez Allow read access to sys_audit table for Chronicle integration.
  5. Dans le champ Rôle requis, ajoutez le rôle attribué à votre utilisateur d'intégration (par exemple, chronicle_reader).
  6. Cliquez sur Envoyer.
  7. Vérifiez que la liste de contrôle d'accès est active et que l'utilisateur peut interroger la table sys_audit.

Créer un bucket Google Cloud Storage

  1. Accédez à la console Google Cloud.
  2. Sélectionnez votre projet ou créez-en un.
  3. Dans le menu de navigation, accédez à Cloud Storage> Buckets.
  4. Cliquez sur Créer un bucket.
  5. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nommer votre bucket Saisissez un nom unique (par exemple, servicenow-audit-logs).
    Type d'emplacement Choisissez en fonction de vos besoins (région, birégion ou multirégion).
    Emplacement Sélectionnez l'emplacement (par exemple, us-central1).
    Classe de stockage Standard (recommandé pour les journaux auxquels vous accédez fréquemment)
    Access control (Contrôle des accès) Uniforme (recommandé)
    Outils de protection Facultatif : Activer la gestion des versions des objets ou la règle de conservation
  6. Cliquez sur Créer.

Créer un compte de service pour la fonction Cloud Run

La fonction Cloud Run a besoin d'un compte de service disposant des autorisations nécessaires pour écrire dans le bucket GCS et être appelée par Pub/Sub.

Créer un compte de service

  1. Dans la console GCP, accédez à IAM et administration > Comptes de service.
  2. Cliquez sur Créer un compte de service.
  3. Fournissez les informations de configuration suivantes :
    • Nom du compte de service : saisissez servicenow-audit-collector-sa.
    • Description du compte de service : saisissez Service account for Cloud Run function to collect ServiceNow audit logs.
  4. Cliquez sur Créer et continuer.
  5. Dans la section Autoriser ce compte de service à accéder au projet, ajoutez les rôles suivants :
    1. Cliquez sur Sélectionner un rôle.
    2. Recherchez et sélectionnez Administrateur des objets de l'espace de stockage.
    3. Cliquez sur + Ajouter un autre rôle.
    4. Recherchez et sélectionnez Demandeur Cloud Run.
    5. Cliquez sur + Ajouter un autre rôle.
    6. Recherchez et sélectionnez Demandeur Cloud Functions.
  6. Cliquez sur Continuer.
  7. Cliquez sur OK.

Ces rôles sont requis pour :

  • Administrateur des objets Storage : écrire des journaux dans le bucket GCS et gérer les fichiers d'état
  • Demandeur Cloud Run : autorise Pub/Sub à appeler la fonction
  • Demandeur Cloud Functions : autorise l'appel de fonctions

Accorder des autorisations IAM sur un bucket GCS

Accordez au compte de service des autorisations d'écriture sur le bucket GCS :

  1. Accédez à Cloud Storage > Buckets.
  2. Cliquez sur le nom de votre bucket.
  3. Accédez à l'onglet Autorisations.
  4. Cliquez sur Accorder l'accès.
  5. Fournissez les informations de configuration suivantes :
    • Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple, servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Attribuer des rôles : sélectionnez Administrateur des objets Storage.
  6. Cliquez sur Enregistrer.

Créer un sujet Pub/Sub

Créez un sujet Pub/Sub auquel Cloud Scheduler publiera des messages et auquel la fonction Cloud Run s'abonnera.

  1. Dans la console GCP, accédez à Pub/Sub > Sujets.
  2. Cliquez sur Create topic (Créer un sujet).
  3. Fournissez les informations de configuration suivantes :
    • ID du sujet : saisissez servicenow-audit-trigger.
    • Conservez les valeurs par défaut des autres paramètres.
  4. Cliquez sur Créer.

Créer une fonction Cloud Run pour collecter les journaux

La fonction Cloud Run est déclenchée par des messages Pub/Sub provenant de Cloud Scheduler pour extraire les journaux de l'API ServiceNow et les écrire dans GCS.

  1. Dans la console GCP, accédez à Cloud Run.
  2. Cliquez sur Créer un service.
  3. Sélectionnez Fonction (utilisez un éditeur intégré pour créer une fonction).
  4. Dans la section Configurer, fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom du service servicenow-audit-collector
    Région Sélectionnez la région correspondant à votre bucket GCS (par exemple, us-central1).
    Runtime (durée d'exécution) Sélectionnez Python 3.12 ou version ultérieure.
  5. Dans la section Déclencheur (facultatif) :

    1. Cliquez sur + Ajouter un déclencheur.
    2. Sélectionnez Cloud Pub/Sub.
    3. Dans Sélectionner un sujet Cloud Pub/Sub, choisissez le sujet Pub/Sub (servicenow-audit-trigger).
    4. Cliquez sur Enregistrer.
  6. Dans la section Authentification :

    1. Sélectionnez Exiger l'authentification.
    2. Consultez Identity and Access Management (IAM).
  7. Faites défiler la page vers le bas, puis développez Conteneurs, mise en réseau, sécurité.

  8. Accédez à l'onglet Sécurité :

    • Compte de service : sélectionnez le compte de service (servicenow-audit-collector-sa).
  9. Accédez à l'onglet Conteneurs :

    1. Cliquez sur Variables et secrets.
    2. Cliquez sur + Ajouter une variable pour chaque variable d'environnement :
    Nom de la variable Exemple de valeur Description
    GCS_BUCKET servicenow-audit-logs Nom du bucket GCS
    GCS_PREFIX audit-logs Préfixe des fichiers journaux
    STATE_KEY audit-logs/state.json Chemin d'accès au fichier d'état
    API_BASE_URL https://instance.service-now.com URL de l'instance ServiceNow
    API_USERNAME your-username Nom d'utilisateur ServiceNow
    API_PASSWORD your-password Mot de passe ServiceNow
    PAGE_SIZE 1000 Enregistrements par page
    MAX_PAGES 1000 Nombre maximal de pages à extraire
  10. Dans la section Variables et secrets, faites défiler la page jusqu'à Requêtes :

    • Délai avant expiration de la requête : saisissez 600 secondes (10 minutes).
  11. Accédez à l'onglet Paramètres :

    • Dans la section Ressources :
      • Mémoire : sélectionnez 512 Mio ou plus.
      • CPU : sélectionnez 1.
  12. Dans la section Scaling de révision :

    • Nombre minimal d'instances : saisissez 0.
    • Nombre maximal d'instances : saisissez 100 (ou ajustez en fonction de la charge attendue).
  13. Cliquez sur Créer.

  14. Attendez que le service soit créé (1 à 2 minutes).

  15. Une fois le service créé, l'éditeur de code intégré s'ouvre automatiquement.

Ajouter un code de fonction

  1. Saisissez main dans le champ Point d'entrée.
  2. Dans l'éditeur de code intégré, créez deux fichiers :

    • Premier fichier : main.py:

          import functions_framework
          from google.cloud import storage
          import json
          import os
          import urllib3
          from datetime import datetime, timezone, timedelta
          import time
          import base64
      
          # Initialize HTTP client with timeouts
          http = urllib3.PoolManager(
              timeout=urllib3.Timeout(connect=5.0, read=30.0),
              retries=False,
          )
      
          # Initialize Storage client
          storage_client = storage.Client()
      
          # Environment variables
          GCS_BUCKET = os.environ.get('GCS_BUCKET')
          GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs')
          STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json')
          API_BASE = os.environ.get('API_BASE_URL')
          USERNAME = os.environ.get('API_USERNAME')
          PASSWORD = os.environ.get('API_PASSWORD')
          PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
          MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000'))
      
          def parse_datetime(value: str) -> datetime:
              """Parse ServiceNow datetime string to datetime object."""
              # ServiceNow format: YYYY-MM-DD HH:MM:SS
              try:
                  return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
              except ValueError:
                  # Try ISO format as fallback
                  if value.endswith("Z"):
                      value = value[:-1] + "+00:00"
                  return datetime.fromisoformat(value)
      
          @functions_framework.cloud_event
          def main(cloud_event):
              """
              Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS.
      
              Args:
                  cloud_event: CloudEvent object containing Pub/Sub message
              """
      
              if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]):
                  print('Error: Missing required environment variables')
                  return
      
              try:
                  # Get GCS bucket
                  bucket = storage_client.bucket(GCS_BUCKET)
      
                  # Load state
                  state = load_state(bucket, STATE_KEY)
      
                  # Determine time window
                  now = datetime.now(timezone.utc)
                  last_time = None
      
                  if isinstance(state, dict) and state.get("last_event_time"):
                      try:
                          last_time = parse_datetime(state["last_event_time"])
                          # Overlap by 2 minutes to catch any delayed events
                          last_time = last_time - timedelta(minutes=2)
                      except Exception as e:
                          print(f"Warning: Could not parse last_event_time: {e}")
      
                  if last_time is None:
                      last_time = now - timedelta(hours=24)
      
                  print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}")
      
                  # Fetch logs
                  records, newest_event_time = fetch_logs(
                      api_base=API_BASE,
                      username=USERNAME,
                      password=PASSWORD,
                      start_time=last_time,
                      end_time=now,
                      page_size=PAGE_SIZE,
                      max_pages=MAX_PAGES,
                  )
      
                  if not records:
                      print("No new log records found.")
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
                      return
      
                  # Write to GCS as NDJSON
                  timestamp = now.strftime('%Y%m%d_%H%M%S')
                  object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
                  blob = bucket.blob(object_key)
      
                  ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
                  blob.upload_from_string(ndjson, content_type='application/x-ndjson')
      
                  print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
      
                  # Update state with newest event time
                  if newest_event_time:
                      save_state(bucket, STATE_KEY, newest_event_time)
                  else:
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
      
                  print(f"Successfully processed {len(records)} records")
      
              except Exception as e:
                  print(f'Error processing logs: {str(e)}')
                  raise
      
          def load_state(bucket, key):
              """Load state from GCS."""
              try:
                  blob = bucket.blob(key)
                  if blob.exists():
                      state_data = blob.download_as_text()
                      return json.loads(state_data)
              except Exception as e:
                  print(f"Warning: Could not load state: {e}")
      
              return {}
      
          def save_state(bucket, key, last_event_time: str):
              """Save the last event timestamp to GCS state file."""
              try:
                  state = {'last_event_time': last_event_time}
                  blob = bucket.blob(key)
                  blob.upload_from_string(
                      json.dumps(state, indent=2),
                      content_type='application/json'
                  )
                  print(f"Saved state: last_event_time={last_event_time}")
              except Exception as e:
                  print(f"Warning: Could not save state: {e}")
      
          def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int):
              """
              Fetch logs from ServiceNow sys_audit table with pagination and rate limiting.
      
              Args:
                  api_base: ServiceNow instance URL
                  username: ServiceNow username
                  password: ServiceNow password
                  start_time: Start time for log query
                  end_time: End time for log query
                  page_size: Number of records per page
                  max_pages: Maximum total pages to fetch
      
              Returns:
                  Tuple of (records list, newest_event_time string)
              """
              # Clean up base URL
              base_url = api_base.rstrip('/')
      
              endpoint = f"{base_url}/api/now/table/sys_audit"
      
              # Encode credentials using UTF-8
              auth_string = f"{username}:{password}"
              auth_bytes = auth_string.encode('utf-8')
              auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
      
              headers = {
                  'Authorization': f'Basic {auth_b64}',
                  'Accept': 'application/json',
                  'Content-Type': 'application/json',
                  'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0'
              }
      
              records = []
              newest_time = None
              page_num = 0
              backoff = 1.0
              offset = 0
      
              # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS)
              start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
      
              while True:
                  page_num += 1
      
                  if len(records) >= page_size * max_pages:
                      print(f"Reached max_pages limit ({max_pages})")
                      break
      
                  # Build query parameters
                  # Use >= operator for sys_created_on field (on or after)
                  params = []
                  params.append(f"sysparm_query=sys_created_on>={start_time_str}")
                  params.append(f"sysparm_display_value=true")
                  params.append(f"sysparm_limit={page_size}")
                  params.append(f"sysparm_offset={offset}")
      
                  url = f"{endpoint}?{'&'.join(params)}"
      
                  try:
                      response = http.request('GET', url, headers=headers)
      
                      # Handle rate limiting with exponential backoff
                      if response.status == 429:
                          retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                          print(f"Rate limited (429). Retrying after {retry_after}s...")
                          time.sleep(retry_after)
                          backoff = min(backoff * 2, 30.0)
                          continue
      
                      backoff = 1.0
      
                      if response.status != 200:
                          print(f"HTTP Error: {response.status}")
                          response_text = response.data.decode('utf-8')
                          print(f"Response body: {response_text}")
                          return [], None
      
                      data = json.loads(response.data.decode('utf-8'))
                      page_results = data.get('result', [])
      
                      if not page_results:
                          print(f"No more results (empty page)")
                          break
      
                      print(f"Page {page_num}: Retrieved {len(page_results)} events")
                      records.extend(page_results)
      
                      # Track newest event time
                      for event in page_results:
                          try:
                              event_time = event.get('sys_created_on')
                              if event_time:
                                  if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                      newest_time = event_time
                          except Exception as e:
                              print(f"Warning: Could not parse event time: {e}")
      
                      # Check for more results
                      if len(page_results) < page_size:
                          print(f"Reached last page (size={len(page_results)} < limit={page_size})")
                          break
      
                      # Move to next page
                      offset += page_size
      
                      # Small delay to avoid rate limiting
                      time.sleep(0.1)
      
                  except Exception as e:
                      print(f"Error fetching logs: {e}")
                      return [], None
      
              print(f"Retrieved {len(records)} total records from {page_num} pages")
              return records, newest_time
          ```
      
    • Deuxième fichier : requirements.txt:

      ```
      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      ```
      
  3. Cliquez sur Déployer pour enregistrer et déployer la fonction.

  4. Attendez la fin du déploiement (deux à trois minutes).

Créer une tâche Cloud Scheduler

Cloud Scheduler publie des messages sur le sujet Pub/Sub à intervalles réguliers, ce qui déclenche la fonction Cloud Run.

  1. Dans la console GCP, accédez à Cloud Scheduler.
  2. Cliquez sur Créer une tâche.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom servicenow-audit-collector-hourly
    Région Sélectionnez la même région que la fonction Cloud Run.
    Fréquence 0 * * * * (toutes les heures)
    Fuseau horaire Sélectionnez un fuseau horaire (UTC recommandé).
    Type de cible Pub/Sub
    Topic Sélectionnez le sujet Pub/Sub (servicenow-audit-trigger).
    Corps du message {} (objet JSON vide)
  4. Cliquez sur Créer.

Options de fréquence de planification

  • Choisissez la fréquence en fonction du volume de journaux et des exigences de latence :

    Fréquence Expression Cron Cas d'utilisation
    Toutes les 5 minutes */5 * * * * Volume élevé, faible latence
    Toutes les 15 minutes */15 * * * * Volume moyen
    Toutes les heures 0 * * * * Standard (recommandé)
    Toutes les 6 heures 0 */6 * * * Traitement par lot à faible volume
    Tous les jours 0 0 * * * Collecte de données historiques

Tester l'intégration

  1. Dans la console Cloud Scheduler, recherchez votre job.
  2. Cliquez sur Exécuter de force pour déclencher le job manuellement.
  3. Patientez pendant quelques secondes.
  4. Accédez à Cloud Run > Services.
  5. Cliquez sur le nom de votre fonction (servicenow-audit-collector).
  6. Cliquez sur l'onglet Journaux.
  7. Vérifiez que la fonction s'est exécutée correctement. Recherchez les éléments suivants :

    Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Accédez à Cloud Storage > Buckets.

  9. Cliquez sur le nom de votre bucket.

  10. Accédez au dossier de préfixe (audit-logs/).

  11. Vérifiez qu'un fichier .ndjson a été créé avec le code temporel actuel.

Si vous constatez des erreurs dans les journaux :

  • HTTP 401 : vérifiez les identifiants de l'API dans les variables d'environnement
  • HTTP 403 : vérifiez que le compte dispose des autorisations requises (rôle d'administrateur ou ACL personnalisée pour sys_audit).
  • HTTP 429 : limitation du débit. La fonction effectuera automatiquement une nouvelle tentative avec un intervalle de temps.
  • Variables d'environnement manquantes : vérifiez que toutes les variables requises sont définies.

Récupérer le compte de service Google SecOps

Google SecOps utilise un compte de service unique pour lire les données de votre bucket GCS. Vous devez accorder à ce compte de service l'accès à votre bucket.

Obtenir l'adresse e-mail du compte de service

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Cliquez sur Configurer un flux unique.
  4. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, ServiceNow Audit logs).
  5. Sélectionnez Google Cloud Storage V2 comme Type de source.
  6. Sélectionnez Audit ServiceNow comme Type de journal.
  7. Cliquez sur Obtenir un compte de service. Une adresse e-mail unique pour le compte de service s'affiche, par exemple :

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copiez cette adresse e-mail pour l'utiliser à l'étape suivante.

Accorder des autorisations IAM au compte de service Google SecOps

Le compte de service Google SecOps a besoin du rôle Lecteur des objets Storage sur votre bucket GCS.

  1. Accédez à Cloud Storage > Buckets.
  2. Cliquez sur le nom de votre bucket.
  3. Accédez à l'onglet Autorisations.
  4. Cliquez sur Accorder l'accès.
  5. Fournissez les informations de configuration suivantes :
    • Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps.
    • Attribuez des rôles : sélectionnez Lecteur des objets de l'espace de stockage.
  6. Cliquez sur Enregistrer.

Configurer un flux dans Google SecOps pour ingérer les journaux d'audit ServiceNow

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Cliquez sur Configurer un flux unique.
  4. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, ServiceNow Audit logs).
  5. Sélectionnez Google Cloud Storage V2 comme Type de source.
  6. Sélectionnez Audit ServiceNow comme Type de journal.
  7. Cliquez sur Suivant.
  8. Spécifiez les valeurs des paramètres d'entrée suivants :

    • URL du bucket Storage : saisissez l'URI du bucket GCS avec le préfixe du chemin d'accès :

      gs://servicenow-audit-logs/audit-logs/
      
      • Remplacez :

        • servicenow-audit-logs : nom de votre bucket GCS.
        • audit-logs : préfixe/chemin d'accès au dossier dans lequel les journaux sont stockés.
    • Option de suppression de la source : sélectionnez l'option de suppression de votre choix :

      • Jamais : ne supprime jamais aucun fichier après les transferts (recommandé pour les tests).
      • Supprimer les fichiers transférés : supprime les fichiers après un transfert réussi.
      • Supprimer les fichiers transférés et les répertoires vides : supprime les fichiers et les répertoires vides après un transfert réussi.

    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.

    • Espace de noms de l'élément : espace de noms de l'élément.

    • Libellés d'ingestion : libellé à appliquer aux événements de ce flux.

  9. Cliquez sur Suivant.

  10. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Option B : Agent Bindplane avec syslog

Cette méthode utilise un agent Bindplane pour collecter les journaux d'audit ServiceNow et les transférer vers Google Security Operations. Comme ServiceNow n'est pas compatible avec syslog pour les journaux d'audit, nous allons utiliser un script pour interroger l'API REST ServiceNow et transférer les journaux à l'agent Bindplane via syslog.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Une instance Google SecOps
  • Windows Server 2016 ou version ultérieure, ou hôte Linux avec systemd
  • Connectivité réseau entre l'agent Bindplane et ServiceNow
  • Si vous exécutez l'agent derrière un proxy, assurez-vous que les ports de pare-feu sont ouverts conformément aux exigences de l'agent Bindplane.
  • Accès privilégié à la console ou à l'appliance de gestion ServiceNow avec les rôles appropriés (généralement admin ou utilisateur avec accès en lecture à la table sys_audit)

Obtenir le fichier d'authentification d'ingestion Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres du SIEM > Agent de collecte.
  3. Cliquez sur Télécharger pour télécharger le fichier d'authentification pour l'ingestion.
  4. Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.

Obtenir l'ID client Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres SIEM> Profil.
  3. Copiez et enregistrez le numéro client de la section Informations sur l'organisation.

Installer l'agent BindPlane

Installez l'agent Bindplane sur votre système d'exploitation Windows ou Linux en suivant les instructions ci-dessous.

Installation de fenêtres

  1. Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
  2. Exécutez la commande suivante :

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. Attendez la fin de l'installation.

  4. Vérifiez l'installation en exécutant la commande suivante :

    sc query observiq-otel-collector
    

Le service doit être indiqué comme RUNNING (EN COURS D'EXÉCUTION).

Installation de Linux

  1. Ouvrez un terminal avec les droits root ou sudo.
  2. Exécutez la commande suivante :

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. Attendez la fin de l'installation.

  4. Vérifiez l'installation en exécutant la commande suivante :

    sudo systemctl status observiq-otel-collector
    

Le service doit être indiqué comme actif (en cours d'exécution).

Autres ressources d'installation

Pour obtenir d'autres options d'installation et de dépannage, consultez le guide d'installation de l'agent Bindplane.

Configurer l'agent Bindplane pour ingérer les journaux syslog et les envoyer à Google SecOps

Localiser le fichier de configuration

Linux : bash sudo nano /etc/bindplane-agent/config.yaml

Windows : cmd notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"

Modifiez le fichier de configuration

Remplacez l'intégralité du contenu de config.yaml par la configuration suivante :

```yaml
receivers:
  udplog:
    listen_address: "0.0.0.0:514"

exporters:
  chronicle/servicenow_audit:
    compression: gzip
    creds_file_path: '/path/to/ingestion-authentication-file.json'
    customer_id: '<YOUR_CUSTOMER_ID>'
    endpoint: <CUSTOMER_REGION_ENDPOINT>
    log_type: 'SERVICENOW_AUDIT'
    raw_log_field: body
    ingestion_labels:
      service: servicenow

service:
  pipelines:
    logs/servicenow_to_chronicle:
      receivers:
        - udplog
      exporters:
        - chronicle/servicenow_audit
```

Paramètres de configuration

Remplacez les espaces réservés suivants :

  • listen_address : adresse IP et port à écouter. Utilisez 0.0.0.0:514 pour écouter sur toutes les interfaces sur le port 514.
  • creds_file_path : chemin d'accès complet au fichier d'authentification de l'ingestion :
    • Linux : /etc/bindplane-agent/ingestion-auth.json
    • Windows : C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
  • <YOUR_CUSTOMER_ID> : ID client de l'étape précédente.
  • <CUSTOMER_REGION_ENDPOINT> : URL du point de terminaison régional :
    • États-Unis : malachiteingestion-pa.googleapis.com
    • Europe : europe-malachiteingestion-pa.googleapis.com
    • Asie : asia-southeast1-malachiteingestion-pa.googleapis.com
    • Pour obtenir la liste complète, consultez Points de terminaison régionaux.

Enregistrez le fichier de configuration.

Après avoir modifié le fichier, enregistrez-le : * Linux : appuyez sur Ctrl+O, puis sur Enter, puis sur Ctrl+X. * Windows : cliquez sur Fichier > Enregistrer.

Redémarrez l'agent Bindplane pour appliquer les modifications.

  • Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :

    sudo systemctl restart observiq-otel-collector
    
    1. Vérifiez que le service est en cours d'exécution :

      sudo systemctl status observiq-otel-collector
      
    2. Recherchez les erreurs dans les journaux :

      sudo journalctl -u observiq-otel-collector -f
      
  • Pour redémarrer l'agent Bindplane dans Windows, choisissez l'une des options suivantes :

    • À l'aide de l'invite de commandes ou de PowerShell en tant qu'administrateur :

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • Utiliser la console Services :

    1. Appuyez sur Win+R, saisissez services.msc, puis appuyez sur Entrée.
    2. Localisez observIQ OpenTelemetry Collector.
    3. Effectuez un clic droit, puis sélectionnez Redémarrer.

    4. Vérifiez que le service est en cours d'exécution :

      sc query observiq-otel-collector
      
    5. Recherchez les erreurs dans les journaux :

      type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
      

Créer un script pour transférer les journaux d'audit ServiceNow vers syslog

Comme ServiceNow n'est pas compatible avec syslog pour les journaux d'audit, nous allons créer un script qui interroge l'API REST ServiceNow et transfère les journaux vers syslog. Vous pouvez programmer l'exécution de ce script à intervalles réguliers.

Exemple de script Python (Linux)

  • Créez un fichier nommé servicenow_audit_to_syslog.py avec le contenu suivant :

    import urllib3
    import json
    import datetime
    import base64
    import socket
    import time
    import os
    
    # ServiceNow API details
    BASE_URL = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    USERNAME = 'admin'  # Replace with your ServiceNow username
    PASSWORD = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    SYSLOG_SERVER = '127.0.0.1'  # Replace with your Bindplane agent IP
    SYSLOG_PORT = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    STATE_FILE = '/tmp/servicenow_audit_last_run.txt'
    
    # Pagination settings
    PAGE_SIZE = 1000
    MAX_PAGES = 1000
    
    def get_last_run_timestamp():
        try:
            with open(STATE_FILE, 'r') as f:
                return f.read().strip()
        except:
            return '1970-01-01 00:00:00'
    
    def update_state_file(timestamp):
        with open(STATE_FILE, 'w') as f:
            f.write(timestamp)
    
    def send_to_syslog(message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT))
        sock.close()
    
    def get_audit_logs(last_run_timestamp):
        """
        Query ServiceNow sys_audit table with proper pagination.
        Uses sys_created_on field for timestamp filtering.
        """
        # Encode credentials using UTF-8
        auth_string = f"{USERNAME}:{PASSWORD}"
        auth_bytes = auth_string.encode('utf-8')
        auth_encoded = base64.b64encode(auth_bytes).decode('utf-8')
    
        # Setup HTTP client
        http = urllib3.PoolManager()
        headers = {
            'Authorization': f'Basic {auth_encoded}',
            'Accept': 'application/json'
        }
    
        results = []
        offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if 'T' in last_run_timestamp:
            last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0]
    
        for page in range(MAX_PAGES):
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            query_params = (
                f"sysparm_query=sys_created_on>={last_run_timestamp}"
                f"&sysparm_display_value=true"
                f"&sysparm_limit={PAGE_SIZE}"
                f"&sysparm_offset={offset}"
            )
    
            url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    data = json.loads(response.data.decode('utf-8'))
                    chunk = data.get('result', [])
                    results.extend(chunk)
    
                    # Stop if we got fewer records than PAGE_SIZE (last page)
                    if len(chunk) < PAGE_SIZE:
                        break
    
                    # Move to next page
                    offset += PAGE_SIZE
                else:
                    print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}")
                    break
    
            except Exception as e:
                print(f"Exception querying ServiceNow API: {str(e)}")
                break
    
        return results
    
    def main():
        # Get last run timestamp
        last_run_timestamp = get_last_run_timestamp()
    
        # Current timestamp for this run
        current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Query ServiceNow API for audit logs
        audit_logs = get_audit_logs(last_run_timestamp)
    
        if audit_logs:
            # Send each log to syslog
            for log in audit_logs:
                # Format the log as JSON
                log_json = json.dumps(log)
    
                # Send to syslog
                send_to_syslog(log_json)
    
                # Sleep briefly to avoid flooding
                time.sleep(0.01)
    
            # Update state file
            update_state_file(current_timestamp)
    
            print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog")
        else:
            print("No new audit logs to forward")
    
    if __name__ == "__main__":
        main()
    

Configurer l'exécution planifiée (Linux)

  • Rendez le script exécutable :

    chmod +x servicenow_audit_to_syslog.py
    
  • Créez une tâche cron pour exécuter le script toutes les heures :

    crontab -e
    
  • Ajoutez la ligne suivante :

    0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
    

Exemple de script PowerShell (Windows)

  • Créez un fichier nommé ServiceNow-Audit-To-Syslog.ps1 avec le contenu suivant :

    # ServiceNow API details
    $BaseUrl = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    $Username = 'admin'  # Replace with your ServiceNow username
    $Password = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    $SyslogServer = '127.0.0.1'  # Replace with your Bindplane agent IP
    $SyslogPort = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt"
    
    # Pagination settings
    $PageSize = 1000
    $MaxPages = 1000
    
    function Get-LastRunTimestamp {
        try {
            if (Test-Path $StateFile) {
                return Get-Content $StateFile
            } else {
                return '1970-01-01 00:00:00'
            }
        } catch {
            return '1970-01-01 00:00:00'
        }
    }
    
    function Update-StateFile {
        param([string]$Timestamp)
        Set-Content -Path $StateFile -Value $Timestamp
    }
    
    function Send-ToSyslog {
        param([string]$Message)
        $UdpClient = New-Object System.Net.Sockets.UdpClient
        $UdpClient.Connect($SyslogServer, $SyslogPort)
        $Encoding = [System.Text.Encoding]::ASCII
        $Bytes = $Encoding.GetBytes($Message)
        $UdpClient.Send($Bytes, $Bytes.Length)
        $UdpClient.Close()
    }
    
    function Get-AuditLogs {
        param([string]$LastRunTimestamp)
    
        # Create auth header using UTF-8 encoding
        $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}"))
        $Headers = @{
            Authorization = "Basic ${Auth}"
            Accept = 'application/json'
        }
    
        $Results = @()
        $Offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if ($LastRunTimestamp -match 'T') {
            $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' '
            $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', ''
        }
    
        for ($page = 0; $page -lt $MaxPages; $page++) {
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}"
            $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}"
    
            try {
                $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get
                $Chunk = $Response.result
                $Results += $Chunk
    
                # Stop if we got fewer records than PageSize (last page)
                if ($Chunk.Count -lt $PageSize) {
                    break
                }
    
                # Move to next page
                $Offset += $PageSize
            } catch {
                Write-Error "Error querying ServiceNow API: $_"
                break
            }
        }
    
        return $Results
    }
    
    # Main execution
    $LastRunTimestamp = Get-LastRunTimestamp
    $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss')
    
    $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp
    
    if ($AuditLogs -and $AuditLogs.Count -gt 0) {
        # Send each log to syslog
        foreach ($Log in $AuditLogs) {
            # Format the log as JSON
            $LogJson = $Log | ConvertTo-Json -Compress
    
            # Send to syslog
            Send-ToSyslog -Message $LogJson
    
            # Sleep briefly to avoid flooding
            Start-Sleep -Milliseconds 10
        }
    
        # Update state file
        Update-StateFile -Timestamp $CurrentTimestamp
    
        Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog"
    } else {
        Write-Output "No new audit logs to forward"
    }
    

Configurer l'exécution planifiée (Windows)

  1. Ouvrez le Planificateur de tâches.
  2. Cliquez sur Créer une tâche.
  3. Fournissez la configuration suivante :
    • Nom : ServiceNowAuditToSyslog
    • Options de sécurité : exécuter que l'utilisateur soit connecté ou non
  4. Accédez à l'onglet Triggers (Déclencheurs).
  5. Cliquez sur Nouveau et définissez l'exécution toutes les heures.
  6. Accédez à l'onglet Actions.
  7. Cliquez sur Nouveau, puis définissez les paramètres suivants :
    • Action : Démarrer un programme
    • Programme/script : powershell.exe
    • Arguments : -ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
  8. Cliquez sur OK pour enregistrer la tâche.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.