收集 Druva Backup 日志
本文档介绍了如何通过设置 Google Cloud Run 函数来收集 Druva Backup 日志,该函数可从 Druva REST API 中检索事件并将其写入 Google Cloud Storage 存储桶,然后使用 Google Cloud Storage V2 配置 Google Security Operations Feed。
Druva 是一个云原生数据保护和管理平台,可为端点、SaaS 应用和企业工作负载提供备份、灾难恢复和归档服务。该平台会生成全面的审核轨迹、备份事件、恢复活动和安全提醒,这些信息可以与 SIEM 解决方案集成,以实现监控和合规性。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 启用了结算功能的 Google Cloud 项目
- 已启用以下 Google Cloud API:
- Cloud Run Functions API
- Cloud Scheduler API
- Cloud Storage API
- Pub/Sub API
- IAM API
- Druva Cloud 管理员对 Druva Cloud Platform 控制台的访问权限
- 访问 Druva 集成中心以创建 API 凭据
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储桶命名 输入一个全局唯一的名称(例如 druva-backup-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择离您的 Google SecOps 实例最近的位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
收集 Druva API 凭据
如需让 Cloud Run 函数能够从 Druva 检索事件,您需要创建具有 OAuth 2.0 身份验证的 API 凭据。
创建 API 凭据
- 登录 Druva Cloud Platform Console。
- 在全局导航菜单中,选择集成中心。
- 在左侧面板中,点击 API 凭据。
- 点击新凭据。
- 在新凭据窗口中,提供以下详细信息:
名称:输入一个描述性名称(例如
Google SecOps Cloud Storage Integration)。 - 如需应用授权限制,请执行以下操作:
- 选择 Druva Cloud Administrator 以允许完全访问数据检索和修改权限。
- 或者,选择产品管理员,然后选择:Cloud Admin(只读)角色:限制对数据检索的访问权限,但不授予修改权限(建议用于 SIEM 集成)
- 点击保存。
记录 API 凭据
创建 API 凭据后,系统会显示凭据详细信息窗口:
- 点击客户端 ID 旁边的复制图标,将该值复制到剪贴板。
- 安全地保存客户端 ID(例如
McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx)。 - 点击密钥旁边的复制图标,将相应值复制到剪贴板。
妥善保存 Secret Key(例如
Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt)。
创建服务账号
为 Cloud Run 函数创建专用服务账号,以访问 Google Cloud Storage。
- 在 Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
druva-backup-function(或一个描述性名称) - 服务账号说明:输入
Service account for Druva Backup Cloud Run function
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色,然后选择 Storage Object Admin。
- 点击添加其他角色,然后选择 Cloud Run Invoker。
- 点击继续。
- 点击完成。
记录服务账号电子邮件地址(例如
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)。
创建 Pub/Sub 主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 用来触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
druva-backup-trigger
- 主题 ID:输入
- 取消选中添加默认订阅。
点击创建。
创建 Cloud Run 函数
准备函数代码
创建一个 Cloud Run 函数,该函数使用 OAuth 2.0 客户端凭据向 Druva API 进行身份验证,通过事件端点检索事件(带分页),并将结果以 NDJSON 格式写入 GCS 存储桶。
部署 Cloud Run 函数
- 在 Google Cloud 控制台中,前往 Cloud Run 函数。
- 点击创建函数。
提供以下配置详细信息:
- 环境:选择第 2 代
- 函数名称:输入
druva-backup-to-gcs - 区域:选择距离您的 GCS 存储桶最近的区域(例如
us-central1) - 触发器类型:选择 Cloud Pub/Sub
- Cloud Pub/Sub 主题:选择
druva-backup-trigger - 服务账号:选择
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com - 分配的内存:
512 MiB - 超时:
540秒 - 实例数上限:
1
点击下一步。
选择 Python 3.11 作为运行时。
将入口点设置为
main。在源代码编辑器中,将
main.py的内容替换为以下内容:import base64 import json import os import time from datetime import datetime, timezone, timedelta import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup") STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json") DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com") CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) def get_oauth_token(): """Obtain OAuth 2.0 access token using client credentials grant.""" token_url = f"https://{DRUVA_BASE_URL}/token" payload = { "grant_type": "client_credentials", "scope": "read", } resp = requests.post( token_url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def load_state(storage_client): """Load the persisted state (last event time and tracker) from GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def save_state(storage_client, state): """Persist state to GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def fetch_events(token, state): """Fetch events from Druva API with pagination via nextPageToken.""" events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } params = {"pageSize": PAGE_SIZE} tracker = state.get("tracker") last_event_time = state.get("last_event_time") if tracker: params["tracker"] = tracker elif last_event_time: params["fromTime"] = last_event_time else: lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS) params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ") all_events = [] total_fetched = 0 while total_fetched < MAX_RECORDS: resp = requests.get( events_url, headers=headers, params=params, timeout=60, ) resp.raise_for_status() data = resp.json() events = data.get("events", []) all_events.extend(events) total_fetched += len(events) new_tracker = data.get("tracker") next_page_token = data.get("nextPageToken") if new_tracker: state["tracker"] = new_tracker if next_page_token: params["nextPageToken"] = next_page_token params.pop("tracker", None) params.pop("fromTime", None) else: break if all_events: last_ts = all_events[-1].get("eventTime", "") if last_ts: state["last_event_time"] = last_ts return all_events, state def write_events_to_gcs(storage_client, events): """Write events as NDJSON to GCS.""" if not events: return now = datetime.now(timezone.utc) filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson" blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}" ndjson_lines = "\n".join(json.dumps(event) for event in events) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_lines, content_type="application/x-ndjson", ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" storage_client = storage.Client() token = get_oauth_token() state = load_state(storage_client) events, updated_state = fetch_events(token, state) write_events_to_gcs(storage_client, events) save_state(storage_client, updated_state) print(f"Completed: fetched {len(events)} events") return f"OK: {len(events)} events"使用以下内容替换
requirements.txt的内容:requests>=2.31.0 google-cloud-storage>=2.14.0
配置环境变量
- 在 Cloud Run 函数配置中,前往运行时、构建、连接和安全设置部分。
在运行时环境变量下,添加以下变量:
GCS_BUCKET:您的 GCS 存储桶的名称(例如druva-backup-logs)GCS_PREFIX:日志文件的前缀路径(例如druva_backup)STATE_KEY:状态文件名(例如druva_state.json)DRUVA_BASE_URL:Druva API 基本网址:apis.druva.com(适用于 Druva Cloud [标准])govcloudapis.druva.com(适用于 Druva GovCloud)
CLIENT_ID:Druva API 凭据中的客户端 IDCLIENT_SECRET:Druva API 凭据中的 Secret KeyMAX_RECORDS:每次调用时要提取的记录数上限(例如10000)PAGE_SIZE:每个 API 页面的事件数(最多500)LOOKBACK_HOURS:首次运行的回溯小时数(例如,24)
点击部署。
等待部署成功完成。
创建 Cloud Scheduler 作业
创建一个 Cloud Scheduler 作业,以定期触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
- 名称:输入
druva-backup-scheduler - 区域:选择与 Cloud Run 函数相同的区域(例如
us-central1) - 说明:输入
Triggers Druva Backup log collection every 30 minutes - 频次:输入
*/30 * * * *(每 30 分钟) - 时区:选择您的首选时区(例如
UTC)
- 名称:输入
点击继续。
配置目标:
- 目标类型:选择 Pub/Sub
- Cloud Pub/Sub 主题:选择
druva-backup-trigger - 消息正文:输入
{"trigger": "scheduled"}
点击创建。
测试 Cloud Scheduler 作业
- 在 Cloud Scheduler 列表中,找到
druva-backup-scheduler。 - 点击强制运行以立即触发该函数。
- 通过以下方式验证执行情况:
- Cloud Run functions > druva-backup-to-gcs > 日志中的 Cloud Run 函数日志
- Cloud Storage> druva-backup-logs 中新 NDJSON 文件的 GCS 存储桶
检索 Google SecOps 服务账号并配置 Feed
获取服务账号电子邮件地址
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在 Feed 名称字段中,输入 Feed 的名称(例如
Druva Backup Events)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Druva Backup 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
配置 Feed
- 点击下一步。
为以下输入参数指定值:
存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:
gs://druva-backup-logs/druva_backup/来源删除选项:根据您的偏好选择删除选项:
- 永不:转移后永不删除任何文件(建议用于测试)
- 删除已转移的文件:成功转移后删除文件
- 删除已转移的文件和空目录:成功转移后删除文件和空目录
文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)
资产命名空间:资产命名空间
注入标签:要应用于此 Feed 中事件的标签
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储桶具有 Storage Object Viewer 角色,才能读取 Cloud Run 函数写入的日志文件。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储桶名称(例如
druva-backup-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com) - 分配角色:选择 Storage Object Viewer
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如
点击保存。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type | additional.fields | 如果非空,则与根据每个字段创建的标签合并 |
| 发起方 | extensions.auth.type | 如果发起者与电子邮件正则表达式匹配,则设置为“AUTHTYPE_UNSPECIFIED” |
| metadata.event_type | 如果 has_target_user 为 true 且 has_principal 为 true,则设置为“USER_LOGIN”;如果 has_principal 为 true 且 has_target 为 false,则设置为“STATUS_UPDATE”;否则设置为“GENERIC_EVENT” | |
| eventID | metadata.product_log_id | 已转换为字符串 |
| metadata.product_name | 设置为“DRUVA_BACKUP” | |
| clientVersion | metadata.product_version | 直接复制值 |
| inSyncDataSourceName | principal.asset.hostname | 直接复制值 |
| ip | principal.asset.ip | 合并自 IP |
| inSyncDataSourceName | principal.hostname | 直接复制值 |
| ip | principal.ip | 合并自 IP |
| clientOS | principal.platform | 如果匹配 (?i)Linux,则设置为“LINUX”;如果匹配 (?i)windows,则设置为“WINDOWS”;如果匹配 (?i)mac,则设置为“MAC” |
| profileName | principal.resource.name | 直接复制值 |
| profileID | principal.resource.product_object_id | 已转换为字符串 |
| eventState | security_result.action | 如果与 (?i)Success 匹配,则设置为“ALLOW”,否则设置为“BLOCK” |
| eventState | security_result.action_details | 直接复制值 |
| 和程度上减少 | security_result.severity | 如果值在 [0,1,2,3,LOW] 范围内,则设置为“LOW”;如果值在 [4,5,6,MEDIUM,SUBSTANTIAL,INFO] 范围内,则设置为“MEDIUM”;如果值在 [7,8,HIGH,SEVERE] 范围内,则设置为“HIGH”;如果值在 [9,10,VERY-HIGH,CRITICAL] 范围内,则设置为“CRITICAL” |
| inSyncUserEmail,启动器 | target.user.email_addresses | 从 inSyncUserEmail 合并;如果与电子邮件正则表达式匹配,则也从发起者合并 |
| inSyncUserName | target.user.userid | 直接复制值 |
| metadata.vendor_name | 设置为“DRUVA_BACKUP” |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。