Journaux d'audit ServiceNow
Ce document explique comment ingérer des journaux d'audit ServiceNow dans Google Security Operations à l'aide de plusieurs méthodes.
Option A : GCS avec une fonction Cloud Run
Cette méthode utilise une fonction Cloud Run pour interroger régulièrement l'API REST ServiceNow afin d'obtenir les journaux d'audit et de les stocker dans un bucket GCS. Google Security Operations collecte ensuite les journaux du bucket GCS.
Avant de commencer
Assurez-vous de remplir les conditions préalables suivantes :
- Une instance Google SecOps
- Accès privilégié au locataire ou à l'API ServiceNow avec les rôles appropriés (généralement
adminou utilisateur avec accès en lecture à la table sys_audit) - Un projet GCP avec l'API Cloud Storage activée
- Autorisations pour créer et gérer des buckets GCS
- Autorisations permettant de gérer les stratégies IAM sur les buckets GCS
- Autorisations permettant de créer des services Cloud Run, des sujets Pub/Sub et des tâches Cloud Scheduler
Collecter les prérequis ServiceNow (ID, clés API, ID d'organisation, jetons)
- Connectez-vous à la console d'administration ServiceNow.
- Accédez à Sécurité système > Utilisateurs et groupes > Utilisateurs.
- Créez un utilisateur ou sélectionnez-en un existant disposant des autorisations appropriées pour accéder aux journaux d'audit.
Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- Nom d'utilisateur
- Mot de passe
- URL de l'instance (par exemple,
https://instance.service-now.com)
Configurer les LCA pour les utilisateurs non administrateurs
Si vous souhaitez utiliser un compte utilisateur non administrateur, vous devez créer une liste de contrôle d'accès (ACL) personnalisée pour accorder l'accès en lecture à la table sys_audit :
- Connectez-vous à la console d'administration ServiceNow en tant qu'administrateur.
- Accédez à System Security> Access Control (ACL) (Sécurité système > Contrôle des accès [LCA]).
- Cliquez sur Nouveau.
- Fournissez les informations de configuration suivantes :
- Type : sélectionnez record.
- Operation (Opération) : sélectionnez read (lire).
- Nom : saisissez
sys_audit. - Description : saisissez
Allow read access to sys_audit table for Chronicle integration.
- Dans le champ Rôle requis, ajoutez le rôle attribué à votre utilisateur d'intégration (par exemple,
chronicle_reader). - Cliquez sur Envoyer.
- Vérifiez que la liste de contrôle d'accès est active et que l'utilisateur peut interroger la table sys_audit.
Créer un bucket Google Cloud Storage
- Accédez à la console Google Cloud.
- Sélectionnez votre projet ou créez-en un.
- Dans le menu de navigation, accédez à Cloud Storage> Buckets.
- Cliquez sur Créer un bucket.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nommer votre bucket Saisissez un nom unique (par exemple, servicenow-audit-logs).Type d'emplacement Choisissez en fonction de vos besoins (région, birégion ou multirégion). Emplacement Sélectionnez l'emplacement (par exemple, us-central1).Classe de stockage Standard (recommandé pour les journaux auxquels vous accédez fréquemment) Access control (Contrôle des accès) Uniforme (recommandé) Outils de protection Facultatif : Activer la gestion des versions des objets ou la règle de conservation Cliquez sur Créer.
Créer un compte de service pour la fonction Cloud Run
La fonction Cloud Run a besoin d'un compte de service disposant des autorisations nécessaires pour écrire dans le bucket GCS et être appelée par Pub/Sub.
Créer un compte de service
- Dans la console GCP, accédez à IAM et administration > Comptes de service.
- Cliquez sur Créer un compte de service.
- Fournissez les informations de configuration suivantes :
- Nom du compte de service : saisissez
servicenow-audit-collector-sa. - Description du compte de service : saisissez
Service account for Cloud Run function to collect ServiceNow audit logs.
- Nom du compte de service : saisissez
- Cliquez sur Créer et continuer.
- Dans la section Autoriser ce compte de service à accéder au projet, ajoutez les rôles suivants :
- Cliquez sur Sélectionner un rôle.
- Recherchez et sélectionnez Administrateur des objets de l'espace de stockage.
- Cliquez sur + Ajouter un autre rôle.
- Recherchez et sélectionnez Demandeur Cloud Run.
- Cliquez sur + Ajouter un autre rôle.
- Recherchez et sélectionnez Demandeur Cloud Functions.
- Cliquez sur Continuer.
- Cliquez sur OK.
Ces rôles sont requis pour :
- Administrateur des objets Storage : écrire des journaux dans le bucket GCS et gérer les fichiers d'état
- Demandeur Cloud Run : autorise Pub/Sub à appeler la fonction
- Demandeur Cloud Functions : autorise l'appel de fonctions
Accorder des autorisations IAM sur un bucket GCS
Accordez au compte de service des autorisations d'écriture sur le bucket GCS :
- Accédez à Cloud Storage > Buckets.
- Cliquez sur le nom de votre bucket.
- Accédez à l'onglet Autorisations.
- Cliquez sur Accorder l'accès.
- Fournissez les informations de configuration suivantes :
- Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple,
servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Attribuer des rôles : sélectionnez Administrateur des objets Storage.
- Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple,
- Cliquez sur Enregistrer.
Créer un sujet Pub/Sub
Créez un sujet Pub/Sub auquel Cloud Scheduler publiera des messages et auquel la fonction Cloud Run s'abonnera.
- Dans la console GCP, accédez à Pub/Sub > Sujets.
- Cliquez sur Create topic (Créer un sujet).
- Fournissez les informations de configuration suivantes :
- ID du sujet : saisissez
servicenow-audit-trigger. - Conservez les valeurs par défaut des autres paramètres.
- ID du sujet : saisissez
- Cliquez sur Créer.
Créer une fonction Cloud Run pour collecter les journaux
La fonction Cloud Run est déclenchée par des messages Pub/Sub provenant de Cloud Scheduler pour extraire les journaux de l'API ServiceNow et les écrire dans GCS.
- Dans la console GCP, accédez à Cloud Run.
- Cliquez sur Créer un service.
- Sélectionnez Fonction (utilisez un éditeur intégré pour créer une fonction).
Dans la section Configurer, fournissez les informations de configuration suivantes :
Paramètre Valeur Nom du service servicenow-audit-collectorRégion Sélectionnez la région correspondant à votre bucket GCS (par exemple, us-central1).Runtime (durée d'exécution) Sélectionnez Python 3.12 ou version ultérieure. Dans la section Déclencheur (facultatif) :
- Cliquez sur + Ajouter un déclencheur.
- Sélectionnez Cloud Pub/Sub.
- Dans Sélectionner un sujet Cloud Pub/Sub, choisissez le sujet Pub/Sub (
servicenow-audit-trigger). - Cliquez sur Enregistrer.
Dans la section Authentification :
- Sélectionnez Exiger l'authentification.
- Consultez Identity and Access Management (IAM).
Faites défiler la page vers le bas, puis développez Conteneurs, mise en réseau, sécurité.
Accédez à l'onglet Sécurité :
- Compte de service : sélectionnez le compte de service (
servicenow-audit-collector-sa).
- Compte de service : sélectionnez le compte de service (
Accédez à l'onglet Conteneurs :
- Cliquez sur Variables et secrets.
- Cliquez sur + Ajouter une variable pour chaque variable d'environnement :
Nom de la variable Exemple de valeur Description GCS_BUCKETservicenow-audit-logsNom du bucket GCS GCS_PREFIXaudit-logsPréfixe des fichiers journaux STATE_KEYaudit-logs/state.jsonChemin d'accès au fichier d'état API_BASE_URLhttps://instance.service-now.comURL de l'instance ServiceNow API_USERNAMEyour-usernameNom d'utilisateur ServiceNow API_PASSWORDyour-passwordMot de passe ServiceNow PAGE_SIZE1000Enregistrements par page MAX_PAGES1000Nombre maximal de pages à extraire Dans la section Variables et secrets, faites défiler la page jusqu'à Requêtes :
- Délai avant expiration de la requête : saisissez
600secondes (10 minutes).
- Délai avant expiration de la requête : saisissez
Accédez à l'onglet Paramètres :
- Dans la section Ressources :
- Mémoire : sélectionnez 512 Mio ou plus.
- CPU : sélectionnez 1.
- Dans la section Ressources :
Dans la section Scaling de révision :
- Nombre minimal d'instances : saisissez
0. - Nombre maximal d'instances : saisissez
100(ou ajustez en fonction de la charge attendue).
- Nombre minimal d'instances : saisissez
Cliquez sur Créer.
Attendez que le service soit créé (1 à 2 minutes).
Une fois le service créé, l'éditeur de code intégré s'ouvre automatiquement.
Ajouter un code de fonction
- Saisissez main dans le champ Point d'entrée.
Dans l'éditeur de code intégré, créez deux fichiers :
Premier fichier : main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs') STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json') API_BASE = os.environ.get('API_BASE_URL') USERNAME = os.environ.get('API_USERNAME') PASSWORD = os.environ.get('API_PASSWORD') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000')) def parse_datetime(value: str) -> datetime: """Parse ServiceNow datetime string to datetime object.""" # ServiceNow format: YYYY-MM-DD HH:MM:SS try: return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) except ValueError: # Try ISO format as fallback if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=24) print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}") # Fetch logs records, newest_event_time = fetch_logs( api_base=API_BASE, username=USERNAME, password=PASSWORD, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_pages=MAX_PAGES, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S')) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int): """ Fetch logs from ServiceNow sys_audit table with pagination and rate limiting. Args: api_base: ServiceNow instance URL username: ServiceNow username password: ServiceNow password start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_pages: Maximum total pages to fetch Returns: Tuple of (records list, newest_event_time string) """ # Clean up base URL base_url = api_base.rstrip('/') endpoint = f"{base_url}/api/now/table/sys_audit" # Encode credentials using UTF-8 auth_string = f"{username}:{password}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 offset = 0 # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS) start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S') while True: page_num += 1 if len(records) >= page_size * max_pages: print(f"Reached max_pages limit ({max_pages})") break # Build query parameters # Use >= operator for sys_created_on field (on or after) params = [] params.append(f"sysparm_query=sys_created_on>={start_time_str}") params.append(f"sysparm_display_value=true") params.append(f"sysparm_limit={page_size}") params.append(f"sysparm_offset={offset}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('result', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_time = event.get('sys_created_on') if event_time: if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results if len(page_results) < page_size: print(f"Reached last page (size={len(page_results)} < limit={page_size})") break # Move to next page offset += page_size # Small delay to avoid rate limiting time.sleep(0.1) except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time ```
Deuxième fichier : requirements.txt:
``` functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0 ```
Cliquez sur Déployer pour enregistrer et déployer la fonction.
Attendez la fin du déploiement (deux à trois minutes).
Créer une tâche Cloud Scheduler
Cloud Scheduler publie des messages sur le sujet Pub/Sub à intervalles réguliers, ce qui déclenche la fonction Cloud Run.
- Dans la console GCP, accédez à Cloud Scheduler.
- Cliquez sur Créer une tâche.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom servicenow-audit-collector-hourlyRégion Sélectionnez la même région que la fonction Cloud Run. Fréquence 0 * * * *(toutes les heures)Fuseau horaire Sélectionnez un fuseau horaire (UTC recommandé). Type de cible Pub/Sub Topic Sélectionnez le sujet Pub/Sub ( servicenow-audit-trigger).Corps du message {}(objet JSON vide)Cliquez sur Créer.
Options de fréquence de planification
Choisissez la fréquence en fonction du volume de journaux et des exigences de latence :
Fréquence Expression Cron Cas d'utilisation Toutes les 5 minutes */5 * * * *Volume élevé, faible latence Toutes les 15 minutes */15 * * * *Volume moyen Toutes les heures 0 * * * *Standard (recommandé) Toutes les 6 heures 0 */6 * * *Traitement par lot à faible volume Tous les jours 0 0 * * *Collecte de données historiques
Tester l'intégration
- Dans la console Cloud Scheduler, recherchez votre job.
- Cliquez sur Exécuter de force pour déclencher le job manuellement.
- Patientez pendant quelques secondes.
- Accédez à Cloud Run > Services.
- Cliquez sur le nom de votre fonction (
servicenow-audit-collector). - Cliquez sur l'onglet Journaux.
Vérifiez que la fonction s'est exécutée correctement. Recherchez les éléments suivants :
Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS Page 1: Retrieved X events Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsAccédez à Cloud Storage > Buckets.
Cliquez sur le nom de votre bucket.
Accédez au dossier de préfixe (
audit-logs/).Vérifiez qu'un fichier
.ndjsona été créé avec le code temporel actuel.
Si vous constatez des erreurs dans les journaux :
- HTTP 401 : vérifiez les identifiants de l'API dans les variables d'environnement
- HTTP 403 : vérifiez que le compte dispose des autorisations requises (rôle d'administrateur ou ACL personnalisée pour sys_audit).
- HTTP 429 : limitation du débit. La fonction effectuera automatiquement une nouvelle tentative avec un intervalle de temps.
- Variables d'environnement manquantes : vérifiez que toutes les variables requises sont définies.
Récupérer le compte de service Google SecOps
Google SecOps utilise un compte de service unique pour lire les données de votre bucket GCS. Vous devez accorder à ce compte de service l'accès à votre bucket.
Obtenir l'adresse e-mail du compte de service
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Cliquez sur Configurer un flux unique.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
ServiceNow Audit logs). - Sélectionnez Google Cloud Storage V2 comme Type de source.
- Sélectionnez Audit ServiceNow comme Type de journal.
Cliquez sur Obtenir un compte de service. Une adresse e-mail unique pour le compte de service s'affiche, par exemple :
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopiez cette adresse e-mail pour l'utiliser à l'étape suivante.
Accorder des autorisations IAM au compte de service Google SecOps
Le compte de service Google SecOps a besoin du rôle Lecteur des objets Storage sur votre bucket GCS.
- Accédez à Cloud Storage > Buckets.
- Cliquez sur le nom de votre bucket.
- Accédez à l'onglet Autorisations.
- Cliquez sur Accorder l'accès.
- Fournissez les informations de configuration suivantes :
- Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps.
- Attribuez des rôles : sélectionnez Lecteur des objets de l'espace de stockage.
Cliquez sur Enregistrer.
Configurer un flux dans Google SecOps pour ingérer les journaux d'audit ServiceNow
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Cliquez sur Configurer un flux unique.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
ServiceNow Audit logs). - Sélectionnez Google Cloud Storage V2 comme Type de source.
- Sélectionnez Audit ServiceNow comme Type de journal.
- Cliquez sur Suivant.
Spécifiez les valeurs des paramètres d'entrée suivants :
URL du bucket Storage : saisissez l'URI du bucket GCS avec le préfixe du chemin d'accès :
gs://servicenow-audit-logs/audit-logs/Remplacez :
servicenow-audit-logs: nom de votre bucket GCS.audit-logs: préfixe/chemin d'accès au dossier dans lequel les journaux sont stockés.
Option de suppression de la source : sélectionnez l'option de suppression de votre choix :
- Jamais : ne supprime jamais aucun fichier après les transferts (recommandé pour les tests).
- Supprimer les fichiers transférés : supprime les fichiers après un transfert réussi.
Supprimer les fichiers transférés et les répertoires vides : supprime les fichiers et les répertoires vides après un transfert réussi.
Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
Espace de noms de l'élément : espace de noms de l'élément.
Libellés d'ingestion : libellé à appliquer aux événements de ce flux.
Cliquez sur Suivant.
Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Option B : Agent Bindplane avec syslog
Cette méthode utilise un agent Bindplane pour collecter les journaux d'audit ServiceNow et les transférer vers Google Security Operations. Comme ServiceNow n'est pas compatible avec syslog pour les journaux d'audit, nous allons utiliser un script pour interroger l'API REST ServiceNow et transférer les journaux à l'agent Bindplane via syslog.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps
- Windows Server 2016 ou version ultérieure, ou hôte Linux avec
systemd - Connectivité réseau entre l'agent Bindplane et ServiceNow
- Si vous exécutez l'agent derrière un proxy, assurez-vous que les ports de pare-feu sont ouverts conformément aux exigences de l'agent Bindplane.
- Accès privilégié à la console ou à l'appliance de gestion ServiceNow avec les rôles appropriés (généralement
adminou utilisateur avec accès en lecture à la table sys_audit)
Obtenir le fichier d'authentification d'ingestion Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres du SIEM > Agent de collecte.
- Cliquez sur Télécharger pour télécharger le fichier d'authentification pour l'ingestion.
Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.
Obtenir l'ID client Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres SIEM> Profil.
Copiez et enregistrez le numéro client de la section Informations sur l'organisation.
Installer l'agent BindPlane
Installez l'agent Bindplane sur votre système d'exploitation Windows ou Linux en suivant les instructions ci-dessous.
Installation de fenêtres
- Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
Exécutez la commande suivante :
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quietAttendez la fin de l'installation.
Vérifiez l'installation en exécutant la commande suivante :
sc query observiq-otel-collector
Le service doit être indiqué comme RUNNING (EN COURS D'EXÉCUTION).
Installation de Linux
- Ouvrez un terminal avec les droits root ou sudo.
Exécutez la commande suivante :
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.shAttendez la fin de l'installation.
Vérifiez l'installation en exécutant la commande suivante :
sudo systemctl status observiq-otel-collector
Le service doit être indiqué comme actif (en cours d'exécution).
Autres ressources d'installation
Pour obtenir d'autres options d'installation et de dépannage, consultez le guide d'installation de l'agent Bindplane.
Configurer l'agent Bindplane pour ingérer les journaux syslog et les envoyer à Google SecOps
Localiser le fichier de configuration
Linux : bash
sudo nano /etc/bindplane-agent/config.yaml
Windows :
cmd
notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
Modifiez le fichier de configuration
Remplacez l'intégralité du contenu de config.yaml par la configuration suivante :
```yaml
receivers:
udplog:
listen_address: "0.0.0.0:514"
exporters:
chronicle/servicenow_audit:
compression: gzip
creds_file_path: '/path/to/ingestion-authentication-file.json'
customer_id: '<YOUR_CUSTOMER_ID>'
endpoint: <CUSTOMER_REGION_ENDPOINT>
log_type: 'SERVICENOW_AUDIT'
raw_log_field: body
ingestion_labels:
service: servicenow
service:
pipelines:
logs/servicenow_to_chronicle:
receivers:
- udplog
exporters:
- chronicle/servicenow_audit
```
Paramètres de configuration
Remplacez les espaces réservés suivants :
listen_address: adresse IP et port à écouter. Utilisez0.0.0.0:514pour écouter sur toutes les interfaces sur le port 514.creds_file_path: chemin d'accès complet au fichier d'authentification de l'ingestion :- Linux :
/etc/bindplane-agent/ingestion-auth.json - Windows :
C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
- Linux :
<YOUR_CUSTOMER_ID>: ID client de l'étape précédente.<CUSTOMER_REGION_ENDPOINT>: URL du point de terminaison régional :- États-Unis :
malachiteingestion-pa.googleapis.com - Europe :
europe-malachiteingestion-pa.googleapis.com - Asie :
asia-southeast1-malachiteingestion-pa.googleapis.com - Pour obtenir la liste complète, consultez Points de terminaison régionaux.
- États-Unis :
Enregistrez le fichier de configuration.
Après avoir modifié le fichier, enregistrez-le :
* Linux : appuyez sur Ctrl+O, puis sur Enter, puis sur Ctrl+X.
* Windows : cliquez sur Fichier > Enregistrer.
Redémarrez l'agent Bindplane pour appliquer les modifications.
Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :
sudo systemctl restart observiq-otel-collectorVérifiez que le service est en cours d'exécution :
sudo systemctl status observiq-otel-collectorRecherchez les erreurs dans les journaux :
sudo journalctl -u observiq-otel-collector -f
Pour redémarrer l'agent Bindplane dans Windows, choisissez l'une des options suivantes :
À l'aide de l'invite de commandes ou de PowerShell en tant qu'administrateur :
net stop observiq-otel-collector && net start observiq-otel-collectorUtiliser la console Services :
- Appuyez sur
Win+R, saisissezservices.msc, puis appuyez sur Entrée. - Localisez observIQ OpenTelemetry Collector.
Effectuez un clic droit, puis sélectionnez Redémarrer.
Vérifiez que le service est en cours d'exécution :
sc query observiq-otel-collectorRecherchez les erreurs dans les journaux :
type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
Créer un script pour transférer les journaux d'audit ServiceNow vers syslog
Comme ServiceNow n'est pas compatible avec syslog pour les journaux d'audit, nous allons créer un script qui interroge l'API REST ServiceNow et transfère les journaux vers syslog. Vous pouvez programmer l'exécution de ce script à intervalles réguliers.
Exemple de script Python (Linux)
Créez un fichier nommé
servicenow_audit_to_syslog.pyavec le contenu suivant :import urllib3 import json import datetime import base64 import socket import time import os # ServiceNow API details BASE_URL = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL USERNAME = 'admin' # Replace with your ServiceNow username PASSWORD = 'password' # Replace with your ServiceNow password # Syslog details SYSLOG_SERVER = '127.0.0.1' # Replace with your Bindplane agent IP SYSLOG_PORT = 514 # Replace with your Bindplane agent port # State file to keep track of last run STATE_FILE = '/tmp/servicenow_audit_last_run.txt' # Pagination settings PAGE_SIZE = 1000 MAX_PAGES = 1000 def get_last_run_timestamp(): try: with open(STATE_FILE, 'r') as f: return f.read().strip() except: return '1970-01-01 00:00:00' def update_state_file(timestamp): with open(STATE_FILE, 'w') as f: f.write(timestamp) def send_to_syslog(message): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT)) sock.close() def get_audit_logs(last_run_timestamp): """ Query ServiceNow sys_audit table with proper pagination. Uses sys_created_on field for timestamp filtering. """ # Encode credentials using UTF-8 auth_string = f"{USERNAME}:{PASSWORD}" auth_bytes = auth_string.encode('utf-8') auth_encoded = base64.b64encode(auth_bytes).decode('utf-8') # Setup HTTP client http = urllib3.PoolManager() headers = { 'Authorization': f'Basic {auth_encoded}', 'Accept': 'application/json' } results = [] offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if 'T' in last_run_timestamp: last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0] for page in range(MAX_PAGES): # Build query with pagination # Use >= operator for sys_created_on field (on or after) query_params = ( f"sysparm_query=sys_created_on>={last_run_timestamp}" f"&sysparm_display_value=true" f"&sysparm_limit={PAGE_SIZE}" f"&sysparm_offset={offset}" ) url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}" try: response = http.request('GET', url, headers=headers) if response.status == 200: data = json.loads(response.data.decode('utf-8')) chunk = data.get('result', []) results.extend(chunk) # Stop if we got fewer records than PAGE_SIZE (last page) if len(chunk) < PAGE_SIZE: break # Move to next page offset += PAGE_SIZE else: print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}") break except Exception as e: print(f"Exception querying ServiceNow API: {str(e)}") break return results def main(): # Get last run timestamp last_run_timestamp = get_last_run_timestamp() # Current timestamp for this run current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Query ServiceNow API for audit logs audit_logs = get_audit_logs(last_run_timestamp) if audit_logs: # Send each log to syslog for log in audit_logs: # Format the log as JSON log_json = json.dumps(log) # Send to syslog send_to_syslog(log_json) # Sleep briefly to avoid flooding time.sleep(0.01) # Update state file update_state_file(current_timestamp) print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog") else: print("No new audit logs to forward") if __name__ == "__main__": main()
Configurer l'exécution planifiée (Linux)
Rendez le script exécutable :
chmod +x servicenow_audit_to_syslog.pyCréez une tâche cron pour exécuter le script toutes les heures :
crontab -eAjoutez la ligne suivante :
0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
Exemple de script PowerShell (Windows)
Créez un fichier nommé
ServiceNow-Audit-To-Syslog.ps1avec le contenu suivant :# ServiceNow API details $BaseUrl = 'https://instance.service-now.com' # Replace with your ServiceNow instance URL $Username = 'admin' # Replace with your ServiceNow username $Password = 'password' # Replace with your ServiceNow password # Syslog details $SyslogServer = '127.0.0.1' # Replace with your Bindplane agent IP $SyslogPort = 514 # Replace with your Bindplane agent port # State file to keep track of last run $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt" # Pagination settings $PageSize = 1000 $MaxPages = 1000 function Get-LastRunTimestamp { try { if (Test-Path $StateFile) { return Get-Content $StateFile } else { return '1970-01-01 00:00:00' } } catch { return '1970-01-01 00:00:00' } } function Update-StateFile { param([string]$Timestamp) Set-Content -Path $StateFile -Value $Timestamp } function Send-ToSyslog { param([string]$Message) $UdpClient = New-Object System.Net.Sockets.UdpClient $UdpClient.Connect($SyslogServer, $SyslogPort) $Encoding = [System.Text.Encoding]::ASCII $Bytes = $Encoding.GetBytes($Message) $UdpClient.Send($Bytes, $Bytes.Length) $UdpClient.Close() } function Get-AuditLogs { param([string]$LastRunTimestamp) # Create auth header using UTF-8 encoding $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}")) $Headers = @{ Authorization = "Basic ${Auth}" Accept = 'application/json' } $Results = @() $Offset = 0 # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format) # Convert ISO format to ServiceNow format if needed if ($LastRunTimestamp -match 'T') { $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' ' $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', '' } for ($page = 0; $page -lt $MaxPages; $page++) { # Build query with pagination # Use >= operator for sys_created_on field (on or after) $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}" $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}" try { $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get $Chunk = $Response.result $Results += $Chunk # Stop if we got fewer records than PageSize (last page) if ($Chunk.Count -lt $PageSize) { break } # Move to next page $Offset += $PageSize } catch { Write-Error "Error querying ServiceNow API: $_" break } } return $Results } # Main execution $LastRunTimestamp = Get-LastRunTimestamp $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss') $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp if ($AuditLogs -and $AuditLogs.Count -gt 0) { # Send each log to syslog foreach ($Log in $AuditLogs) { # Format the log as JSON $LogJson = $Log | ConvertTo-Json -Compress # Send to syslog Send-ToSyslog -Message $LogJson # Sleep briefly to avoid flooding Start-Sleep -Milliseconds 10 } # Update state file Update-StateFile -Timestamp $CurrentTimestamp Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog" } else { Write-Output "No new audit logs to forward" }
Configurer l'exécution planifiée (Windows)
- Ouvrez le Planificateur de tâches.
- Cliquez sur Créer une tâche.
- Fournissez la configuration suivante :
- Nom :
ServiceNowAuditToSyslog - Options de sécurité : exécuter que l'utilisateur soit connecté ou non
- Nom :
- Accédez à l'onglet Triggers (Déclencheurs).
- Cliquez sur Nouveau et définissez l'exécution toutes les heures.
- Accédez à l'onglet Actions.
- Cliquez sur Nouveau, puis définissez les paramètres suivants :
- Action : Démarrer un programme
- Programme/script :
powershell.exe - Arguments :
-ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
- Cliquez sur OK pour enregistrer la tâche.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.