收集 Netwrix Auditor 記錄
本文說明如何使用 Google Cloud Storage V2,將 Netwrix Auditor 記錄檔擷取至 Google Security Operations。
Netwrix Auditor 是一個使用者行為分析和風險控管平台,可控管混合式 IT 環境中的變更、設定和存取權。這個平台提供安全分析功能,可偵測使用者行為異常,並在發生資料侵害事件前調查威脅模式。這個平台採用 RESTful Integration API,可讓您以統一的方式,掌握及控管所有地端部署或雲端式 IT 系統。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用 Cloud Storage、Cloud Run、Pub/Sub 和 Cloud Scheduler API 的 GCP 專案
- 建立及管理 GCS bucket 的權限
- 管理 Google Cloud Storage 值區 IAM 政策的權限
- 建立 Cloud Run 服務、Pub/Sub 主題和 Cloud Scheduler 工作的權限
- Netwrix Auditor Server 的管理員存取權
- 具備適當 API 存取權限的 Windows 網域帳戶
- 已啟用 Integration API 的 Netwrix Auditor 伺服器 (預設為啟用)
- 在 Netwrix Auditor 中設定稽核資料庫
- 從 Cloud Run 函式到 Netwrix Auditor Server 的網路連線,通訊埠為 9699 (預設)
設定 Netwrix Auditor API 存取權
如要讓 Cloud Run 函式擷取活動記錄,請確認已啟用 Integration API,並在 Netwrix Auditor 中建立具有適當角色的 Windows 網域帳戶。
確認已啟用 Integration API
- 在安裝 Netwrix Auditor Server 的電腦上啟動 Netwrix Auditor。
- 依序前往「設定」>「整合」。
- 確認已啟用「Leverage Integration API」(運用整合 API) 選項。
- 請記下「通訊埠」編號 (預設為
9699)。 如要變更連接埠,請按照下列步驟操作:
- 按一下「API 設定」部分下方的「修改」。
- 指定新的通訊埠號碼。
- 按一下 [確定]。
建立服務帳戶以存取 API
- 在 Windows 網域控制站上開啟「Active Directory 使用者和電腦」。
- 前往要建立服務帳戶的機構單位。
- 依序按一下機構單位 >「新增」>「使用者」。
- 在「名字」欄位中輸入
Chronicle Integration。 - 在「使用者登入名稱」欄位中,輸入
chronicle-api(或您偏好的使用者名稱)。 - 點選「下一步」。
- 輸入高強度密碼,並根據貴機構的政策設定密碼。
- 清除「使用者必須在下次登入時變更密碼」核取方塊。
- 選取「密碼永不到期」 (建議服務帳戶使用)。
- 依序按一下「下一步」>「完成」。
指派全球審查員角色
- 在 Netwrix Auditor 主視窗中,前往「Monitoring Plans」。
- 在監控計畫樹狀結構中,選取「所有監控計畫」 (根資料夾)。
- 按一下「委派」。
- 在「Delegation」(委派) 對話方塊中,按一下「Add User」(新增使用者)。
- 在「選取使用者或群組」對話方塊中:
- 按一下「瀏覽」。
- 在「Enter the object name to select」(輸入要選取的物件名稱) 欄位中,輸入使用者名稱
chronicle-api。 - 按一下「檢查名稱」來驗證帳戶。
- 按一下 [確定]。
- 在「角色」下拉式選單中,選取「全球審查員」。
- 按一下 [確定]。
按一下 [儲存]。
記錄 API 憑證
請記錄下列資訊,以設定 Cloud Run 函式環境變數:
- 使用者名稱:網域帳戶,格式為
DOMAIN\username(例如ENTERPRISE\chronicle-api) - 密碼:服務帳戶的密碼
- 主機名稱:Netwrix Auditor 伺服器的完整網域名稱 (FQDN) 或 IP 位址 (例如
auditor.enterprise.local或172.28.6.15) 連接埠:Integration API 連接埠 (預設為
9699)
驗證權限
如要確認帳戶是否具備必要權限,請按照下列步驟操作:
- 在 Netwrix Auditor 中,前往「Monitoring Plans」。
- 選取「所有監控方案」。
- 按一下「委派」。
- 確認
chronicle-api帳戶是否顯示「全域審查員」角色。 - 如果帳戶未顯示,請按照上方的「指派全球審查員角色」步驟操作。
測試 API 存取權
請先測試憑證,再繼續進行整合:
# Replace with your actual values NETWRIX_HOST="auditor.enterprise.local" NETWRIX_PORT="9699" NETWRIX_USER="ENTERPRISE\\chronicle-api" NETWRIX_PASS="your-password" # Test API access (retrieve first batch of activity records) curl -k --ntlm -u "${NETWRIX_USER}:${NETWRIX_PASS}" \ "https://${NETWRIX_HOST}:${NETWRIX_PORT}/netwrix/api/v1/activity_records/enum" \ -H "Content-Type: application/json" \ -H "Accept: application/json"
如果要求成功,回應會傳回 JSON 物件,其中包含活動記錄陣列和用於分頁的 ContinuationMark。
建立 Google Cloud Storage 值區
- 前往 Google Cloud Console。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 netwrix-auditor-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或資料保留政策 點選「建立」。
為 Cloud Run 函式建立服務帳戶
- 在 GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)。
- 按一下「Create Service Account」(建立服務帳戶)。
- 請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
netwrix-audit-collector-sa - 服務帳戶說明:輸入
Service account for Cloud Run function to collect Netwrix Auditor logs
- 服務帳戶名稱:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
- 按一下「選擇角色」。
- 搜尋並選取「Storage 物件管理員」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Run Invoker」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)。
- 按一下「繼續」。
- 按一下 [完成]。
授予 GCS 值區的 IAM 權限
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (
netwrix-auditor-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:輸入服務帳戶電子郵件地址 (
netwrix-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com) - 指派角色:選取「Storage 物件管理員」
- 新增主體:輸入服務帳戶電子郵件地址 (
- 按一下 [儲存]。
建立 Pub/Sub 主題
- 在 GCP Console 中,前往「Pub/Sub」>「Topics」(主題)。
- 按一下「建立主題」。
- 請提供下列設定詳細資料:
- 主題 ID:輸入
netwrix-audit-trigger - 其他設定保留預設值
- 主題 ID:輸入
- 點選「建立」。
建立 Cloud Run 函式來收集記錄
Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 Netwrix Auditor Integration API 擷取活動記錄,並寫入 GCS。
- 前往 GCP Console 的「Cloud Run」。
- 按一下「Create service」(建立服務)。
- 選取「函式」 (使用內嵌編輯器建立函式)。
在「設定」部分,提供下列設定詳細資料:
設定 值 服務名稱 netwrix-audit-collector區域 選取與 GCS bucket 相符的區域 (例如 us-central1)執行階段 選取 Python 3.12 以上版本 在「Trigger (optional)」(觸發條件 (選用)) 專區:
- 按一下「+ 新增觸發條件」。
- 選取「Cloud Pub/Sub」。
- 在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 中,選擇
netwrix-audit-trigger。 - 按一下 [儲存]。
在「Authentication」(驗證) 部分:
- 選取「需要驗證」。
- 檢查 Identity and Access Management (IAM)。
向下捲動並展開「Containers, Networking, Security」。
前往「安全性」分頁:
- 服務帳戶:選取
netwrix-audit-collector-sa
- 服務帳戶:選取
前往「容器」分頁:
- 按一下「變數與密鑰」。
- 針對每個環境變數,按一下「+ 新增變數」:
變數名稱 範例值 說明 GCS_BUCKETnetwrix-auditor-logsGCS bucket 名稱 GCS_PREFIXnetwrix-audit記錄檔的前置字串 STATE_KEYnetwrix-audit/state.json狀態檔案路徑 NETWRIX_HOSTauditor.enterprise.localNetwrix Auditor 伺服器 FQDN 或 IP NETWRIX_PORT9699整合 API 連接埠 NETWRIX_USERENTERPRISE\chronicle-api網域帳戶,格式為「網域\使用者名稱」 NETWRIX_PASSyour-password服務帳戶密碼 MAX_RECORDS10000每次執行的記錄數上限 LOOKBACK_HOURS24初始回溯期 在「變數與密鑰」部分,向下捲動至「要求」:
- 要求逾時:輸入
600秒 (10 分鐘)
- 要求逾時:輸入
前往「設定」分頁:
- 在「資源」部分:
- 記憶體:選取 512 MiB 以上
- CPU:選取 1
- 在「資源」部分:
在「修訂版本資源調度」部分:
- 執行個體數量下限:輸入
0 - 執行個體數量上限:輸入
100
- 執行個體數量下限:輸入
點選「建立」。
等待服務建立完成 (1 到 2 分鐘)。
服務建立完成後,系統會自動開啟內嵌程式碼編輯器。
新增函式程式碼
- 在「進入點」欄位中輸入 main。
在內嵌程式碼編輯器中建立兩個檔案:
- main.py:
import functions_framework from google.cloud import storage import json import os import requests from requests_ntlm import HttpNtlmAuth from datetime import datetime, timezone, timedelta import time import urllib3 # Suppress insecure HTTPS warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'netwrix-audit') STATE_KEY = os.environ.get('STATE_KEY', 'netwrix-audit/state.json') NETWRIX_HOST = os.environ.get('NETWRIX_HOST') NETWRIX_PORT = os.environ.get('NETWRIX_PORT', '9699') NETWRIX_USER = os.environ.get('NETWRIX_USER') NETWRIX_PASS = os.environ.get('NETWRIX_PASS') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '10000')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def parse_datetime(value): """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Netwrix Auditor activity records and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, NETWRIX_HOST, NETWRIX_USER, NETWRIX_PASS]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) state = load_state(bucket) now = datetime.now(timezone.utc) if isinstance(state, dict) and state.get('last_event_time'): try: last_time = parse_datetime(state['last_event_time']) last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") last_time = now - timedelta(hours=LOOKBACK_HOURS) else: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching activity records from {last_time.isoformat()} " f"to {now.isoformat()}") records, newest_event_time = fetch_activity_records( last_time, now ) if not records: print("No new activity records found.") save_state(bucket, now.isoformat()) return timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = ( f"{GCS_PREFIX}/netwrix_audit_{timestamp}.ndjson" ) blob = bucket.blob(object_key) ndjson = '\n'.join( [json.dumps(r, ensure_ascii=False, default=str) for r in records] ) + '\n' blob.upload_from_string( ndjson, content_type='application/x-ndjson' ) print(f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{object_key}") if newest_event_time: save_state(bucket, newest_event_time) else: save_state(bucket, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing activity records: {str(e)}') raise def load_state(bucket): """Load state from GCS.""" try: blob = bucket.blob(STATE_KEY) if blob.exists(): return json.loads(blob.download_as_text()) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, last_event_time_iso): """Save the last event timestamp to GCS state file.""" try: state = { 'last_event_time': last_event_time_iso, 'last_run': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_activity_records(start_time, end_time): """ Fetch activity records from Netwrix Auditor Integration API using the enum endpoint with continuation mark pagination. The API returns up to 1000 records per request. Subsequent requests include the ContinuationMark from the previous response to retrieve the next batch. Args: start_time: Start time for filtering records end_time: End time for filtering records Returns: Tuple of (records list, newest_event_time ISO string) """ base_url = ( f"https://{NETWRIX_HOST}:{NETWRIX_PORT}" f"/netwrix/api/v1/activity_records/enum" ) auth = HttpNtlmAuth(NETWRIX_USER, NETWRIX_PASS) session = requests.Session() session.auth = auth session.verify = False session.headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-NetwrixCollector/1.0' }) all_records = [] newest_time = None continuation_mark = None page_num = 0 backoff = 1.0 while True: page_num += 1 if len(all_records) >= MAX_RECORDS: print(f"Reached max_records limit ({MAX_RECORDS})") break try: if continuation_mark: response = session.post( base_url, json={"ContinuationMark": continuation_mark}, timeout=(10, 60) ) else: response = session.get( base_url, timeout=(10, 60) ) if response.status_code == 429: retry_after = int( response.headers.get( 'Retry-After', str(int(backoff)) ) ) print(f"Rate limited (429). Retrying after " f"{retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status_code != 200: print(f"HTTP Error: {response.status_code}") print(f"Response body: {response.text}") return all_records, newest_time data = response.json() page_results = data.get('ActivityRecordList', []) continuation_mark = data.get('ContinuationMark') if not page_results: print("No more activity records (empty page)") break # Filter records by time window filtered = [] for record in page_results: when = record.get('When') if when: try: record_time = parse_datetime(when) if start_time <= record_time <= end_time: filtered.append(record) if (newest_time is None or record_time > parse_datetime(newest_time)): newest_time = when except Exception as e: print(f"Warning: Could not parse " f"record time: {e}") filtered.append(record) else: filtered.append(record) print(f"Page {page_num}: Retrieved " f"{len(page_results)} records, " f"{len(filtered)} within time window") all_records.extend(filtered) if not continuation_mark: print("No more pages (no ContinuationMark)") break except requests.exceptions.Timeout: print(f"Request timeout on page {page_num}") return all_records, newest_time except Exception as e: print(f"Error fetching activity records: {e}") return all_records, newest_time print(f"Retrieved {len(all_records)} total records " f"from {page_num} pages") return all_records, newest_time- requirements.txt:
functions-framework==3.* google-cloud-storage==2.* requests>=2.31.0 requests-ntlm>=1.2.0點選「部署」即可儲存並部署函式。
等待部署作業完成 (2 到 3 分鐘)。
建立 Cloud Scheduler 工作
- 前往 GCP 主控台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
設定 值 名稱 netwrix-audit-collector-hourly區域 選取與 Cloud Run 函式相同的區域 頻率 0 * * * *(每小時整點)時區 選取時區 (建議使用世界標準時間) 目標類型 Pub/Sub 主題 選取「 netwrix-audit-trigger」郵件內文 {}(空白 JSON 物件)點選「建立」。
排程頻率選項
根據記錄檔量和延遲時間要求選擇頻率:
| 頻率 | Cron 運算式 | 用途 |
|---|---|---|
| 每 5 分鐘 | */5 * * * * |
高容量、低延遲 |
| 每 15 分鐘 | */15 * * * * |
中等音量 |
| 每小時 | 0 * * * * |
標準 (建議) |
| 每 6 小時 | 0 */6 * * * |
少量、批次處理 |
| 每日 | 0 0 * * * |
歷來資料集合 |
測試整合項目
- 在 Cloud Scheduler 控制台中,找出您的工作 (
netwrix-audit-collector-hourly)。 - 按一下「強制執行」即可手動觸發工作。
- 稍等幾秒鐘。
- 前往「Cloud Run」>「Services」。
- 按一下
netwrix-audit-collector。 - 按一下 [Logs] (記錄) 分頁標籤。
確認函式是否已順利執行。尋找:
Fetching activity records from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X records, X within time window Wrote X records to gs://netwrix-auditor-logs/netwrix-audit/netwrix_audit_YYYYMMDD_HHMMSS.ndjson Successfully processed X records依序前往「Cloud Storage」>「Buckets」。
按一下
netwrix-auditor-logs。前往
netwrix-audit/資料夾。確認是否已建立含有目前時間戳記的新
.ndjson檔案。
如果記錄中顯示錯誤:
- HTTP 401:確認
NETWRIX_USER和NETWRIX_PASS環境變數正確無誤,並使用DOMAIN\username格式 - HTTP 403:確認服務帳戶在 Netwrix Auditor 中具有「全域審查員」角色
- HTTP 429:頻率限制,函式會自動重試,並採用指數輪詢間隔
- 連線逾時:確認 Cloud Run 與 Netwrix Auditor Server 之間的網路連線 (通訊埠 9691)。如果伺服器位於地端部署環境,請確認已設定 VPC 連接器或 Cloud VPN
- 缺少環境變數:確認 Cloud Run 函式設定中已設定所有必要變數
擷取 Google SecOps 服務帳戶
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Netwrix Auditor Activity Records)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Netwrix」做為「記錄類型」。
- 按一下「取得服務帳戶」。
系統會顯示專屬的服務帳戶電子郵件地址。例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
點選「下一步」。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://netwrix-auditor-logs/netwrix-audit/
- 來源刪除選項:根據偏好選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
- 刪除已轉移的檔案和空白目錄:成功轉移檔案後,刪除檔案和空白目錄。
- 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)
- 資產命名空間:資產命名空間
- 擷取標籤:要套用至這個動態饋給事件的標籤
點選「下一步」。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
將 IAM 權限授予 Google SecOps 服務帳戶
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下
netwrix-auditor-logs。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址
- 指派角色:選取「Storage 物件檢視者」
按一下 [儲存]。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| Opcode | about.labels | 與「關於」資訊相關聯的標籤 |
| Caption | about.resource.attribute.labels | 「關於」專區中資源的屬性標籤 |
| 工作 | additional.fields | 包含事件額外資訊的其他欄位 |
| 結果 | additional.fields | |
| 通知 | additional.fields | |
| 說明 | additional.fields | |
| 已新增 | additional.fields | |
| 已移除 | additional.fields | |
| service_type | additional.fields | |
| 詳細資料 | additional.fields | |
| extensions.auth.type | extensions.auth.type | 使用的驗證類型 |
| EventReceivedTime | metadata.collected_timestamp | 系統收集事件時的時間戳記 |
| 訊息 | metadata.description | 事件說明 |
| event_type | metadata.event_type | 活動類型 |
| EventType | metadata.product_event_type | 產品專屬事件類型 |
| EventID | metadata.product_log_id | 產品專屬記錄 ID |
| SourceModuleType | observer.application | 觀察到事件的應用程式 |
| 主機名稱 | principal.asset.hostname | 與主體相關聯的資產主機名稱 |
| 地點 | principal.asset.hostname | |
| 工作站 | principal.asset.hostname | |
| device_name | principal.asset.hostname | |
| 工作站 | principal.hostname | 主體的主機名稱 |
| device_name | principal.hostname | |
| ProcessID | principal.process.pid | 主體的程序 ID |
| 名稱 | principal.resource.name | 與主體相關聯的資源名稱 |
| 參與者 | principal.user.user_display_name | 使用者的顯示名稱 |
| SourceName | security_result.about.resource.attribute.labels | 安全性結果中「關於」部分的資源屬性標籤 |
| 動作 | security_result.action | 安全性結果中採取的動作 |
| action_details | security_result.action_details | 安全結果中動作的詳細資料 |
| backup_name | security_result.description | 安全性結果說明 |
| service_failed | security_result.description | |
| 關鍵字 | security_result.detection_fields | 安全結果中用於偵測的欄位 |
| RecordNumber | security_result.detection_fields | |
| session_ID | security_result.detection_fields | |
| allow_connection_with_desktop | security_result.detection_fields | |
| service_account | security_result.detection_fields | |
| 嚴重性 | security_result.severity | 安全結果的嚴重性等級 |
| SeverityValue | security_result.severity | |
| 摘要 | security_result.summary | 安全性結果摘要 |
| application_name | target.application | 目標上的應用程式 |
| 主機名稱 | target.asset.hostname | 與目標相關聯的資產主機名稱 |
| 地點 | target.asset.hostname | |
| file_path | target.file.full_path | 目標檔案的完整路徑 |
| 大小 | target.file.size | 目標檔案大小 |
| 主機名稱 | target.hostname | 目標主機名稱 |
| 地點 | target.hostname | |
| 類型 | target.resource.attribute.labels | 目標資源的屬性標籤 |
| SourceModuleName | target.resource.name | 目標資源名稱 |
| DataSource | metadata.product_name | 產生事件的產品名稱 |
| metadata.vendor_name | metadata.vendor_name | 供應商名稱 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。