收集 Digital Guardian EDR 記錄

支援的國家/地區:

本文說明如何透過 Cloud Run 函式,使用 Google Cloud Storage V2 將 Digital Guardian EDR 記錄檔擷取至 Google Security Operations。

Fortra 的 Digital Guardian (舊稱 Digital Guardian) 是全方位的資料外洩防護和端點偵測與應變平台,可讓您掌握端點、網路和雲端應用程式的系統、使用者和資料事件。Analytics & Reporting Cloud (ARC) 服務提供進階分析、工作流程和報表功能,可全面保護資料。Cloud Run 函式會使用 OAuth 2.0 向 ARC Export API 進行驗證、擷取匯出資料、確認書籤以繼續處理下一個區塊、將結果以 NDJSON 格式寫入 GCS bucket,然後 Google SecOps 會透過 GCS V2 資訊提供擷取這些資料。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用下列 API 的 Google Cloud 專案:
    • Cloud Storage
    • Cloud Run 函式
    • Cloud Scheduler
    • Pub/Sub
    • Cloud Build
  • 建立及管理 Cloud Storage 值區、Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • Digital Guardian Management Console (DGMC) 的特殊權限
  • 存取 Digital Guardian Analytics & Reporting Cloud (ARC) 租戶設定
  • 在 DGMC 中設定 Cloud Services 的管理員權限
  • 在 DGMC 中建立的匯出設定檔,且具有有效的 GUID

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 digitalguardian-edr-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取最接近 Google SecOps 執行個體的位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 Digital Guardian API 憑證

  • 如要讓 Cloud Run 函式從 Digital Guardian ARC 擷取匯出資料,您必須取得 API 憑證並設定匯出設定檔。

