收集 Druva Backup 記錄

支援的國家/地區:

本文說明如何設定 Google Cloud Run 函式,從 Druva REST API 擷取事件並寫入 Google Cloud Storage bucket,然後使用 Google Cloud Storage V2 設定 Google Security Operations 資訊提供,藉此收集 Druva Backup 記錄。

Druva 是雲端原生資料保護與管理平台,可為端點、SaaS 應用程式和企業工作負載提供備份、災難復原和封存服務。這個平台會產生完整的稽核追蹤記錄、備份事件、還原活動和安全警示,並與 SIEM 解決方案整合,方便您監控及確保法規遵循。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用計費功能的 Google Cloud 專案
  • 已啟用下列 Google Cloud API:
    • Cloud Run 函式 API
    • Cloud Scheduler API
    • Cloud Storage API
    • Pub/Sub API
    • IAM API
  • Druva Cloud 管理員可存取 Druva Cloud Platform Console
  • 存取 Druva Integration Center,建立 API 憑證

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 druva-backup-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取最接近 Google SecOps 執行個體的位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 Druva API 憑證

如要讓 Cloud Run 函式從 Druva 擷取事件,您需要使用 OAuth 2.0 驗證建立 API 憑證。

建立 API 憑證

  1. 登入 Druva Cloud Platform Console
  2. 在「全域導覽」選單中,選取「整合中心」
  3. 按一下左側面板中的「API 憑證」
  4. 按一下「新憑證」
  5. 在「New Credentials」(新增憑證) 視窗中,提供下列詳細資料: 「Name」(名稱):輸入描述性名稱 (例如 Google SecOps Cloud Storage Integration)。
  6. 如要套用授權限制:
    1. 選取「Druva Cloud Administrator」,允許完整存取資料擷取和修改權限。
    2. 或者,選取「產品管理員」,然後選擇:Cloud 管理員 (唯讀) 角色:限制只能擷取資料,沒有修改權限 (建議用於 SIEM 整合)
  7. 按一下 [儲存]

記錄 API 憑證

建立 API 憑證後,系統會顯示「憑證詳細資料」視窗:

  1. 按一下「用戶端 ID」旁的複製圖示,將值複製到剪貼簿。
  2. 以安全方式儲存用戶端 ID (例如 McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx)。
  3. 按一下「Secret Key」旁的複製圖示,將值複製到剪貼簿。
  4. 安全儲存密鑰 (例如 Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt)。

建立服務帳戶

為 Cloud Run 函式建立專屬服務帳戶,以便存取 Google Cloud Storage。

  1. Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 druva-backup-function (或說明性名稱)
    • 服務帳戶說明:輸入 Service account for Druva Backup Cloud Run function
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選取角色」,然後選取「Storage 物件管理員」
    2. 按一下「新增其他角色」,然後選取「Cloud Run Invoker」
  6. 按一下「繼續」
  7. 按一下 [完成]
  8. 記錄服務帳戶電子郵件地址 (例如 druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)。

建立 Pub/Sub 主題

建立 Pub/Sub 主題,Cloud Scheduler 會使用這個主題觸發 Cloud Run 函式。

  1. Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)
  2. 按一下 [Create Topic] (建立主題)
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 druva-backup-trigger
  4. 取消勾選「Add a default subscription」(新增預設訂閱)
  5. 點選「建立」

建立 Cloud Run 函式

準備函式程式碼

建立 Cloud Run 函式,使用 OAuth 2.0 用戶端憑證向 Druva API 進行驗證,透過事件端點擷取事件 (含分頁),並將結果以 NDJSON 格式寫入 GCS bucket。

