收集 ServiceNow 审核日志

支持的平台:

本文档介绍了如何使用多种方法将 ServiceNow 审核日志注入到 Google Security Operations。

选项 A:AWS S3 与 Lambda

此方法使用 AWS Lambda 定期查询 ServiceNow REST API 以获取审核日志,并将这些日志存储在 S3 存储桶中。然后,Google Security Operations 会从 S3 存储桶中收集日志。

准备工作

  • Google SecOps 实例
  • ServiceNow 租户或 API 的特权访问权限
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

收集 ServiceNow 前提条件(ID、API 密钥、组织 ID、令牌)

  1. 登录 ServiceNow 管理控制台
  2. 依次前往系统安全性 > 用户和群组 > 用户
  3. 创建新用户或选择具有适当权限的现有用户来访问审核日志。
  4. 复制以下详细信息并将其保存在安全的位置:
    • 用户名
    • 密码
    • 实例网址(例如 https://instance.service-now.com)

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 servicenow-audit-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 复制并粘贴下方的政策。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::servicenow-audit-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::servicenow-audit-logs/audit-logs/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 servicenow-audit-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 servicenow-audit-lambda-role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 servicenow-audit-collector
    运行时 Python 3.13
    架构 x86_64
    执行角色 servicenow-audit-lambda-role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (servicenow-audit-collector.py):

    import urllib3
    import json
    import os
    import datetime
    import boto3
    import base64
    
    def lambda_handler(event, context):
        # ServiceNow API details
        base_url = os.environ['API_BASE_URL']  # e.g., https://instance.service-now.com
        username = os.environ['API_USERNAME']
        password = os.environ['API_PASSWORD']
    
        # S3 details
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
    
        # State management
        state_key = os.environ.get('STATE_KEY', f"{s3_prefix}/state.json")
    
        # Pagination settings
        page_size = int(os.environ.get('PAGE_SIZE', '1000'))
        max_pages = int(os.environ.get('MAX_PAGES', '1000'))
    
        # Initialize S3 client
        s3 = boto3.client('s3')
    
        # Get last run timestamp from state file
        last_run_timestamp = get_last_run_timestamp(s3, s3_bucket, state_key)
    
        # Current timestamp for this run
        current_timestamp = datetime.datetime.now().isoformat()
    
        # Query ServiceNow API for audit logs with pagination
        audit_logs = get_audit_logs(base_url, username, password, last_run_timestamp, page_size, max_pages)
    
        if audit_logs:
            # Write logs to S3 in NDJSON format (newline-delimited JSON)
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
            s3_key = f"{s3_prefix}/servicenow-audit-{timestamp}.ndjson"
    
            # Format as NDJSON: one JSON object per line
            body = "\n".join(json.dumps(log) for log in audit_logs) + "\n"
    
            s3.put_object(
                Bucket=s3_bucket,
                Key=s3_key,
                Body=body,
                ContentType='application/x-ndjson'
            )
    
            # Update state file
            update_state_file(s3, s3_bucket, state_key, current_timestamp)
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully exported {len(audit_logs)} audit logs to S3')
            }
        else:
            return {
                'statusCode': 200,
                'body': json.dumps('No new audit logs to export')
            }
    
    def get_last_run_timestamp(s3, bucket, key):
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state.get('last_run_timestamp', '1970-01-01T00:00:00')
        except:
            return '1970-01-01T00:00:00'
    
    def update_state_file(s3, bucket, key, timestamp):
        state = {'last_run_timestamp': timestamp}
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(state),
            ContentType='application/json'
        )
    
    def get_audit_logs(base_url, username, password, last_run_timestamp, page_size=1000, max_pages=1000):
        """
        Query ServiceNow sys_audit table with proper pagination.
        Uses sys_created_on field for timestamp filtering.
        """
        # Encode credentials
        auth_string = f"{username}:{password}"
        auth_bytes = auth_string.encode('ascii')
        auth_encoded = base64.b64encode(auth_bytes).decode('ascii')
    
        # Setup HTTP client
        http = urllib3.PoolManager()
    
        headers = {
            'Authorization': f'Basic {auth_encoded}',
            'Accept': 'application/json'
        }
    
        results = []
        offset = 0
    
        for page in range(max_pages):
            # Build query with pagination
            # Use sys_created_on (not created_on) for timestamp filtering
            query_params = (
                f"sysparm_query=sys_created_onAFTER{last_run_timestamp}"
                f"&sysparm_display_value=true"
                f"&sysparm_limit={page_size}"
                f"&sysparm_offset={offset}"
            )
    
            url = f"{base_url}/api/now/table/sys_audit?{query_params}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    data = json.loads(response.data.decode('utf-8'))
                    chunk = data.get('result', [])
                    results.extend(chunk)
    
                    # Stop if we got fewer records than page_size (last page)
                    if len(chunk) < page_size:
                        break
    
                    # Move to next page
                    offset += page_size
                else:
                    print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}")
                    break
            except Exception as e:
                print(f"Exception querying ServiceNow API: {str(e)}")
                break
    
        return results
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并将其替换为您的值。

    示例值
    S3_BUCKET servicenow-audit-logs
    S3_PREFIX audit-logs/
    STATE_KEY audit-logs/state.json
    API_BASE_URL https://instance.service-now.com
    API_USERNAME <your-username>
    API_PASSWORD <your-password>
    PAGE_SIZE 1000
    MAX_PAGES 1000
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > servicenow-audit-collector)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 servicenow-audit-collector
    • 名称servicenow-audit-collector-1h
  3. 点击创建时间表

