收集 Druva Backup 記錄
本文說明如何設定 Google Cloud Run 函式,從 Druva REST API 擷取事件並寫入 Google Cloud Storage bucket,然後使用 Google Cloud Storage V2 設定 Google Security Operations 資訊提供,藉此收集 Druva Backup 記錄。
Druva 是雲端原生資料保護與管理平台,可為端點、SaaS 應用程式和企業工作負載提供備份、災難復原和封存服務。這個平台會產生完整的稽核追蹤記錄、備份事件、還原活動和安全警示,並與 SIEM 解決方案整合,方便您監控及確保法規遵循。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用計費功能的 Google Cloud 專案
- 已啟用下列 Google Cloud API:
- Cloud Run 函式 API
- Cloud Scheduler API
- Cloud Storage API
- Pub/Sub API
- IAM API
- Druva Cloud 管理員可存取 Druva Cloud Platform Console
- 存取 Druva Integration Center,建立 API 憑證
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 druva-backup-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取最接近 Google SecOps 執行個體的位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
收集 Druva API 憑證
如要讓 Cloud Run 函式從 Druva 擷取事件,您需要使用 OAuth 2.0 驗證建立 API 憑證。
建立 API 憑證
- 登入 Druva Cloud Platform Console。
- 在「全域導覽」選單中,選取「整合中心」。
- 按一下左側面板中的「API 憑證」。
- 按一下「新憑證」。
- 在「New Credentials」(新增憑證) 視窗中,提供下列詳細資料:
「Name」(名稱):輸入描述性名稱 (例如
Google SecOps Cloud Storage Integration)。 - 如要套用授權限制:
- 選取「Druva Cloud Administrator」,允許完整存取資料擷取和修改權限。
- 或者,選取「產品管理員」,然後選擇:Cloud 管理員 (唯讀) 角色:限制只能擷取資料,沒有修改權限 (建議用於 SIEM 整合)
- 按一下 [儲存]。
記錄 API 憑證
建立 API 憑證後,系統會顯示「憑證詳細資料」視窗:
- 按一下「用戶端 ID」旁的複製圖示,將值複製到剪貼簿。
- 以安全方式儲存用戶端 ID (例如
McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx)。 - 按一下「Secret Key」旁的複製圖示,將值複製到剪貼簿。
安全儲存密鑰 (例如
Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt)。
建立服務帳戶
為 Cloud Run 函式建立專屬服務帳戶,以便存取 Google Cloud Storage。
- 在 Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」。
- 按一下「Create Service Account」(建立服務帳戶)。
- 請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
druva-backup-function(或說明性名稱) - 服務帳戶說明:輸入
Service account for Druva Backup Cloud Run function
- 服務帳戶名稱:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
- 按一下「選取角色」,然後選取「Storage 物件管理員」。
- 按一下「新增其他角色」,然後選取「Cloud Run Invoker」。
- 按一下「繼續」。
- 按一下 [完成]。
記錄服務帳戶電子郵件地址 (例如
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)。
建立 Pub/Sub 主題
建立 Pub/Sub 主題,Cloud Scheduler 會使用這個主題觸發 Cloud Run 函式。
- 在 Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)。
- 按一下 [Create Topic] (建立主題)。
- 請提供下列設定詳細資料:
- 主題 ID:輸入
druva-backup-trigger
- 主題 ID:輸入
- 取消勾選「Add a default subscription」(新增預設訂閱)。
點選「建立」。
建立 Cloud Run 函式
準備函式程式碼
建立 Cloud Run 函式,使用 OAuth 2.0 用戶端憑證向 Druva API 進行驗證,透過事件端點擷取事件 (含分頁),並將結果以 NDJSON 格式寫入 GCS bucket。
部署 Cloud Run 函式
- 前往 Google Cloud 控制台中的「Cloud Run functions」(Cloud Run 函式)。
- 按一下「Create Function」(建立函式)。
請提供下列設定詳細資料:
- 環境:選取「第 2 代」
- 函式名稱:輸入
druva-backup-to-gcs - 區域:選取最靠近 GCS bucket 的區域 (例如
us-central1) - 觸發條件類型:選取「Cloud Pub/Sub」
- Cloud Pub/Sub 主題:選取
druva-backup-trigger - 服務帳戶:選取
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com - 已分配的記憶體:
512 MiB - 逾時:
540秒 - 執行個體數量上限:
1
點選「下一步」。
選取「Python 3.11」做為「執行階段」。
將「Entry point」(進入點) 設為
main。在「Source code」(原始碼) 編輯器中,將
main.py的內容替換為下列內容:import base64 import json import os import time from datetime import datetime, timezone, timedelta import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup") STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json") DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com") CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) def get_oauth_token(): """Obtain OAuth 2.0 access token using client credentials grant.""" token_url = f"https://{DRUVA_BASE_URL}/token" payload = { "grant_type": "client_credentials", "scope": "read", } resp = requests.post( token_url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def load_state(storage_client): """Load the persisted state (last event time and tracker) from GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def save_state(storage_client, state): """Persist state to GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def fetch_events(token, state): """Fetch events from Druva API with pagination via nextPageToken.""" events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } params = {"pageSize": PAGE_SIZE} tracker = state.get("tracker") last_event_time = state.get("last_event_time") if tracker: params["tracker"] = tracker elif last_event_time: params["fromTime"] = last_event_time else: lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS) params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ") all_events = [] total_fetched = 0 while total_fetched < MAX_RECORDS: resp = requests.get( events_url, headers=headers, params=params, timeout=60, ) resp.raise_for_status() data = resp.json() events = data.get("events", []) all_events.extend(events) total_fetched += len(events) new_tracker = data.get("tracker") next_page_token = data.get("nextPageToken") if new_tracker: state["tracker"] = new_tracker if next_page_token: params["nextPageToken"] = next_page_token params.pop("tracker", None) params.pop("fromTime", None) else: break if all_events: last_ts = all_events[-1].get("eventTime", "") if last_ts: state["last_event_time"] = last_ts return all_events, state def write_events_to_gcs(storage_client, events): """Write events as NDJSON to GCS.""" if not events: return now = datetime.now(timezone.utc) filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson" blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}" ndjson_lines = "\n".join(json.dumps(event) for event in events) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_lines, content_type="application/x-ndjson", ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" storage_client = storage.Client() token = get_oauth_token() state = load_state(storage_client) events, updated_state = fetch_events(token, state) write_events_to_gcs(storage_client, events) save_state(storage_client, updated_state) print(f"Completed: fetched {len(events)} events") return f"OK: {len(events)} events"將
requirements.txt改成以下內容:requests>=2.31.0 google-cloud-storage>=2.14.0
設定環境變數
- 在 Cloud Run 函式設定中,前往「執行階段、建構作業、連線和安全性設定」專區。
在「Runtime environment variables」(執行階段環境變數) 下方,新增下列變數:
GCS_BUCKET:GCS bucket 的名稱 (例如druva-backup-logs)GCS_PREFIX:記錄檔的前置路徑 (例如druva_backup)STATE_KEY:狀態檔案名稱 (例如druva_state.json)DRUVA_BASE_URL:Druva API 基本網址:apis.druva.comfor Druva Cloud (Standard)govcloudapis.druva.comfor Druva GovCloud
CLIENT_ID:Druva API 憑證中的用戶端 IDCLIENT_SECRET:Druva API 憑證中的密鑰MAX_RECORDS:每次呼叫要擷取的記錄數量上限 (例如10000)PAGE_SIZE:每個 API 頁面的事件數量 (最多500)LOOKBACK_HOURS:回溯首次執行時間的小時數 (例如24)
點選「Deploy」(部署)。
等待部署作業順利完成。
建立 Cloud Scheduler 工作
建立 Cloud Scheduler 工作,定期觸發 Cloud Run 函式。
- 前往 Google Cloud 控制台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
- 「Name」(名稱):輸入
druva-backup-scheduler - 區域:選取與 Cloud Run 函式相同的區域 (例如
us-central1)。 - 說明:輸入
Triggers Druva Backup log collection every 30 minutes - 頻率:輸入
*/30 * * * *(每 30 分鐘) - 時區:選取偏好的時區 (例如
UTC)
- 「Name」(名稱):輸入
按一下「繼續」。
設定「目標」:
- 目標類型:選取「Pub/Sub」
- Cloud Pub/Sub 主題:選取
druva-backup-trigger - 訊息內文:輸入
{"trigger": "scheduled"}
點選「建立」。
測試 Cloud Scheduler 工作
- 在「Cloud Scheduler」清單中,找出
druva-backup-scheduler。 - 按一下「強制執行」,即可立即觸發函式。
- 請檢查下列項目,確認執行作業:
- Cloud Run 函式記錄位於「Cloud Run functions」>「druva-backup-to-gcs」>「Logs」
- Cloud Storage> druva-backup-logs 中的新 NDJSON 檔案 GCS bucket
擷取 Google SecOps 服務帳戶並設定動態饋給
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Druva Backup Events)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Druva Backup」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
設定動態饋給
- 點選「下一步」。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://druva-backup-logs/druva_backup/來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)
- 刪除轉移的檔案:成功轉移檔案後刪除檔案
- 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄
檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)
資產命名空間:資產命名空間
擷取標籤:要套用至這個動態饋給事件的標籤
點選「下一步」。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶必須在 GCS bucket 中具備「Storage 物件檢視者」角色,才能讀取 Cloud Run 函式寫入的記錄檔。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
druva-backup-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com) - 指派角色:選取「Storage 物件檢視者」
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址 (例如
按一下 [儲存]。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type | additional.fields | 如果不是空白,則會與從每個欄位建立的標籤合併 |
| 發起者 | extensions.auth.type | 如果發起者符合電子郵件規則運算式,請設為「AUTHTYPE_UNSPECIFIED」 |
| metadata.event_type | 如果 has_target_user 為 true 且 has_principal 為 true,則設為「USER_LOGIN」;如果 has_principal 為 true 且 has_target 為 false,則設為「STATUS_UPDATE」;否則設為「GENERIC_EVENT」 | |
| eventID | metadata.product_log_id | 已轉換為字串 |
| metadata.product_name | 設為「DRUVA_BACKUP」 | |
| clientVersion | metadata.product_version | 直接複製值 |
| inSyncDataSourceName | principal.asset.hostname | 直接複製值 |
| ip | principal.asset.ip | 從 IP 合併 |
| inSyncDataSourceName | principal.hostname | 直接複製值 |
| ip | principal.ip | 從 IP 合併 |
| clientOS | principal.platform | 如果符合 (?i)Linux,則設為「LINUX」;如果符合 (?i)windows,則設為「WINDOWS」;如果符合 (?i)mac,則設為「MAC」 |
| profileName | principal.resource.name | 直接複製值 |
| profileID | principal.resource.product_object_id | 已轉換為字串 |
| eventState | security_result.action | 如果符合 (?i)Success,則設為「ALLOW」,否則設為「BLOCK」 |
| eventState | security_result.action_details | 直接複製值 |
| 嚴重性 | security_result.severity | 如果值為 [0,1,2,3,LOW],請設為「LOW」;如果值為 [4,5,6,MEDIUM,SUBSTANTIAL,INFO],請設為「MEDIUM」;如果值為 [7,8,HIGH,SEVERE],請設為「HIGH」;如果值為 [9,10,VERY-HIGH,CRITICAL],請設為「CRITICAL」 |
| inSyncUserEmail、發起者 | target.user.email_addresses | 從 inSyncUserEmail 合併;如果符合電子郵件規則運算式,也會從發起者合併 |
| inSyncUserName | target.user.userid | 直接複製值 |
| metadata.vendor_name | 設為「DRUVA_BACKUP」 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。