部署 Cloud Run 函式

  1. 前往 Google Cloud 控制台中的「Cloud Run functions」(Cloud Run 函式)
  2. 按一下「Create Function」(建立函式)
  3. 請提供下列設定詳細資料:

    • 環境:選取「第 2 代」
    • 函式名稱:輸入 druva-backup-to-gcs
    • 區域:選取最靠近 GCS bucket 的區域 (例如 us-central1)
    • 觸發條件類型:選取「Cloud Pub/Sub」
    • Cloud Pub/Sub 主題:選取 druva-backup-trigger
    • 服務帳戶:選取 druva-backup-function@PROJECT_ID.iam.gserviceaccount.com
    • 已分配的記憶體512 MiB
    • 逾時540
    • 執行個體數量上限1
  4. 點選「下一步」

  5. 選取「Python 3.11」做為「執行階段」

  6. 將「Entry point」(進入點) 設為 main

  7. 在「Source code」(原始碼) 編輯器中,將 main.py 的內容替換為下列內容:

    import base64
    import json
    import os
    import time
    from datetime import datetime, timezone, timedelta
    
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup")
    STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json")
    DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com")
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    def get_oauth_token():
        """Obtain OAuth 2.0 access token using client credentials grant."""
        token_url = f"https://{DRUVA_BASE_URL}/token"
        payload = {
            "grant_type": "client_credentials",
            "scope": "read",
        }
        resp = requests.post(
            token_url,
            data=payload,
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def load_state(storage_client):
        """Load the persisted state (last event time and tracker) from GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def save_state(storage_client, state):
        """Persist state to GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def fetch_events(token, state):
        """Fetch events from Druva API with pagination via nextPageToken."""
        events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
    
        params = {"pageSize": PAGE_SIZE}
    
        tracker = state.get("tracker")
        last_event_time = state.get("last_event_time")
    
        if tracker:
            params["tracker"] = tracker
        elif last_event_time:
            params["fromTime"] = last_event_time
        else:
            lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS)
            params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        all_events = []
        total_fetched = 0
    
        while total_fetched < MAX_RECORDS:
            resp = requests.get(
                events_url,
                headers=headers,
                params=params,
                timeout=60,
            )
            resp.raise_for_status()
            data = resp.json()
    
            events = data.get("events", [])
            all_events.extend(events)
            total_fetched += len(events)
    
            new_tracker = data.get("tracker")
            next_page_token = data.get("nextPageToken")
    
            if new_tracker:
                state["tracker"] = new_tracker
    
            if next_page_token:
                params["nextPageToken"] = next_page_token
                params.pop("tracker", None)
                params.pop("fromTime", None)
            else:
                break
    
        if all_events:
            last_ts = all_events[-1].get("eventTime", "")
            if last_ts:
                state["last_event_time"] = last_ts
    
        return all_events, state
    
    def write_events_to_gcs(storage_client, events):
        """Write events as NDJSON to GCS."""
        if not events:
            return
    
        now = datetime.now(timezone.utc)
        filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson"
        blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}"
    
        ndjson_lines = "\n".join(json.dumps(event) for event in events)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(blob_path)
        blob.upload_from_string(
            ndjson_lines,
            content_type="application/x-ndjson",
        )
        print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        storage_client = storage.Client()
    
        token = get_oauth_token()
    
        state = load_state(storage_client)
    
        events, updated_state = fetch_events(token, state)
    
        write_events_to_gcs(storage_client, events)
    
        save_state(storage_client, updated_state)
    
        print(f"Completed: fetched {len(events)} events")
        return f"OK: {len(events)} events"
    
  8. requirements.txt 改成以下內容:

    requests>=2.31.0
    google-cloud-storage>=2.14.0
    

設定環境變數

  1. 在 Cloud Run 函式設定中,前往「執行階段、建構作業、連線和安全性設定」專區。
  2. 在「Runtime environment variables」(執行階段環境變數) 下方,新增下列變數:

    • GCS_BUCKET:GCS bucket 的名稱 (例如 druva-backup-logs)
    • GCS_PREFIX:記錄檔的前置路徑 (例如 druva_backup)
    • STATE_KEY:狀態檔案名稱 (例如 druva_state.json)
    • DRUVA_BASE_URL:Druva API 基本網址:
      • apis.druva.com for Druva Cloud (Standard)
      • govcloudapis.druva.com for Druva GovCloud
    • CLIENT_ID:Druva API 憑證中的用戶端 ID
    • CLIENT_SECRET:Druva API 憑證中的密鑰
    • MAX_RECORDS:每次呼叫要擷取的記錄數量上限 (例如 10000)
    • PAGE_SIZE:每個 API 頁面的事件數量 (最多 500)
    • LOOKBACK_HOURS:回溯首次執行時間的小時數 (例如 24)
  3. 點選「Deploy」(部署)

  4. 等待部署作業順利完成。