在 Google SecOps 中配置 Feed 以注入 ServiceNow 审核日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 ServiceNow Audit logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 ServiceNow 审核作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://servicenow-audit-logs/audit-logs/
    • 源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

选项 B:使用 syslog 的 Bindplane 代理

此方法使用 Bindplane 代理收集 ServiceNow 审核日志,并将其转发到 Google Security Operations。由于 ServiceNow 本身不支持通过 syslog 记录审核日志,因此我们将使用脚本查询 ServiceNow REST API,并通过 syslog 将日志转发到 Bindplane 代理。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 搭载 systemd 的 Windows 2016 或更高版本或 Linux 主机
  • 如果通过代理运行,请确保防火墙端口已根据 Bindplane 代理要求打开
  • 对 ServiceNow 管理控制台或设备的特权访问权限

获取 Google SecOps 注入身份验证文件

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 收集代理
  3. 下载注入身份验证文件。将文件安全地保存在将要安装 Bindplane 的系统上。

获取 Google SecOps 客户 ID

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 个人资料
  3. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

其他安装资源

  • 如需了解其他安装选项,请参阅此安装指南

配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps

  1. 访问配置文件:

    1. 找到 config.yaml 文件。通常,它位于 Linux 上的 /etc/bindplane-agent/ 目录中或 Windows 上的安装目录中。
    2. 使用文本编辑器(例如 nanovi 或记事本)打开该文件。
  2. 按如下方式修改 config.yaml 文件:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <YOUR_CUSTOMER_ID>
        # Replace with the appropriate regional endpoint
        endpoint: <CUSTOMER_REGION_ENDPOINT>
        # Add optional ingestion labels for better organization
        log_type: 'SERVICENOW_AUDIT'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
  • 根据基础架构的需要替换端口和 IP 地址。
  • <YOUR_CUSTOMER_ID> 替换为实际的客户 ID。
  • <CUSTOMER_REGION_ENDPOINT> 替换为“区域端点”文档中的相应区域端点。
  • /path/to/ingestion-authentication-file.json 更新为获取 Google SecOps 注入身份验证文件部分中保存身份验证文件的路径。

重启 Bindplane 代理以应用更改

  • 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart bindplane-agent
    
  • 如需在 Windows 中重启 Bindplane 代理,您可以使用服务控制台,也可以输入以下命令:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

创建脚本以将 ServiceNow 审核日志转发到 syslog

由于 ServiceNow 本身不支持将审核日志发送到 syslog,因此我们将创建一个脚本来查询 ServiceNow REST API 并将日志转发到 syslog。您可以安排此脚本定期运行。

