ServiceNow 审核日志

支持的平台:

本文档介绍了如何使用多种方法将 ServiceNow 审核日志提取到 Google Security Operations。

选项 A:将 GCS 与 Cloud Run 函数搭配使用

此方法使用 Cloud Run 函数定期查询 ServiceNow REST API 以获取审核日志,并将这些日志存储在 GCS 存储分区中。然后,Google Security Operations 会从 GCS 存储分区中收集日志。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 对 ServiceNow 租户或 API 的特权访问权限,并具有适当的角色(通常为 admin 或对 sys_audit 表具有读取权限的用户)
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限

收集 ServiceNow 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 登录 ServiceNow 管理控制台
  2. 依次前往系统安全性 > 用户和群组 > 用户
  3. 创建新用户或选择具有适当权限的现有用户来访问审核日志。
  4. 复制以下详细信息并将其保存在安全的位置:

    • 用户名
    • 密码
    • 实例网址(例如 https://instance.service-now.com

为非管理员用户配置 ACL

如果您想使用非管理员用户账号,则必须创建自定义访问控制列表 (ACL),以授予对 sys_audit 表的读取权限:

  1. 以管理员身份登录 ServiceNow 管理控制台
  2. 依次前往系统安全性 > 访问权限控制 (ACL)
  3. 点击新建
  4. 提供以下配置详细信息:
    • 类型:选择记录
    • 操作:选择读取
    • 名称:输入 sys_audit
    • 说明:输入 Allow read access to sys_audit table for Chronicle integration
  5. Requires role 字段中,添加分配给集成用户的角色(例如 chronicle_reader)。
  6. 点击提交
  7. 验证 ACL 是否处于有效状态,以及用户是否可以查询 sys_audit 表。

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 servicenow-audit-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 servicenow-audit-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect ServiceNow audit logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 servicenow-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 servicenow-audit-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 ServiceNow API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 servicenow-audit-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (servicenow-audit-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 (servicenow-audit-collector-sa)。
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET servicenow-audit-logs GCS 存储分区名称
    GCS_PREFIX audit-logs 日志文件的前缀
    STATE_KEY audit-logs/state.json 状态文件路径
    API_BASE_URL https://instance.service-now.com ServiceNow 实例网址
    API_USERNAME your-username ServiceNow 用户名
    API_PASSWORD your-password ServiceNow 密码
    PAGE_SIZE 1000 每页记录数
    MAX_PAGES 1000 要提取的最大网页数
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 入口点字段中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:

          import functions_framework
          from google.cloud import storage
          import json
          import os
          import urllib3
          from datetime import datetime, timezone, timedelta
          import time
          import base64
      
          # Initialize HTTP client with timeouts
          http = urllib3.PoolManager(
              timeout=urllib3.Timeout(connect=5.0, read=30.0),
              retries=False,
          )
      
          # Initialize Storage client
          storage_client = storage.Client()
      
          # Environment variables
          GCS_BUCKET = os.environ.get('GCS_BUCKET')
          GCS_PREFIX = os.environ.get('GCS_PREFIX', 'audit-logs')
          STATE_KEY = os.environ.get('STATE_KEY', 'audit-logs/state.json')
          API_BASE = os.environ.get('API_BASE_URL')
          USERNAME = os.environ.get('API_USERNAME')
          PASSWORD = os.environ.get('API_PASSWORD')
          PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
          MAX_PAGES = int(os.environ.get('MAX_PAGES', '1000'))
      
          def parse_datetime(value: str) -> datetime:
              """Parse ServiceNow datetime string to datetime object."""
              # ServiceNow format: YYYY-MM-DD HH:MM:SS
              try:
                  return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
              except ValueError:
                  # Try ISO format as fallback
                  if value.endswith("Z"):
                      value = value[:-1] + "+00:00"
                  return datetime.fromisoformat(value)
      
          @functions_framework.cloud_event
          def main(cloud_event):
              """
              Cloud Run function triggered by Pub/Sub to fetch ServiceNow audit logs and write to GCS.
      
              Args:
                  cloud_event: CloudEvent object containing Pub/Sub message
              """
      
              if not all([GCS_BUCKET, API_BASE, USERNAME, PASSWORD]):
                  print('Error: Missing required environment variables')
                  return
      
              try:
                  # Get GCS bucket
                  bucket = storage_client.bucket(GCS_BUCKET)
      
                  # Load state
                  state = load_state(bucket, STATE_KEY)
      
                  # Determine time window
                  now = datetime.now(timezone.utc)
                  last_time = None
      
                  if isinstance(state, dict) and state.get("last_event_time"):
                      try:
                          last_time = parse_datetime(state["last_event_time"])
                          # Overlap by 2 minutes to catch any delayed events
                          last_time = last_time - timedelta(minutes=2)
                      except Exception as e:
                          print(f"Warning: Could not parse last_event_time: {e}")
      
                  if last_time is None:
                      last_time = now - timedelta(hours=24)
      
                  print(f"Fetching logs from {last_time.strftime('%Y-%m-%d %H:%M:%S')} to {now.strftime('%Y-%m-%d %H:%M:%S')}")
      
                  # Fetch logs
                  records, newest_event_time = fetch_logs(
                      api_base=API_BASE,
                      username=USERNAME,
                      password=PASSWORD,
                      start_time=last_time,
                      end_time=now,
                      page_size=PAGE_SIZE,
                      max_pages=MAX_PAGES,
                  )
      
                  if not records:
                      print("No new log records found.")
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
                      return
      
                  # Write to GCS as NDJSON
                  timestamp = now.strftime('%Y%m%d_%H%M%S')
                  object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
                  blob = bucket.blob(object_key)
      
                  ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
                  blob.upload_from_string(ndjson, content_type='application/x-ndjson')
      
                  print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
      
                  # Update state with newest event time
                  if newest_event_time:
                      save_state(bucket, STATE_KEY, newest_event_time)
                  else:
                      save_state(bucket, STATE_KEY, now.strftime('%Y-%m-%d %H:%M:%S'))
      
                  print(f"Successfully processed {len(records)} records")
      
              except Exception as e:
                  print(f'Error processing logs: {str(e)}')
                  raise
      
          def load_state(bucket, key):
              """Load state from GCS."""
              try:
                  blob = bucket.blob(key)
                  if blob.exists():
                      state_data = blob.download_as_text()
                      return json.loads(state_data)
              except Exception as e:
                  print(f"Warning: Could not load state: {e}")
      
              return {}
      
          def save_state(bucket, key, last_event_time: str):
              """Save the last event timestamp to GCS state file."""
              try:
                  state = {'last_event_time': last_event_time}
                  blob = bucket.blob(key)
                  blob.upload_from_string(
                      json.dumps(state, indent=2),
                      content_type='application/json'
                  )
                  print(f"Saved state: last_event_time={last_event_time}")
              except Exception as e:
                  print(f"Warning: Could not save state: {e}")
      
          def fetch_logs(api_base: str, username: str, password: str, start_time: datetime, end_time: datetime, page_size: int, max_pages: int):
              """
              Fetch logs from ServiceNow sys_audit table with pagination and rate limiting.
      
              Args:
                  api_base: ServiceNow instance URL
                  username: ServiceNow username
                  password: ServiceNow password
                  start_time: Start time for log query
                  end_time: End time for log query
                  page_size: Number of records per page
                  max_pages: Maximum total pages to fetch
      
              Returns:
                  Tuple of (records list, newest_event_time string)
              """
              # Clean up base URL
              base_url = api_base.rstrip('/')
      
              endpoint = f"{base_url}/api/now/table/sys_audit"
      
              # Encode credentials using UTF-8
              auth_string = f"{username}:{password}"
              auth_bytes = auth_string.encode('utf-8')
              auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
      
              headers = {
                  'Authorization': f'Basic {auth_b64}',
                  'Accept': 'application/json',
                  'Content-Type': 'application/json',
                  'User-Agent': 'GoogleSecOps-ServiceNowCollector/1.0'
              }
      
              records = []
              newest_time = None
              page_num = 0
              backoff = 1.0
              offset = 0
      
              # Format timestamps for ServiceNow (YYYY-MM-DD HH:MM:SS)
              start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
      
              while True:
                  page_num += 1
      
                  if len(records) >= page_size * max_pages:
                      print(f"Reached max_pages limit ({max_pages})")
                      break
      
                  # Build query parameters
                  # Use >= operator for sys_created_on field (on or after)
                  params = []
                  params.append(f"sysparm_query=sys_created_on>={start_time_str}")
                  params.append(f"sysparm_display_value=true")
                  params.append(f"sysparm_limit={page_size}")
                  params.append(f"sysparm_offset={offset}")
      
                  url = f"{endpoint}?{'&'.join(params)}"
      
                  try:
                      response = http.request('GET', url, headers=headers)
      
                      # Handle rate limiting with exponential backoff
                      if response.status == 429:
                          retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                          print(f"Rate limited (429). Retrying after {retry_after}s...")
                          time.sleep(retry_after)
                          backoff = min(backoff * 2, 30.0)
                          continue
      
                      backoff = 1.0
      
                      if response.status != 200:
                          print(f"HTTP Error: {response.status}")
                          response_text = response.data.decode('utf-8')
                          print(f"Response body: {response_text}")
                          return [], None
      
                      data = json.loads(response.data.decode('utf-8'))
                      page_results = data.get('result', [])
      
                      if not page_results:
                          print(f"No more results (empty page)")
                          break
      
                      print(f"Page {page_num}: Retrieved {len(page_results)} events")
                      records.extend(page_results)
      
                      # Track newest event time
                      for event in page_results:
                          try:
                              event_time = event.get('sys_created_on')
                              if event_time:
                                  if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                      newest_time = event_time
                          except Exception as e:
                              print(f"Warning: Could not parse event time: {e}")
      
                      # Check for more results
                      if len(page_results) < page_size:
                          print(f"Reached last page (size={len(page_results)} < limit={page_size})")
                          break
      
                      # Move to next page
                      offset += page_size
      
                      # Small delay to avoid rate limiting
                      time.sleep(0.1)
      
                  except Exception as e:
                      print(f"Error fetching logs: {e}")
                      return [], None
      
              print(f"Retrieved {len(records)} total records from {page_num} pages")
              return records, newest_time
          ```
      
    • 第二个文件:requirements.txt:

      ```
      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      ```
      
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 servicenow-audit-collector-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (servicenow-audit-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 搜索量中等
    每小时 0 * * * * 标准(推荐)
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 (servicenow-audit-collector)。
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Fetching logs from YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/audit-logs/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称。

  10. 前往前缀文件夹 (audit-logs/)。

  11. 验证是否已创建具有当前时间戳的新 .ndjson 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 API 凭据
  • HTTP 403:验证账号是否具有所需权限(管理员角色或 sys_audit 的自定义 ACL)
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已设置所有必需的变量

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 ServiceNow Audit logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 ServiceNow Audit 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 ServiceNow 审核日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 ServiceNow Audit logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 ServiceNow Audit 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://servicenow-audit-logs/audit-logs/
      
        • servicenow-audit-logs:您的 GCS 存储分区名称。
        • audit-logs:存储日志的前缀/文件夹路径。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

选项 B:使用 syslog 的 Bindplane 代理

此方法使用 Bindplane 代理收集 ServiceNow 审核日志,并将其转发到 Google Security Operations。由于 ServiceNow 本身不支持将 syslog 用于审核日志,因此我们将使用脚本查询 ServiceNow REST API,并通过 syslog 将日志转发到 Bindplane 代理。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Windows Server 2016 或更高版本,或者具有 systemd 的 Linux 主机
  • Bindplane 代理与 ServiceNow 之间的网络连接
  • 如果在代理后面运行,请确保防火墙端口根据 Bindplane 代理要求处于开放状态
  • 对 ServiceNow 管理控制台或设备具有适当角色的特权访问权限(通常为 admin 或对 sys_audit 表具有读取权限的用户)

获取 Google SecOps 注入身份验证文件

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 收集代理
  3. 点击下载以下载内容提交身份验证文件
  4. 将文件安全地保存在将要安装 Bindplane 的系统上。

获取 Google SecOps 客户 ID

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 个人资料
  3. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Windows 安装

  1. 以管理员身份打开命令提示符PowerShell
  2. 运行以下命令:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. 等待安装完成。

  4. 运行以下命令来验证安装:

    sc query observiq-otel-collector
    

该服务应显示为 RUNNING

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. 等待安装完成。

  4. 运行以下命令来验证安装:

    sudo systemctl status observiq-otel-collector
    

该服务应显示为有效(正在运行)

其他安装资源

如需了解其他安装选项和问题排查信息,请参阅 Bindplane 代理安装指南

配置 Bindplane 代理以注入 syslog 并将其发送到 Google SecOps

找到配置文件

Linuxbash sudo nano /etc/bindplane-agent/config.yaml

Windowscmd notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"

修改配置文件

config.yaml 的全部内容替换为以下配置:

```yaml
receivers:
  udplog:
    listen_address: "0.0.0.0:514"

exporters:
  chronicle/servicenow_audit:
    compression: gzip
    creds_file_path: '/path/to/ingestion-authentication-file.json'
    customer_id: '<YOUR_CUSTOMER_ID>'
    endpoint: <CUSTOMER_REGION_ENDPOINT>
    log_type: 'SERVICENOW_AUDIT'
    raw_log_field: body
    ingestion_labels:
      service: servicenow

service:
  pipelines:
    logs/servicenow_to_chronicle:
      receivers:
        - udplog
      exporters:
        - chronicle/servicenow_audit
```

配置参数

替换以下占位符:

  • listen_address:要监听的 IP 地址和端口。使用 0.0.0.0:514 在端口 514 上监听所有接口。
  • creds_file_path:提取身份验证文件的完整路径:
    • Linux/etc/bindplane-agent/ingestion-auth.json
    • WindowsC:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
  • <YOUR_CUSTOMER_ID>:上一步中的客户 ID。
  • <CUSTOMER_REGION_ENDPOINT>:区域端点网址:
    • 美国malachiteingestion-pa.googleapis.com
    • 欧洲europe-malachiteingestion-pa.googleapis.com
    • 亚洲asia-southeast1-malachiteingestion-pa.googleapis.com
    • 如需查看完整列表,请参阅区域级端点

保存配置文件

修改后,保存文件: * Linux:依次按 Ctrl+OEnterCtrl+X * Windows:依次点击文件 > 保存

重启 Bindplane 代理以应用更改

  • 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart observiq-otel-collector
    
    1. 验证服务是否正在运行:

      sudo systemctl status observiq-otel-collector
      
    2. 检查日志是否存在错误:

      sudo journalctl -u observiq-otel-collector -f
      
  • 如需在 Windows 中重启 Bindplane 代理,请选择以下选项之一:

    • 以管理员身份使用命令提示符或 PowerShell:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • 使用“服务”控制台:

    1. Win+R,输入 services.msc,然后按 Enter 键。
    2. 找到 observIQ OpenTelemetry 收集器
    3. 右键点击并选择重新启动

    4. 验证服务是否正在运行:

      sc query observiq-otel-collector
      
    5. 检查日志是否存在错误:

      type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
      

创建脚本以将 ServiceNow 审核日志转发到 syslog

由于 ServiceNow 本身不支持将 syslog 用于审核日志,因此我们将创建一个脚本来查询 ServiceNow REST API 并将日志转发到 syslog。您可以安排此脚本定期运行。

Python 脚本示例 (Linux)

  • 创建一个名为 servicenow_audit_to_syslog.py 的文件,其中包含以下内容:

    import urllib3
    import json
    import datetime
    import base64
    import socket
    import time
    import os
    
    # ServiceNow API details
    BASE_URL = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    USERNAME = 'admin'  # Replace with your ServiceNow username
    PASSWORD = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    SYSLOG_SERVER = '127.0.0.1'  # Replace with your Bindplane agent IP
    SYSLOG_PORT = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    STATE_FILE = '/tmp/servicenow_audit_last_run.txt'
    
    # Pagination settings
    PAGE_SIZE = 1000
    MAX_PAGES = 1000
    
    def get_last_run_timestamp():
        try:
            with open(STATE_FILE, 'r') as f:
                return f.read().strip()
        except:
            return '1970-01-01 00:00:00'
    
    def update_state_file(timestamp):
        with open(STATE_FILE, 'w') as f:
            f.write(timestamp)
    
    def send_to_syslog(message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT))
        sock.close()
    
    def get_audit_logs(last_run_timestamp):
        """
        Query ServiceNow sys_audit table with proper pagination.
        Uses sys_created_on field for timestamp filtering.
        """
        # Encode credentials using UTF-8
        auth_string = f"{USERNAME}:{PASSWORD}"
        auth_bytes = auth_string.encode('utf-8')
        auth_encoded = base64.b64encode(auth_bytes).decode('utf-8')
    
        # Setup HTTP client
        http = urllib3.PoolManager()
        headers = {
            'Authorization': f'Basic {auth_encoded}',
            'Accept': 'application/json'
        }
    
        results = []
        offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if 'T' in last_run_timestamp:
            last_run_timestamp = last_run_timestamp.replace('T', ' ').split('.')[0]
    
        for page in range(MAX_PAGES):
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            query_params = (
                f"sysparm_query=sys_created_on>={last_run_timestamp}"
                f"&sysparm_display_value=true"
                f"&sysparm_limit={PAGE_SIZE}"
                f"&sysparm_offset={offset}"
            )
    
            url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    data = json.loads(response.data.decode('utf-8'))
                    chunk = data.get('result', [])
                    results.extend(chunk)
    
                    # Stop if we got fewer records than PAGE_SIZE (last page)
                    if len(chunk) < PAGE_SIZE:
                        break
    
                    # Move to next page
                    offset += PAGE_SIZE
                else:
                    print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}")
                    break
    
            except Exception as e:
                print(f"Exception querying ServiceNow API: {str(e)}")
                break
    
        return results
    
    def main():
        # Get last run timestamp
        last_run_timestamp = get_last_run_timestamp()
    
        # Current timestamp for this run
        current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
        # Query ServiceNow API for audit logs
        audit_logs = get_audit_logs(last_run_timestamp)
    
        if audit_logs:
            # Send each log to syslog
            for log in audit_logs:
                # Format the log as JSON
                log_json = json.dumps(log)
    
                # Send to syslog
                send_to_syslog(log_json)
    
                # Sleep briefly to avoid flooding
                time.sleep(0.01)
    
            # Update state file
            update_state_file(current_timestamp)
    
            print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog")
        else:
            print("No new audit logs to forward")
    
    if __name__ == "__main__":
        main()
    

设置预定执行(Linux)

  • 让该脚本可执行:

    chmod +x servicenow_audit_to_syslog.py
    
  • 创建每小时运行一次脚本的 Cron 作业:

    crontab -e
    
  • 添加以下代码行:

    0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
    

PowerShell 脚本示例 (Windows)

  • 创建一个名为 ServiceNow-Audit-To-Syslog.ps1 的文件,其中包含以下内容:

    # ServiceNow API details
    $BaseUrl = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    $Username = 'admin'  # Replace with your ServiceNow username
    $Password = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    $SyslogServer = '127.0.0.1'  # Replace with your Bindplane agent IP
    $SyslogPort = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt"
    
    # Pagination settings
    $PageSize = 1000
    $MaxPages = 1000
    
    function Get-LastRunTimestamp {
        try {
            if (Test-Path $StateFile) {
                return Get-Content $StateFile
            } else {
                return '1970-01-01 00:00:00'
            }
        } catch {
            return '1970-01-01 00:00:00'
        }
    }
    
    function Update-StateFile {
        param([string]$Timestamp)
        Set-Content -Path $StateFile -Value $Timestamp
    }
    
    function Send-ToSyslog {
        param([string]$Message)
        $UdpClient = New-Object System.Net.Sockets.UdpClient
        $UdpClient.Connect($SyslogServer, $SyslogPort)
        $Encoding = [System.Text.Encoding]::ASCII
        $Bytes = $Encoding.GetBytes($Message)
        $UdpClient.Send($Bytes, $Bytes.Length)
        $UdpClient.Close()
    }
    
    function Get-AuditLogs {
        param([string]$LastRunTimestamp)
    
        # Create auth header using UTF-8 encoding
        $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes("${Username}:${Password}"))
        $Headers = @{
            Authorization = "Basic ${Auth}"
            Accept = 'application/json'
        }
    
        $Results = @()
        $Offset = 0
    
        # Format timestamp for ServiceNow (YYYY-MM-DD HH:MM:SS format)
        # Convert ISO format to ServiceNow format if needed
        if ($LastRunTimestamp -match 'T') {
            $LastRunTimestamp = $LastRunTimestamp -replace 'T', ' '
            $LastRunTimestamp = $LastRunTimestamp -replace '\.\d+', ''
        }
    
        for ($page = 0; $page -lt $MaxPages; $page++) {
            # Build query with pagination
            # Use >= operator for sys_created_on field (on or after)
            $QueryParams = "sysparm_query=sys_created_on>=${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}"
            $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}"
    
            try {
                $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get
                $Chunk = $Response.result
                $Results += $Chunk
    
                # Stop if we got fewer records than PageSize (last page)
                if ($Chunk.Count -lt $PageSize) {
                    break
                }
    
                # Move to next page
                $Offset += $PageSize
            } catch {
                Write-Error "Error querying ServiceNow API: $_"
                break
            }
        }
    
        return $Results
    }
    
    # Main execution
    $LastRunTimestamp = Get-LastRunTimestamp
    $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-dd HH:mm:ss')
    
    $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp
    
    if ($AuditLogs -and $AuditLogs.Count -gt 0) {
        # Send each log to syslog
        foreach ($Log in $AuditLogs) {
            # Format the log as JSON
            $LogJson = $Log | ConvertTo-Json -Compress
    
            # Send to syslog
            Send-ToSyslog -Message $LogJson
    
            # Sleep briefly to avoid flooding
            Start-Sleep -Milliseconds 10
        }
    
        # Update state file
        Update-StateFile -Timestamp $CurrentTimestamp
    
        Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog"
    } else {
        Write-Output "No new audit logs to forward"
    }
    

设置预定执行时间(Windows)

  1. 打开任务计划程序
  2. 点击创建任务
  3. 提供以下配置:
    • 名称ServiceNowAuditToSyslog
    • 安全选项:无论用户是否已登录,都运行
  4. 前往触发器标签页。
  5. 点击新建,然后将其设置为每小时运行一次。
  6. 前往操作标签页。
  7. 点击新建,然后设置:
    • 操作:启动程序
    • 程序/脚本powershell.exe
    • 参数-ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
  8. 点击确定以保存任务。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。