從 DGMC 取得 API 憑證

  1. 登入 Digital Guardian Management Console (DGMC)
  2. 依序前往「系統」>「設定」>「雲端服務」
  3. 在「API Access」(API 存取權) 區段中,找出並記錄下列值:

    • API 存取 ID:這是 OAuth 2.0 驗證的用戶端 ID。
    • API 存取密鑰:這是 OAuth 2.0 驗證的用戶端密鑰。
    • Access Gateway 基準網址:API 閘道端點 (例如 https://accessgw-usw.msp.digitalguardian.com)。
    • 授權伺服器網址:OAuth 2.0 權杖端點 (例如 https://authsrv.msp.digitalguardian.com/as/token.oauth2)。

建立及設定匯出設定檔

  1. Digital Guardian 管理控制台 (DGMC) 中,依序前往「Admin」>「Reports」>「Export Profiles」
  2. 按一下「建立匯出設定檔」,或選取現有的匯出設定檔。
  3. 使用下列設定配置匯出設定檔:
    • 設定檔名稱:輸入描述性名稱 (例如 Google SecOps SIEM Integration)。
    • 資料來源:根據要匯出的資料,選取「事件」或「快訊」
    • 匯出格式:選取「JSON Flattened Table」(JSON 平面表格) (建議用於 SIEM 整合)。
    • 欄位:選取要匯出的欄位。
    • 篩選器:設定篩選器來限制匯出的資料 (選用)。
  4. 按一下「儲存」即可建立匯出設定檔。
  5. 儲存後,在清單中找到匯出設定檔,然後從匯出設定檔網址或詳細資料頁面複製 GUID

記錄憑證摘要

請儲存下列資訊,以設定 Cloud Run 函式環境變數:

  • 用戶端 ID (API 存取 ID):來自 DGMC Cloud Services
  • 用戶端密鑰 (API 存取密鑰):來自 DGMC Cloud Services
  • 授權伺服器網址:例如 https://authsrv.msp.digitalguardian.com/as/token.oauth2
  • Access Gateway 基準網址:例如 https://accessgw-usw.msp.digitalguardian.com
  • 匯出設定檔 GUID:從 DGMC 建立的匯出設定檔

測試 API 存取權

  1. 執行下列指令,確認您的憑證是否有效:

    # Step 1: Obtain OAuth 2.0 access token
    curl -s -X POST \
      -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \
      "https://authsrv.msp.digitalguardian.com/as/token.oauth2"
    
    # Step 2: Test export endpoint with the access token
    curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"
    
  2. 成功的回應會傳回內含匯出資料的 JSON 文件。如果收到驗證錯誤訊息,請在 DGMC Cloud Services 中驗證 API 存取 ID 和密鑰。

為 Cloud Run 函式建立服務帳戶

  1. Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 「服務帳戶名稱」:輸入 digitalguardian-ingestion (或說明性名稱)。
    • 服務帳戶說明:輸入 Service account for Digital Guardian EDR Cloud Run function to write logs to GCS
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    • Storage Object Admin (讀取/寫入 Cloud Storage 值區中的物件)
    • Cloud Run 叫用者 (允許 Cloud Scheduler 叫用函式)
  6. 按一下「繼續」
  7. 按一下 [完成]

建立 Pub/Sub 主題

Cloud Scheduler 會透過 Pub/Sub 主題觸發 Cloud Run 函式。

  1. Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)
  2. 按一下「建立主題」
  3. 在「Topic ID」(主題 ID) 欄位中輸入 digitalguardian-edr-trigger
  4. 保留預設設定。
  5. 點選「建立」

建立 Cloud Run 函式

建立 Cloud Run 函式,使用 OAuth 2.0 用戶端憑證向 Digital Guardian ARC 進行驗證、擷取匯出資料、確認書籤以移至下一個區塊,並將結果以 NDJSON 格式寫入 GCS。

準備函式來源檔案

建立下列兩個檔案,用於部署 Cloud Run 函式。

  • requirements.txt

    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3==2.*
    
  • main.py

    """Cloud Run function to ingest Digital Guardian EDR logs into GCS."""
    
    import json
    import os
    import time
    import urllib.parse
    from datetime import datetime, timezone
    
    import functions_framework
    import urllib3
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr")
    STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json")
    AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"]
    ARC_SERVER_URL = os.environ["ARC_SERVER_URL"]
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    
    http = urllib3.PoolManager()
    gcs = storage.Client()
    
    def _get_access_token() -> str:
        """Obtain an OAuth 2.0 access token using client credentials grant."""
        body = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "scope": "client",
        })
        resp = http.request(
            "POST",
            AUTH_SERVER_URL,
            body=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        if resp.status != 200:
            raise RuntimeError(
                f"OAuth token request failed: {resp.status} — "
                f"{resp.data.decode('utf-8')}"
            )
        token_data = json.loads(resp.data.decode("utf-8"))
        return token_data["access_token"]
    
    def _arc_get(token: str, path: str, retries: int = 5) -> dict:
        """Execute a GET request against the ARC API with retry on 429."""
        url = f"{ARC_SERVER_URL}{path}"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        backoff = 2
        for attempt in range(retries):
            resp = http.request("GET", url, headers=headers)
            if resp.status == 200:
                return json.loads(resp.data.decode("utf-8"))
            if resp.status == 429:
                wait = backoff * (2 ** attempt)
                print(
                    f"Rate limited (429). Retrying in {wait}s "
                    f"(attempt {attempt + 1}/{retries})."
                )
                time.sleep(wait)
                continue
            raise RuntimeError(
                f"ARC API error: {resp.status}{resp.data.decode('utf-8')}"
            )
        raise RuntimeError(
            "ARC API rate limit exceeded after maximum retries."
        )
    
    def _arc_acknowledge(token: str) -> None:
        """POST to the acknowledge endpoint to advance the export bookmark."""
        url = (
            f"{ARC_SERVER_URL}/rest/1.0/export/"
            f"{EXPORT_PROFILE_GUID}/acknowledge"
        )
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        resp = http.request("POST", url, headers=headers)
        if resp.status not in (200, 204):
            raise RuntimeError(
                f"ARC acknowledge failed: {resp.status} — "
                f"{resp.data.decode('utf-8')}"
            )
        print("Export bookmark acknowledged successfully.")
    
    def _load_state() -> dict:
        """Load the last run state from GCS."""
        bucket = gcs.bugcs.bucketUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.downlodownload_as_text  return {}
    
    def _save_state(state: dict) -> None:
        """Persist run state to GCS."""
        bucket = gcs.bugcs.bucketUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.uploadupload_from_string    json.dumps(state), content_type="application/json"
        )
    
    def _fetch_export(token: str) -> list:
        """Fetch export data from the ARC Export API."""
        path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}"
        data = _arc_get(token, path)
        records = data if isinstance(data, list) else data.get("data", [])
        return records[:MAX_RECORDS]
    
    def _write_ndjson(records: list, run_ts: str) -> str:
        """Write records as NDJSON to GCS and return the blob path."""
        bucket = gcs.bugcs.bucketUCKET)
        blob_path = (
            f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/"
            f"day={run_ts[8:10]}/{run_ts}_export.ndjson"
        )
        blob = bucket.blob(blob_path)
        ndjson = "\n".join(
            json.dumps(r, separators=(",", ":")) for r in records
        )
        blob.uploadupload_from_stringn, content_type="application/x-ndjson")
        return blob_path
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """Entry point triggered by Pub/Sub via Cloud Scheduler."""
        state = _load_state()
        now = datetime.now(timezone.utc)
    
        print("Authenticating to Digital Guardian ARC.")
        token = _get_access_token()
    
        print(
            f"Fetching export data for profile {EXPORT_PROFILE_GUID}."
        )
        records = _fetch_export(token)
    
        if not records:
            print("No new export data found.")
            return "OK"
    
        run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
        blob_path = _write_ndjson(records, run_ts)
        print(
            f"Wrote {len(records)} records to "
            f"gs://{GCS_BUCKET}/{blob_path}."
        )
    
        _arc_acknowledge(token)
    
        state["last_run"] = now.isoformat()
        state["records_written"] = len(records)
        _save_state(state)
        print(f"State updated. last_run={now.isoformat()}.")
        return "OK"
    