Python 脚本示例 (Linux)

  • 创建一个名为 servicenow_audit_to_syslog.py 的文件,其中包含以下内容:

    import urllib3
    import json
    import datetime
    import base64
    import socket
    import time
    import os
    
    # ServiceNow API details
    BASE_URL = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    USERNAME = 'admin'  # Replace with your ServiceNow username
    PASSWORD = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    SYSLOG_SERVER = '127.0.0.1'  # Replace with your Bindplane agent IP
    SYSLOG_PORT = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    STATE_FILE = '/tmp/servicenow_audit_last_run.txt'
    
    # Pagination settings
    PAGE_SIZE = 1000
    MAX_PAGES = 1000
    
    def get_last_run_timestamp():
        try:
            with open(STATE_FILE, 'r') as f:
                return f.read().strip()
        except:
            return '1970-01-01T00:00:00'
    
    def update_state_file(timestamp):
        with open(STATE_FILE, 'w') as f:
            f.write(timestamp)
    
    def send_to_syslog(message):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(message.encode(), (SYSLOG_SERVER, SYSLOG_PORT))
        sock.close()
    
    def get_audit_logs(last_run_timestamp):
        """
        Query ServiceNow sys_audit table with proper pagination.
        Uses sys_created_on field for timestamp filtering.
        """
        # Encode credentials
        auth_string = f"{USERNAME}:{PASSWORD}"
        auth_bytes = auth_string.encode('ascii')
        auth_encoded = base64.b64encode(auth_bytes).decode('ascii')
    
        # Setup HTTP client
        http = urllib3.PoolManager()
    
        headers = {
            'Authorization': f'Basic {auth_encoded}',
            'Accept': 'application/json'
        }
    
        results = []
        offset = 0
    
        for page in range(MAX_PAGES):
            # Build query with pagination
            # Use sys_created_on (not created_on) for timestamp filtering
            query_params = (
                f"sysparm_query=sys_created_onAFTER{last_run_timestamp}"
                f"&sysparm_display_value=true"
                f"&sysparm_limit={PAGE_SIZE}"
                f"&sysparm_offset={offset}"
            )
    
            url = f"{BASE_URL}/api/now/table/sys_audit?{query_params}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    data = json.loads(response.data.decode('utf-8'))
                    chunk = data.get('result', [])
                    results.extend(chunk)
    
                    # Stop if we got fewer records than PAGE_SIZE (last page)
                    if len(chunk) < PAGE_SIZE:
                        break
    
                    # Move to next page
                    offset += PAGE_SIZE
                else:
                    print(f"Error querying ServiceNow API: {response.status} - {response.data.decode('utf-8')}")
                    break
            except Exception as e:
                print(f"Exception querying ServiceNow API: {str(e)}")
                break
    
        return results
    
    def main():
        # Get last run timestamp
        last_run_timestamp = get_last_run_timestamp()
    
        # Current timestamp for this run
        current_timestamp = datetime.datetime.now().isoformat()
    
        # Query ServiceNow API for audit logs
        audit_logs = get_audit_logs(last_run_timestamp)
    
        if audit_logs:
            # Send each log to syslog
            for log in audit_logs:
                # Format the log as JSON
                log_json = json.dumps(log)
    
                # Send to syslog
                send_to_syslog(log_json)
    
                # Sleep briefly to avoid flooding
                time.sleep(0.01)
    
            # Update state file
            update_state_file(current_timestamp)
    
            print(f"Successfully forwarded {len(audit_logs)} audit logs to syslog")
        else:
            print("No new audit logs to forward")
    
    if __name__ == "__main__":
        main()
    

设置预定执行(Linux)

  1. 让该脚本可执行:

    chmod +x servicenow_audit_to_syslog.py
    
  2. 创建每小时运行一次脚本的 Cron 作业:

    crontab -e
    
  3. 添加以下行:

    0 * * * * /usr/bin/python3 /path/to/servicenow_audit_to_syslog.py >> /tmp/servicenow_audit_to_syslog.log 2>&1
    

