收集 Digital Guardian EDR 記錄
本文說明如何透過 Cloud Run 函式,使用 Google Cloud Storage V2 將 Digital Guardian EDR 記錄檔擷取至 Google Security Operations。
Fortra 的 Digital Guardian (舊稱 Digital Guardian) 是全方位的資料外洩防護和端點偵測與應變平台,可讓您掌握端點、網路和雲端應用程式的系統、使用者和資料事件。Analytics & Reporting Cloud (ARC) 服務提供進階分析、工作流程和報表功能,可全面保護資料。Cloud Run 函式會使用 OAuth 2.0 向 ARC Export API 進行驗證、擷取匯出資料、確認書籤以繼續處理下一個區塊、將結果以 NDJSON 格式寫入 GCS bucket,然後 Google SecOps 會透過 GCS V2 資訊提供擷取這些資料。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用下列 API 的 Google Cloud 專案:
- Cloud Storage
- Cloud Run 函式
- Cloud Scheduler
- Pub/Sub
- Cloud Build
- 建立及管理 Cloud Storage 值區、Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
- Digital Guardian Management Console (DGMC) 的特殊權限
- 存取 Digital Guardian Analytics & Reporting Cloud (ARC) 租戶設定
- 在 DGMC 中設定 Cloud Services 的管理員權限
- 在 DGMC 中建立的匯出設定檔,且具有有效的 GUID
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 digitalguardian-edr-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取最接近 Google SecOps 執行個體的位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
收集 Digital Guardian API 憑證
如要讓 Cloud Run 函式從 Digital Guardian ARC 擷取匯出資料,您必須取得 API 憑證並設定匯出設定檔。
從 DGMC 取得 API 憑證
- 登入 Digital Guardian Management Console (DGMC)。
- 依序前往「系統」>「設定」>「雲端服務」。
在「API Access」(API 存取權) 區段中,找出並記錄下列值:
- API 存取 ID:這是 OAuth 2.0 驗證的用戶端 ID。
- API 存取密鑰:這是 OAuth 2.0 驗證的用戶端密鑰。
- Access Gateway 基準網址:API 閘道端點 (例如
https://accessgw-usw.msp.digitalguardian.com)。 - 授權伺服器網址:OAuth 2.0 權杖端點 (例如
https://authsrv.msp.digitalguardian.com/as/token.oauth2)。
建立及設定匯出設定檔
- 在 Digital Guardian 管理控制台 (DGMC) 中,依序前往「Admin」>「Reports」>「Export Profiles」。
- 按一下「建立匯出設定檔」,或選取現有的匯出設定檔。
- 使用下列設定配置匯出設定檔:
- 設定檔名稱:輸入描述性名稱 (例如
Google SecOps SIEM Integration)。 - 資料來源:根據要匯出的資料,選取「事件」或「快訊」。
- 匯出格式:選取「JSON Flattened Table」(JSON 平面表格) (建議用於 SIEM 整合)。
- 欄位:選取要匯出的欄位。
- 篩選器:設定篩選器來限制匯出的資料 (選用)。
- 設定檔名稱:輸入描述性名稱 (例如
- 按一下「儲存」即可建立匯出設定檔。
儲存後,在清單中找到匯出設定檔,然後從匯出設定檔網址或詳細資料頁面複製 GUID。
記錄憑證摘要
請儲存下列資訊,以設定 Cloud Run 函式環境變數:
- 用戶端 ID (API 存取 ID):來自 DGMC Cloud Services
- 用戶端密鑰 (API 存取密鑰):來自 DGMC Cloud Services
- 授權伺服器網址:例如
https://authsrv.msp.digitalguardian.com/as/token.oauth2 - Access Gateway 基準網址:例如
https://accessgw-usw.msp.digitalguardian.com - 匯出設定檔 GUID:從 DGMC 建立的匯出設定檔
測試 API 存取權
執行下列指令,確認您的憑證是否有效:
# Step 1: Obtain OAuth 2.0 access token curl -s -X POST \ -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \ "https://authsrv.msp.digitalguardian.com/as/token.oauth2"# Step 2: Test export endpoint with the access token curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"成功的回應會傳回內含匯出資料的 JSON 文件。如果收到驗證錯誤訊息,請在 DGMC Cloud Services 中驗證 API 存取 ID 和密鑰。
為 Cloud Run 函式建立服務帳戶
- 在 Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」。
- 按一下「Create Service Account」(建立服務帳戶)。
- 請提供下列設定詳細資料:
- 「服務帳戶名稱」:輸入
digitalguardian-ingestion(或說明性名稱)。 - 服務帳戶說明:輸入
Service account for Digital Guardian EDR Cloud Run function to write logs to GCS。
- 「服務帳戶名稱」:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
- Storage Object Admin (讀取/寫入 Cloud Storage 值區中的物件)
- Cloud Run 叫用者 (允許 Cloud Scheduler 叫用函式)
- 按一下「繼續」。
按一下 [完成]。
建立 Pub/Sub 主題
Cloud Scheduler 會透過 Pub/Sub 主題觸發 Cloud Run 函式。
- 在 Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)。
- 按一下「建立主題」。
- 在「Topic ID」(主題 ID) 欄位中輸入
digitalguardian-edr-trigger。 - 保留預設設定。
點選「建立」。
建立 Cloud Run 函式
建立 Cloud Run 函式,使用 OAuth 2.0 用戶端憑證向 Digital Guardian ARC 進行驗證、擷取匯出資料、確認書籤以移至下一個區塊,並將結果以 NDJSON 格式寫入 GCS。
準備函式來源檔案
建立下列兩個檔案,用於部署 Cloud Run 函式。
requirements.txt
functions-framework==3.* google-cloud-storage==2.* urllib3==2.*main.py
"""Cloud Run function to ingest Digital Guardian EDR logs into GCS.""" import json import os import time import urllib.parse from datetime import datetime, timezone import functions_framework import urllib3 from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr") STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json") AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"] ARC_SERVER_URL = os.environ["ARC_SERVER_URL"] CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) http = urllib3.PoolManager() gcs = storage.Client() def _get_access_token() -> str: """Obtain an OAuth 2.0 access token using client credentials grant.""" body = urllib.parse.urlencode({ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "client", }) resp = http.request( "POST", AUTH_SERVER_URL, body=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if resp.status != 200: raise RuntimeError( f"OAuth token request failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) token_data = json.loads(resp.data.decode("utf-8")) return token_data["access_token"] def _arc_get(token: str, path: str, retries: int = 5) -> dict: """Execute a GET request against the ARC API with retry on 429.""" url = f"{ARC_SERVER_URL}{path}" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } backoff = 2 for attempt in range(retries): resp = http.request("GET", url, headers=headers) if resp.status == 200: return json.loads(resp.data.decode("utf-8")) if resp.status == 429: wait = backoff * (2 ** attempt) print( f"Rate limited (429). Retrying in {wait}s " f"(attempt {attempt + 1}/{retries})." ) time.sleep(wait) continue raise RuntimeError( f"ARC API error: {resp.status} — {resp.data.decode('utf-8')}" ) raise RuntimeError( "ARC API rate limit exceeded after maximum retries." ) def _arc_acknowledge(token: str) -> None: """POST to the acknowledge endpoint to advance the export bookmark.""" url = ( f"{ARC_SERVER_URL}/rest/1.0/export/" f"{EXPORT_PROFILE_GUID}/acknowledge" ) headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } resp = http.request("POST", url, headers=headers) if resp.status not in (200, 204): raise RuntimeError( f"ARC acknowledge failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) print("Export bookmark acknowledged successfully.") def _load_state() -> dict: """Load the last run state from GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.downlodownload_as_text return {} def _save_state(state: dict) -> None: """Persist run state to GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.uploadupload_from_string json.dumps(state), content_type="application/json" ) def _fetch_export(token: str) -> list: """Fetch export data from the ARC Export API.""" path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}" data = _arc_get(token, path) records = data if isinstance(data, list) else data.get("data", []) return records[:MAX_RECORDS] def _write_ndjson(records: list, run_ts: str) -> str: """Write records as NDJSON to GCS and return the blob path.""" bucket = gcs.bugcs.bucketUCKET) blob_path = ( f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/" f"day={run_ts[8:10]}/{run_ts}_export.ndjson" ) blob = bucket.blob(blob_path) ndjson = "\n".join( json.dumps(r, separators=(",", ":")) for r in records ) blob.uploadupload_from_stringn, content_type="application/x-ndjson") return blob_path @functions_framework.cloud_event def main(cloud_event): """Entry point triggered by Pub/Sub via Cloud Scheduler.""" state = _load_state() now = datetime.now(timezone.utc) print("Authenticating to Digital Guardian ARC.") token = _get_access_token() print( f"Fetching export data for profile {EXPORT_PROFILE_GUID}." ) records = _fetch_export(token) if not records: print("No new export data found.") return "OK" run_ts = now.strftime("%Y-%m-%dT%H%M%SZ") blob_path = _write_ndjson(records, run_ts) print( f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{blob_path}." ) _arc_acknowledge(token) state["last_run"] = now.isoformat() state["records_written"] = len(records) _save_state(state) print(f"State updated. last_run={now.isoformat()}.") return "OK"
部署 Cloud Run 函式
- 將這兩個檔案 (
main.py和requirements.txt) 儲存到本機目錄 (例如digitalguardian-function/)。 - 開啟 Cloud Shell 或已安裝
gcloudCLI 的終端機。 輸入下列指令來部署函式:
gcloud functions deploy digitalguardian-edr-to-gcs \ --gen2 \ --region=us-central1 \ --runtime=python312 \ --trigger-topic=digitalguardian-edr-trigger \ --entry-point=main \ --memory=512MB \ --timeout=540s \ --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \ --set-env-vars=\ "GCS_BUCKET=digitalguardian-edr-logs",\ "GCS_PREFIX=digitalguardian_edr",\ "STATE_KEY=digitalguardian_edr_state.json",\ "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\ "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\ "CLIENT_ID=YOUR_CLIENT_ID",\ "CLIENT_SECRET=YOUR_CLIENT_SECRET",\ "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\ "MAX_RECORDS=10000"替換下列預留位置值:
PROJECT_ID:您的 Google Cloud 專案 ID。digitalguardian-edr-logs:您的 GCS bucket 名稱。YOUR_CLIENT_ID:您的 Digital Guardian API 存取 ID。YOUR_CLIENT_SECRET:您的 Digital Guardian API 存取密鑰。YOUR_EXPORT_PROFILE_GUID:DGMC 中的匯出設定檔 GUID。
檢查函式狀態,驗證部署作業:
gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
環境變數參照
| 變數 | 必填 | 預設 | 說明 |
|---|---|---|---|
GCS_BUCKET |
是 | 儲存 NDJSON 輸出內容的 GCS bucket 名稱 | |
GCS_PREFIX |
否 | digitalguardian_edr |
bucket 中的物件前置字串 (資料夾路徑) |
STATE_KEY |
否 | digitalguardian_edr_state.json |
前置字串中的狀態檔案 Blob 名稱 |
AUTH_SERVER_URL |
是 | OAuth 2.0 授權伺服器網址 | |
ARC_SERVER_URL |
是 | ARC Access Gateway 基礎網址 | |
CLIENT_ID |
是 | DGMC 的 API 存取 ID | |
CLIENT_SECRET |
是 | DGMC 的 API 存取密鑰 | |
EXPORT_PROFILE_GUID |
是 | 從 DGMC 匯出設定檔 GUID | |
MAX_RECORDS |
否 | 10000 |
每次執行可寫入的記錄數量上限 |
建立 Cloud Scheduler 工作
Cloud Scheduler 會透過 Pub/Sub 主題,定期觸發 Cloud Run 函式。
- 前往 Google Cloud 控制台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
- 「Name」(名稱):輸入
digitalguardian-edr-ingestion-schedule。 - 區域:選取與 Cloud Run 函式相同的區域 (例如
us-central1)。 頻率:輸入
*/5 * * * *(每 5 分鐘)。時區:選取偏好的時區 (例如
UTC)。
- 「Name」(名稱):輸入
按一下「繼續」。
在「設定執行作業」部分:
- 目標類型:選取「Pub/Sub」。
- 主題:選取
digitalguardian-edr-trigger。 - 訊息內文:輸入
{"run": true}。
按一下「繼續」。
在「Configure optional settings」(設定選用設定) 部分:
- 重試次數上限:輸入
3。 - 輪詢持續時間下限:輸入
5s。 - 輪詢持續時間上限:輸入
60s。
- 重試次數上限:輸入
點選「建立」。
如要立即執行測試,請按一下工作名稱旁的三點圖示 (...),然後選取「強制執行」。
擷取 Google SecOps 服務帳戶並設定動態饋給
Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Digital Guardian EDR Logs)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Digital Guardian EDR」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
點選「下一步」。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI:
gs://digitalguardian-edr-logs/digitalguardian_edr/- 請將
digitalguardian-edr-logs替換為您的 GCS bucket 名稱。 - 將
digitalguardian_edr替換為您設定的GCS_PREFIX值。
- 請將
來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。
檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)。
資產命名空間:資產命名空間。
擷取標籤:要套用至這個動態饋給事件的標籤。
點選「下一步」。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶必須具備 Cloud Storage bucket 的「Storage 物件檢視者」角色。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
digitalguardian-edr-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)。 - 指派角色:選取「Storage 物件檢視者」。
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如
按一下 [儲存]。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| 應用程式 | target.application | 直接複製值 |
| 應用程式 | target.process.command_line | 如果規則符合「Printer」,請設為「%{Application}」。 |
| Bytes_Written | network.sent_bytes | 直接複製值並轉換為 uinteger |
| 類別、電腦名稱、詳細資料 | metadata.description | 如果 Category == "Policies" 且 Computer_Name 為空,則設為 %{Detail};否則在 grok_parse_failure 上設為 %{message} |
| Command_Line、Command_Line1 | principal.process.command_line | 從 Command_Line 移除尾端引號後的值 (如不為空),否則從 Command_Line1 移除尾端引號後的值 |
| Computer_Name、來源 | principal.hostname | 如果 computerName 不為空白,則為該值,否則設為 %{source} |
| Destination_Device_Serial_Number、Destination_Device_Serial_Number1 | 使用 grok 模式擷取,以處理引號 | |
| Destination_Directory、Destination_File | target.file.full_path | 如果 Destination_Directory 和 Destination_File 皆不為空,則會串連兩者 |
| Destination_Drive_Type | security_result.detection_fields | 已與 destination_drive_type_label 合併 (鍵:Destination_Drive_Type,值:%{Destination_Drive_Type}) |
| Destination_File | target.file.names | 從 Destination_File 合併 |
| Destination_File_Extension | target.file.mime_type | 直接複製值 |
| Dll_SHA1_Hash | target.process.file.sha1 | 轉換為小寫後直接複製值 |
| Email_Address | principal.user.email_addresses | 已從「Email_Address」合併 |
| Email_Sender、Email_Subject | network.email.from | 如果不是空白,請設為 %{Email_Sender} |
| Email_Sender、Email_Subject | network.email.subject | 如果 Email_Sender 不為空,則從主旨 (%{Email_Subject}) 合併 |
| File_Extension | principal.process.file.mime_type | 直接複製值 |
| IP_Address、source_ip | principal.ip | 如果來源 IP 位址不為空,則從來源 IP 位址合併,否則從 IP 位址合併 |
| Local_Port、source_port | principal.port | 如果 source_port 不為空且已轉換為整數,則為該值;否則為 Local_Port 並轉換為整數 |
| MD5_Checksum | target.process.file.md5 | 轉換為小寫後直接複製值 |
| Network_Direction | network.direction | 如果為 True,則設為 INBOUND;如果為 False,則設為 OUTBOUND |
| Process_PID | principal.process.pid | 直接複製值 |
| Process_SHA256_Hash | target.process.file.sha256 | 轉換為小寫後直接複製值 |
| Product_Version | metadata.product_version | 直接複製值 |
| 通訊協定 | network.ip_protocol | 如果 ==「1」,則設為 ICMP |
| Remote_Port | target.port | 直接複製值並轉換為整數 |
| 規則 | security_result.rule_name | 直接複製值 |
| 規則 | metadata.event_type | 如果符合 .Printer.,則設為 PROCESS_UNCATEGORIZED;如果符合 DLP.*,則設為 FILE_MOVE |
| 嚴重性 | security_result.severity | 轉換為整數後,如果 <=3,請設為「低」;如果 <=6,請設為「中」;如果 <=8,請設為「高」;如果 <=10,請設為「重大」 |
| 嚴重性 | security_result.severity_details | 直接複製值 |
| Source_Directory、Source_File | src.file.full_path | 如果 Source_Directory 和 Source_File 皆不為空,則會串連兩者 |
| Source_Drive_Type | security_result.detection_fields | 已與 source_drive_type_label 合併 (鍵:Source_Drive_Type,值:%{Source_Drive_Type}) |
| Source_File | src.file.names | 從「來源檔案」合併 |
| Source_File_Extension | src.file.mime_type | 直接複製值 |
| URL_Path、http_url | target.url | 如果 http_url 不為空白,則取自該網址,否則取自 URL_Path |
| User_Name | principal.user.userid | 從 userName 擷取 grok 後的值 |
| User_Name | principal.administrative_domain | 從 grok 擷取後取得 domainName 的值 |
| Was_Removable | security_result.detection_fields | 已與 was_removable_label 合併 (鍵:Was_Removable,值:%{Was_Removable}) |
| Was_Source_Removable | security_result.detection_fields | 已與 was_source_removable_label 合併 (鍵:Was_Source_Removable,值:%{Was_Source_Removable}) |
| computerName、destination_ip、protocol、source_ip、IP_Address、destination、userName、Process_PID、Category、Computer_Name | metadata.event_type | 一開始設為 GENERIC_EVENT;如果通訊協定為 HTTPS 且 (destination_ip 或 computerName),則設為 NETWORK_HTTP;如果 (source_ip 或 IP_Address) 且 destination_ip,則設為 NETWORK_CONNECTION;如果 userName 不為空白,則設為 USER_UNCATEGORIZED;如果 Process_PID 不為空白,則設為 SCAN_PROCESS |
| destination_ip | target.ip | 已從 destination_ip 合併 |
| incidents_url、matched_policies_by_severity | security_result | 已與 _sr 合併 (規則名稱:%{matched_policies_by_severity},產品返回網址:%{incidents_url}) |
| 通訊協定 | network.application_protocol | 如果通訊協定為 HTTP 或 HTTPS,則設為 HTTPS |
| security_action | security_result.action | 從 security_action 合併 |
| metadata.product_name | 設為「Enterprise DLP Platform」 | |
| metadata.vendor_name | 設為「DigitalGuardian」 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。