部署 Cloud Run 函式

  1. 將這兩個檔案 (main.pyrequirements.txt) 儲存到本機目錄 (例如 digitalguardian-function/)。
  2. 開啟 Cloud Shell 或已安裝 gcloud CLI 的終端機。
  3. 輸入下列指令來部署函式:

    gcloud functions deploy digitalguardian-edr-to-gcs \
      --gen2 \
      --region=us-central1 \
      --runtime=python312 \
      --trigger-topic=digitalguardian-edr-trigger \
      --entry-point=main \
      --memory=512MB \
      --timeout=540s \
      --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \
      --set-env-vars=\
      "GCS_BUCKET=digitalguardian-edr-logs",\
      "GCS_PREFIX=digitalguardian_edr",\
      "STATE_KEY=digitalguardian_edr_state.json",\
      "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\
      "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\
      "CLIENT_ID=YOUR_CLIENT_ID",\
      "CLIENT_SECRET=YOUR_CLIENT_SECRET",\
      "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\
      "MAX_RECORDS=10000"
    
  4. 替換下列預留位置值:

    • PROJECT_ID:您的 Google Cloud 專案 ID。
    • digitalguardian-edr-logs:您的 GCS bucket 名稱。
    • YOUR_CLIENT_ID:您的 Digital Guardian API 存取 ID。
    • YOUR_CLIENT_SECRET:您的 Digital Guardian API 存取密鑰。
    • YOUR_EXPORT_PROFILE_GUID:DGMC 中的匯出設定檔 GUID。
  5. 檢查函式狀態,驗證部署作業:

    gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
    

環境變數參照

變數 必填 預設 說明
GCS_BUCKET 儲存 NDJSON 輸出內容的 GCS bucket 名稱
GCS_PREFIX digitalguardian_edr bucket 中的物件前置字串 (資料夾路徑)
STATE_KEY digitalguardian_edr_state.json 前置字串中的狀態檔案 Blob 名稱
AUTH_SERVER_URL OAuth 2.0 授權伺服器網址
ARC_SERVER_URL ARC Access Gateway 基礎網址
CLIENT_ID DGMC 的 API 存取 ID
CLIENT_SECRET DGMC 的 API 存取密鑰
EXPORT_PROFILE_GUID 從 DGMC 匯出設定檔 GUID
MAX_RECORDS 10000 每次執行可寫入的記錄數量上限

建立 Cloud Scheduler 工作

Cloud Scheduler 會透過 Pub/Sub 主題,定期觸發 Cloud Run 函式。

  1. 前往 Google Cloud 控制台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    • 「Name」(名稱):輸入 digitalguardian-edr-ingestion-schedule
    • 區域:選取與 Cloud Run 函式相同的區域 (例如 us-central1)。
    • 頻率:輸入 */5 * * * * (每 5 分鐘)。

    • 時區:選取偏好的時區 (例如 UTC)。

  4. 按一下「繼續」

  5. 在「設定執行作業」部分:

    • 目標類型:選取「Pub/Sub」
    • 主題:選取 digitalguardian-edr-trigger
    • 訊息內文:輸入 {"run": true}
  6. 按一下「繼續」

  7. 在「Configure optional settings」(設定選用設定) 部分:

    • 重試次數上限:輸入 3
    • 輪詢持續時間下限:輸入 5s
    • 輪詢持續時間上限:輸入 60s
  8. 點選「建立」

  9. 如要立即執行測試,請按一下工作名稱旁的三點圖示 (...),然後選取「強制執行」

擷取 Google SecOps 服務帳戶並設定動態饋給

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Digital Guardian EDR Logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Digital Guardian EDR」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

  9. 點選「下一步」

  10. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI:

      gs://digitalguardian-edr-logs/digitalguardian_edr/
      
      • 請將 digitalguardian-edr-logs 替換為您的 GCS bucket 名稱。
      • digitalguardian_edr 替換為您設定的 GCS_PREFIX 值。
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  11. 點選「下一步」

  12. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶必須具備 Cloud Storage bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 digitalguardian-edr-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如 chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