PowerShell 脚本示例 (Windows)

  • 创建一个名为 ServiceNow-Audit-To-Syslog.ps1 的文件,其中包含以下内容:

    # ServiceNow API details
    $BaseUrl = 'https://instance.service-now.com'  # Replace with your ServiceNow instance URL
    $Username = 'admin'  # Replace with your ServiceNow username
    $Password = 'password'  # Replace with your ServiceNow password
    
    # Syslog details
    $SyslogServer = '127.0.0.1'  # Replace with your Bindplane agent IP
    $SyslogPort = 514  # Replace with your Bindplane agent port
    
    # State file to keep track of last run
    $StateFile = "$env:TEMP\ServiceNowAuditLastRun.txt"
    
    # Pagination settings
    $PageSize = 1000
    $MaxPages = 1000
    
    function Get-LastRunTimestamp {
        try {
            if (Test-Path $StateFile) {
                return Get-Content $StateFile
            }
            else {
                return '1970-01-01T00:00:00'
            }
        }
        catch {
            return '1970-01-01T00:00:00'
        }
    }
    
    function Update-StateFile {
        param (
            [string]$Timestamp
        )
    
        Set-Content -Path $StateFile -Value $Timestamp
    }
    
    function Send-ToSyslog {
        param (
            [string]$Message
        )
    
        $UdpClient = New-Object System.Net.Sockets.UdpClient
        $UdpClient.Connect($SyslogServer, $SyslogPort)
    
        $Encoding = [System.Text.Encoding]::ASCII
        $Bytes = $Encoding.GetBytes($Message)
    
        $UdpClient.Send($Bytes, $Bytes.Length)
        $UdpClient.Close()
    }
    
    function Get-AuditLogs {
        param (
            [string]$LastRunTimestamp
        )
    
        # Create auth header
        $Auth = [System.Convert]::ToBase64String([System.Text.Encoding]::ASCII.GetBytes("${Username}:${Password}"))
    
        $Headers = @{
            Authorization = "Basic ${Auth}"
            Accept = 'application/json'
        }
    
        $Results = @()
        $Offset = 0
    
        for ($page = 0; $page -lt $MaxPages; $page++) {
            # Build query with pagination
            # Use sys_created_on (not created_on) for timestamp filtering
            $QueryParams = "sysparm_query=sys_created_onAFTER${LastRunTimestamp}&sysparm_display_value=true&sysparm_limit=${PageSize}&sysparm_offset=${Offset}"
    
            $Url = "${BaseUrl}/api/now/table/sys_audit?${QueryParams}"
    
            try {
                $Response = Invoke-RestMethod -Uri $Url -Headers $Headers -Method Get
                $Chunk = $Response.result
    
                $Results += $Chunk
    
                # Stop if we got fewer records than PageSize (last page)
                if ($Chunk.Count -lt $PageSize) {
                    break
                }
    
                # Move to next page
                $Offset += $PageSize
            }
            catch {
                Write-Error "Error querying ServiceNow API: $_"
                break
            }
        }
    
        return $Results
    }
    
    # Main execution
    $LastRunTimestamp = Get-LastRunTimestamp
    $CurrentTimestamp = (Get-Date).ToString('yyyy-MM-ddTHH:mm:ss')
    
    $AuditLogs = Get-AuditLogs -LastRunTimestamp $LastRunTimestamp
    
    if ($AuditLogs -and $AuditLogs.Count -gt 0) {
        # Send each log to syslog
        foreach ($Log in $AuditLogs) {
            # Format the log as JSON
            $LogJson = $Log | ConvertTo-Json -Compress
    
            # Send to syslog
            Send-ToSyslog -Message $LogJson
    
            # Sleep briefly to avoid flooding
            Start-Sleep -Milliseconds 10
        }
    
        # Update state file
        Update-StateFile -Timestamp $CurrentTimestamp
    
        Write-Output "Successfully forwarded $($AuditLogs.Count) audit logs to syslog"
    }
    else {
        Write-Output "No new audit logs to forward"
    }
    

设置预定执行时间(Windows)

  1. 打开任务计划程序
  2. 点击创建任务
  3. 提供以下配置:
    • 名称:ServiceNowAuditToSyslog
    • 安全选项:无论用户是否已登录,都运行
  4. 前往触发器标签页。
  5. 点击新建,然后将其设置为每小时运行一次。
  6. 前往操作标签页。
  7. 点击新建,然后设置:
    • 操作:启动程序
    • 程序/脚本:powershell.exe
    • 实参:-ExecutionPolicy Bypass -File "C:\path\to\ServiceNow-Audit-To-Syslog.ps1"
  8. 点击确定以保存任务。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。