建立 Cloud Scheduler 工作

建立 Cloud Scheduler 工作,定期觸發 Cloud Run 函式。

  1. 前往 Google Cloud 控制台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    • 「Name」(名稱):輸入 druva-backup-scheduler
    • 區域:選取與 Cloud Run 函式相同的區域 (例如 us-central1)。
    • 說明:輸入 Triggers Druva Backup log collection every 30 minutes
    • 頻率:輸入 */30 * * * * (每 30 分鐘)
    • 時區:選取偏好的時區 (例如 UTC)
  4. 按一下「繼續」

  5. 設定「目標」

    • 目標類型:選取「Pub/Sub」
    • Cloud Pub/Sub 主題:選取 druva-backup-trigger
    • 訊息內文:輸入 {"trigger": "scheduled"}
  6. 點選「建立」

測試 Cloud Scheduler 工作

  1. 在「Cloud Scheduler」清單中,找出 druva-backup-scheduler
  2. 按一下「強制執行」,即可立即觸發函式。
  3. 請檢查下列項目,確認執行作業:
    • Cloud Run 函式記錄位於「Cloud Run functions」>「druva-backup-to-gcs」>「Logs」
    • Cloud Storage> druva-backup-logs 中的新 NDJSON 檔案 GCS bucket

擷取 Google SecOps 服務帳戶並設定動態饋給

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Druva Backup Events)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Druva Backup」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

設定動態饋給

  1. 點選「下一步」
  2. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://druva-backup-logs/druva_backup/
      
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)
      • 刪除轉移的檔案:成功轉移檔案後刪除檔案
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄
    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤

  3. 點選「下一步」

  4. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶必須在 GCS bucket 中具備「Storage 物件檢視者」角色,才能讀取 Cloud Run 函式寫入的記錄檔。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 druva-backup-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如 chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type additional.fields 如果不是空白,則會與從每個欄位建立的標籤合併
發起者 extensions.auth.type 如果發起者符合電子郵件規則運算式,請設為「AUTHTYPE_UNSPECIFIED」
metadata.event_type 如果 has_target_user 為 true 且 has_principal 為 true,則設為「USER_LOGIN」;如果 has_principal 為 true 且 has_target 為 false,則設為「STATUS_UPDATE」;否則設為「GENERIC_EVENT」
eventID metadata.product_log_id 已轉換為字串
metadata.product_name 設為「DRUVA_BACKUP」
clientVersion metadata.product_version 直接複製值
inSyncDataSourceName principal.asset.hostname 直接複製值
ip principal.asset.ip 從 IP 合併
inSyncDataSourceName principal.hostname 直接複製值
ip principal.ip 從 IP 合併
clientOS principal.platform 如果符合 (?i)Linux,則設為「LINUX」;如果符合 (?i)windows,則設為「WINDOWS」;如果符合 (?i)mac,則設為「MAC」
profileName principal.resource.name 直接複製值
profileID principal.resource.product_object_id 已轉換為字串
eventState security_result.action 如果符合 (?i)Success,則設為「ALLOW」,否則設為「BLOCK」
eventState security_result.action_details 直接複製值
嚴重性 security_result.severity 如果值為 [0,1,2,3,LOW],請設為「LOW」;如果值為 [4,5,6,MEDIUM,SUBSTANTIAL,INFO],請設為「MEDIUM」;如果值為 [7,8,HIGH,SEVERE],請設為「HIGH」;如果值為 [9,10,VERY-HIGH,CRITICAL],請設為「CRITICAL」
inSyncUserEmail、發起者 target.user.email_addresses 從 inSyncUserEmail 合併;如果符合電子郵件規則運算式,也會從發起者合併
inSyncUserName target.user.userid 直接複製值
metadata.vendor_name 設為「DRUVA_BACKUP」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。