應用程式 target.application 直接複製值
應用程式 target.process.command_line 如果規則符合「Printer」,請設為「%{Application}」。
Bytes_Written network.sent_bytes 直接複製值並轉換為 uinteger
類別、電腦名稱、詳細資料 metadata.description 如果 Category == "Policies" 且 Computer_Name 為空,則設為 %{Detail};否則在 grok_parse_failure 上設為 %{message}
Command_Line、Command_Line1 principal.process.command_line 從 Command_Line 移除尾端引號後的值 (如不為空),否則從 Command_Line1 移除尾端引號後的值
Computer_Name、來源 principal.hostname 如果 computerName 不為空白,則為該值,否則設為 %{source}
Destination_Device_Serial_Number、Destination_Device_Serial_Number1 使用 grok 模式擷取,以處理引號
Destination_Directory、Destination_File target.file.full_path 如果 Destination_Directory 和 Destination_File 皆不為空,則會串連兩者
Destination_Drive_Type security_result.detection_fields 已與 destination_drive_type_label 合併 (鍵:Destination_Drive_Type,值:%{Destination_Drive_Type})
Destination_File target.file.names 從 Destination_File 合併
Destination_File_Extension target.file.mime_type 直接複製值
Dll_SHA1_Hash target.process.file.sha1 轉換為小寫後直接複製值
Email_Address principal.user.email_addresses 已從「Email_Address」合併
Email_Sender、Email_Subject network.email.from 如果不是空白,請設為 %{Email_Sender}
Email_Sender、Email_Subject network.email.subject 如果 Email_Sender 不為空,則從主旨 (%{Email_Subject}) 合併
File_Extension principal.process.file.mime_type 直接複製值
IP_Address、source_ip principal.ip 如果來源 IP 位址不為空,則從來源 IP 位址合併,否則從 IP 位址合併
Local_Port、source_port principal.port 如果 source_port 不為空且已轉換為整數,則為該值;否則為 Local_Port 並轉換為整數
MD5_Checksum target.process.file.md5 轉換為小寫後直接複製值
Network_Direction network.direction 如果為 True,則設為 INBOUND;如果為 False,則設為 OUTBOUND
Process_PID principal.process.pid 直接複製值
Process_SHA256_Hash target.process.file.sha256 轉換為小寫後直接複製值
Product_Version metadata.product_version 直接複製值
通訊協定 network.ip_protocol 如果 ==「1」,則設為 ICMP
Remote_Port target.port 直接複製值並轉換為整數
規則 security_result.rule_name 直接複製值
規則 metadata.event_type 如果符合 .Printer.,則設為 PROCESS_UNCATEGORIZED;如果符合 DLP.*,則設為 FILE_MOVE
嚴重性 security_result.severity 轉換為整數後,如果 <=3,請設為「低」;如果 <=6,請設為「中」;如果 <=8,請設為「高」;如果 <=10,請設為「重大」
嚴重性 security_result.severity_details 直接複製值
Source_Directory、Source_File src.file.full_path 如果 Source_Directory 和 Source_File 皆不為空,則會串連兩者
Source_Drive_Type security_result.detection_fields 已與 source_drive_type_label 合併 (鍵:Source_Drive_Type,值:%{Source_Drive_Type})
Source_File src.file.names 從「來源檔案」合併
Source_File_Extension src.file.mime_type 直接複製值
URL_Path、http_url target.url 如果 http_url 不為空白,則取自該網址,否則取自 URL_Path
User_Name principal.user.userid 從 userName 擷取 grok 後的值
User_Name principal.administrative_domain 從 grok 擷取後取得 domainName 的值
Was_Removable security_result.detection_fields 已與 was_removable_label 合併 (鍵:Was_Removable,值:%{Was_Removable})
Was_Source_Removable security_result.detection_fields 已與 was_source_removable_label 合併 (鍵:Was_Source_Removable,值:%{Was_Source_Removable})
computerName、destination_ip、protocol、source_ip、IP_Address、destination、userName、Process_PID、Category、Computer_Name metadata.event_type 一開始設為 GENERIC_EVENT;如果通訊協定為 HTTPS 且 (destination_ip 或 computerName),則設為 NETWORK_HTTP;如果 (source_ip 或 IP_Address) 且 destination_ip,則設為 NETWORK_CONNECTION;如果 userName 不為空白,則設為 USER_UNCATEGORIZED;如果 Process_PID 不為空白,則設為 SCAN_PROCESS
destination_ip target.ip 已從 destination_ip 合併
incidents_url、matched_policies_by_severity security_result 已與 _sr 合併 (規則名稱:%{matched_policies_by_severity},產品返回網址:%{incidents_url})
通訊協定 network.application_protocol 如果通訊協定為 HTTP 或 HTTPS,則設為 HTTPS
security_action security_result.action 從 security_action 合併
metadata.product_name 設為「Enterprise DLP Platform」
metadata.vendor_name 設為「DigitalGuardian」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。