收集 Netwrix Auditor 日志
本文档介绍了如何使用 Google Cloud Storage V2 将 Netwrix Auditor 日志注入到 Google Security Operations。
Netwrix Auditor 是一款用户行为分析和风险缓释可视化平台,可让您掌控混合 IT 环境中的更改、配置和访问权限。该平台提供安全分析功能,可在发生数据泄露之前检测用户行为异常并调查威胁模式。借助 RESTful 集成 API,该平台可以统一的方式提供对所有本地或基于云的 IT 系统的可见性和控制。
准备工作
请确保您满足以下前提条件:
- Google SecOps 实例
- 启用了 Cloud Storage、Cloud Run、Pub/Sub 和 Cloud Scheduler API 的 GCP 项目
- 创建和管理 GCS 存储分区的权限
- 管理 GCS 存储分区的 IAM 政策的权限
- 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
- 对 Netwrix Auditor 服务器的管理员访问权限
- 具有适当 API 访问权限的 Windows 网域账号
- 已启用 Integration API 的 Netwrix Auditor 服务器(默认处于启用状态)
- 在 Netwrix Auditor 中配置了审核数据库
- 从 Cloud Run 函数到 Netwrix Auditor Server(端口 9699,默认)的网络连接
配置 Netwrix Auditor API 访问权限
如需使 Cloud Run 函数能够检索活动记录,您需要验证 Integration API 是否已启用,并在 Netwrix Auditor 中创建具有相应角色的 Windows 网域账号。
验证 Integration API 是否已启用
- 在安装 Netwrix Auditor Server 的计算机上,启动 Netwrix Auditor。
- 前往设置 > 集成。
- 验证利用 Integration API 选项是否已启用。
- 记下端口号(默认值为
9699)。 如果您需要更改端口,请执行以下操作:
- 点击 API 设置部分下的修改。
- 指定新的端口号。
- 点击确定。
创建服务账号以访问 API
- 在 Windows 网域控制器上,打开 Active Directory 用户和计算机。
- 前往要创建服务账号的组织部门。
- 依次右键点击组织部门 > 新建 > 用户。
- 在名字字段中,输入
Chronicle Integration。 - 在用户登录名字段中,输入
chronicle-api(或您偏好的用户名)。 - 点击下一步。
- 输入安全系数高的密码,并根据贵组织的政策配置密码设置。
- 清除用户下次登录时必须更改密码复选框。
- 选择密码永不过期(建议为服务账号选择此选项)。
- 依次点击下一步 > 完成。
分配“全局审核者”角色
- 在 Netwrix Auditor 主窗口中,前往监控计划。
- 在监控方案树中,选择所有监控方案(根文件夹)。
- 点击委托。
- 在委托对话框中,点击添加用户。
- 在选择用户或群组对话框中:
- 点击浏览。
- 在输入要选择的对象名称字段中,输入用户名
chronicle-api。 - 点击检查名称以验证账号。
- 点击确定。
- 在角色下拉菜单中,选择全球审核者。
- 点击确定。
点击保存。
记录 API 凭据
记录以下信息,以便配置 Cloud Run 函数环境变量:
- 用户名:网域账号,格式为
DOMAIN\username(例如ENTERPRISE\chronicle-api) - 密码:服务账号的密码
- 主机名:Netwrix Auditor 服务器的完全限定域名 (FQDN) 或 IP 地址(例如
auditor.enterprise.local或172.28.6.15) 端口:集成 API 端口(默认值为
9699)
验证权限
如需验证账号是否具有所需权限,请执行以下操作:
- 在 Netwrix Auditor 中,前往监控计划。
- 选择所有监控方案。
- 点击委托。
- 验证
chronicle-api账号是否显示为具有全局审核者角色。 - 如果未显示该账号,请按照上文中的分配“全球审核者”角色步骤操作。
测试 API 访问权限
在继续进行集成之前,请先测试您的凭据:
# Replace with your actual values NETWRIX_HOST="auditor.enterprise.local" NETWRIX_PORT="9699" NETWRIX_USER="ENTERPRISE\\chronicle-api" NETWRIX_PASS="your-password" # Test API access (retrieve first batch of activity records) curl -k --ntlm -u "${NETWRIX_USER}:${NETWRIX_PASS}" \ "https://${NETWRIX_HOST}:${NETWRIX_PORT}/netwrix/api/v1/activity_records/enum" \ -H "Content-Type: application/json" \ -H "Accept: application/json"
如果响应成功,则会返回一个 JSON 对象,其中包含一个活动记录数组和一个用于分页的 ContinuationMark。
创建 Google Cloud Storage 存储分区
- 转到 Google Cloud Console。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储桶命名 输入一个全局唯一的名称(例如 netwrix-auditor-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择营业地点(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 均匀(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
为 Cloud Run 函数创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
netwrix-audit-collector-sa - 服务账号说明:输入
Service account for Cloud Run function to collect Netwrix Auditor logs
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
授予对 GCS 存储桶的 IAM 权限
- 前往 Cloud Storage > 存储分区。
- 点击您的存储桶名称 (
netwrix-auditor-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址 (
netwrix-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com) - 分配角色:选择 Storage Object Admin
- 添加主账号:输入服务账号电子邮件地址 (
- 点击保存。
创建发布/订阅主题
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
netwrix-audit-trigger - 将其他设置保留为默认值
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集日志
Cloud Run 函数将由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 Netwrix Auditor Integration API 中提取活动记录并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 netwrix-audit-collector区域 选择与您的 GCS 存储桶匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题中,选择
netwrix-audit-trigger。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全性标签页:
- 服务账号:选择
netwrix-audit-collector-sa
- 服务账号:选择
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击+ 添加变量:
变量名称 示例值 说明 GCS_BUCKETnetwrix-auditor-logsGCS 存储桶名称 GCS_PREFIXnetwrix-audit日志文件的前缀 STATE_KEYnetwrix-audit/state.json状态文件路径 NETWRIX_HOSTauditor.enterprise.localNetwrix Auditor 服务器 FQDN 或 IP NETWRIX_PORT9699Integration API 端口 NETWRIX_USERENTERPRISE\chronicle-api网域账号(格式为 DOMAIN\username) NETWRIX_PASSyour-password服务账号密码 MAX_RECORDS10000每次运行的记录数上限 LOOKBACK_HOURS24初始回溯期 在变量和 Secret 部分中,向下滚动到请求:
- 请求超时:输入
600秒(10 分钟)
- 请求超时:输入
前往设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值
- CPU:选择 1
- 在资源部分中:
在修订版本伸缩部分中:
- 实例数下限:输入
0 - 实例数上限:输入
100
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在入口点字段中输入 main。
在内嵌代码编辑器中,创建两个文件:
- main.py:
import functions_framework from google.cloud import storage import json import os import requests from requests_ntlm import HttpNtlmAuth from datetime import datetime, timezone, timedelta import time import urllib3 # Suppress insecure HTTPS warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'netwrix-audit') STATE_KEY = os.environ.get('STATE_KEY', 'netwrix-audit/state.json') NETWRIX_HOST = os.environ.get('NETWRIX_HOST') NETWRIX_PORT = os.environ.get('NETWRIX_PORT', '9699') NETWRIX_USER = os.environ.get('NETWRIX_USER') NETWRIX_PASS = os.environ.get('NETWRIX_PASS') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '10000')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def parse_datetime(value): """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Netwrix Auditor activity records and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, NETWRIX_HOST, NETWRIX_USER, NETWRIX_PASS]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) state = load_state(bucket) now = datetime.now(timezone.utc) if isinstance(state, dict) and state.get('last_event_time'): try: last_time = parse_datetime(state['last_event_time']) last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") last_time = now - timedelta(hours=LOOKBACK_HOURS) else: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching activity records from {last_time.isoformat()} " f"to {now.isoformat()}") records, newest_event_time = fetch_activity_records( last_time, now ) if not records: print("No new activity records found.") save_state(bucket, now.isoformat()) return timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = ( f"{GCS_PREFIX}/netwrix_audit_{timestamp}.ndjson" ) blob = bucket.blob(object_key) ndjson = '\n'.join( [json.dumps(r, ensure_ascii=False, default=str) for r in records] ) + '\n' blob.upload_from_string( ndjson, content_type='application/x-ndjson' ) print(f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{object_key}") if newest_event_time: save_state(bucket, newest_event_time) else: save_state(bucket, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing activity records: {str(e)}') raise def load_state(bucket): """Load state from GCS.""" try: blob = bucket.blob(STATE_KEY) if blob.exists(): return json.loads(blob.download_as_text()) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, last_event_time_iso): """Save the last event timestamp to GCS state file.""" try: state = { 'last_event_time': last_event_time_iso, 'last_run': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_activity_records(start_time, end_time): """ Fetch activity records from Netwrix Auditor Integration API using the enum endpoint with continuation mark pagination. The API returns up to 1000 records per request. Subsequent requests include the ContinuationMark from the previous response to retrieve the next batch. Args: start_time: Start time for filtering records end_time: End time for filtering records Returns: Tuple of (records list, newest_event_time ISO string) """ base_url = ( f"https://{NETWRIX_HOST}:{NETWRIX_PORT}" f"/netwrix/api/v1/activity_records/enum" ) auth = HttpNtlmAuth(NETWRIX_USER, NETWRIX_PASS) session = requests.Session() session.auth = auth session.verify = False session.headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-NetwrixCollector/1.0' }) all_records = [] newest_time = None continuation_mark = None page_num = 0 backoff = 1.0 while True: page_num += 1 if len(all_records) >= MAX_RECORDS: print(f"Reached max_records limit ({MAX_RECORDS})") break try: if continuation_mark: response = session.post( base_url, json={"ContinuationMark": continuation_mark}, timeout=(10, 60) ) else: response = session.get( base_url, timeout=(10, 60) ) if response.status_code == 429: retry_after = int( response.headers.get( 'Retry-After', str(int(backoff)) ) ) print(f"Rate limited (429). Retrying after " f"{retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status_code != 200: print(f"HTTP Error: {response.status_code}") print(f"Response body: {response.text}") return all_records, newest_time data = response.json() page_results = data.get('ActivityRecordList', []) continuation_mark = data.get('ContinuationMark') if not page_results: print("No more activity records (empty page)") break # Filter records by time window filtered = [] for record in page_results: when = record.get('When') if when: try: record_time = parse_datetime(when) if start_time <= record_time <= end_time: filtered.append(record) if (newest_time is None or record_time > parse_datetime(newest_time)): newest_time = when except Exception as e: print(f"Warning: Could not parse " f"record time: {e}") filtered.append(record) else: filtered.append(record) print(f"Page {page_num}: Retrieved " f"{len(page_results)} records, " f"{len(filtered)} within time window") all_records.extend(filtered) if not continuation_mark: print("No more pages (no ContinuationMark)") break except requests.exceptions.Timeout: print(f"Request timeout on page {page_num}") return all_records, newest_time except Exception as e: print(f"Error fetching activity records: {e}") return all_records, newest_time print(f"Retrieved {len(all_records)} total records " f"from {page_num} pages") return all_records, newest_time- requirements.txt:
functions-framework==3.* google-cloud-storage==2.* requests>=2.31.0 requests-ntlm>=1.2.0点击部署以保存并部署函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 netwrix-audit-collector-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择 netwrix-audit-trigger消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据日志量和延迟时间要求选择频次:
| 频率 | Cron 表达式 | 使用场景 |
|---|---|---|
| 每隔 5 分钟 | */5 * * * * |
大批量、低延迟 |
| 每隔 15 分钟 | */15 * * * * |
搜索量中等 |
| 每小时 | 0 * * * * |
标准(推荐) |
| 每 6 小时 | 0 */6 * * * |
低成交量、批处理 |
| 每天 | 0 0 * * * |
历史数据收集 |
测试集成
- 在 Cloud Scheduler 控制台中,找到您的作业 (
netwrix-audit-collector-hourly)。 - 点击强制运行以手动触发作业。
- 等待几秒钟。
- 前往 Cloud Run > 服务。
- 点击
netwrix-audit-collector。 - 点击日志标签页。
验证函数是否已成功执行。查找:
Fetching activity records from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X records, X within time window Wrote X records to gs://netwrix-auditor-logs/netwrix-audit/netwrix_audit_YYYYMMDD_HHMMSS.ndjson Successfully processed X records前往 Cloud Storage > 存储分区。
点击
netwrix-auditor-logs。转到
netwrix-audit/文件夹。验证是否已创建具有当前时间戳的新
.ndjson文件。
如果您在日志中看到错误,请执行以下操作:
- HTTP 401:验证
NETWRIX_USER和NETWRIX_PASS环境变量是否正确,并使用DOMAIN\username格式 - HTTP 403:验证服务账号在 Netwrix Auditor 中是否具有全局审核员角色
- HTTP 429:速率限制 - 函数将自动使用指数退避算法重试
- 连接超时:验证从 Cloud Run 到 Netwrix Auditor 服务器(位于端口 9691 上)的网络连接。如果服务器位于本地,请确保已配置 VPC 连接器或 Cloud VPN
- 缺少环境变量:验证是否已在 Cloud Run 函数配置中设置所有必需的变量
检索 Google SecOps 服务账号
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在 Feed 名称字段中,输入 Feed 的名称(例如
Netwrix Auditor Activity Records)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Netwrix 作为日志类型。
- 点击获取服务账号。
系统会显示一个唯一的服务账号电子邮件地址。例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
点击下一步。
为以下输入参数指定值:
存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:
gs://netwrix-auditor-logs/netwrix-audit/
- 来源删除选项:根据您的偏好选择删除选项:
- 永不:转移后永不删除任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
- 删除已转移的文件和空目录:成功转移后删除文件和空目录。
- 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)
- 资产命名空间:资产命名空间
- 注入标签:要应用于此 Feed 中事件的标签
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
向 Google SecOps 服务账号授予 IAM 权限
- 前往 Cloud Storage > 存储分区。
- 点击
netwrix-auditor-logs。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址
- 分配角色:选择 Storage Object Viewer
点击保存。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| 操作码 | about.labels | 与“关于”信息相关联的标签 |
| 配文 | about.resource.attribute.labels | “关于”部分中资源的属性标签 |
| 任务 | additional.fields | 包含有关事件的额外信息的其他字段 |
| 内容 | additional.fields | |
| 通知 | additional.fields | |
| 说明 | additional.fields | |
| 已添加 | additional.fields | |
| 已移除 | additional.fields | |
| service_type | additional.fields | |
| 详细信息 | additional.fields | |
| extensions.auth.type | extensions.auth.type | 所用身份验证的类型 |
| EventReceivedTime | metadata.collected_timestamp | 系统收集相应事件时的时间戳 |
| 消息 | metadata.description | 事件的说明 |
| event_type | metadata.event_type | 事件类型 |
| EventType | metadata.product_event_type | 特定于产品的事件类型 |
| EventID | metadata.product_log_id | 特定于产品的日志标识符 |
| SourceModuleType | observer.application | 观察到该事件的应用 |
| 主机名 | principal.asset.hostname | 与正文相关联的资产的主机名 |
| 位置 | principal.asset.hostname | |
| 工作站 | principal.asset.hostname | |
| device_name | principal.asset.hostname | |
| 工作站 | principal.hostname | 主账号的主机名 |
| device_name | principal.hostname | |
| ProcessID | principal.process.pid | 主账号的进程 ID |
| 名称 | principal.resource.name | 与正文关联的资源的名称 |
| 参与者 | principal.user.user_display_name | 用户的显示名称 |
| SourceName | security_result.about.resource.attribute.labels | 安全结果中“关于”的资源属性标签 |
| 操作 | security_result.action | 安全性结果中采取的操作 |
| action_details | security_result.action_details | 安全结果中操作的详细信息 |
| backup_name | security_result.description | 安全结果的说明 |
| service_failed | security_result.description | |
| 关键字 | security_result.detection_fields | 安全结果中用于检测的字段 |
| RecordNumber | security_result.detection_fields | |
| session_ID | security_result.detection_fields | |
| allow_connection_with_desktop | security_result.detection_fields | |
| service_account | security_result.detection_fields | |
| 严重程度 | security_result.severity | 安全结果的严重程度级别 |
| SeverityValue | security_result.severity | |
| 摘要 | security_result.summary | 安全性结果摘要 |
| application_name | target.application | 目标设备上的应用 |
| 主机名 | target.asset.hostname | 与目标相关联的资产的主机名 |
| 位置 | target.asset.hostname | |
| file_path | target.file.full_path | 目标文件的完整路径 |
| 大小 | target.file.size | 目标文件的大小 |
| 主机名 | target.hostname | 目标的 hostname |
| 位置 | target.hostname | |
| 类型 | target.resource.attribute.labels | 目标资源的属性标签 |
| SourceModuleName | target.resource.name | 目标资源的名称 |
| DataSource | metadata.product_name | 生成相应事件的产品的名称 |
| metadata.vendor_name | metadata.vendor_name | 供应商名称 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。