收集 Druva Backup 日志

支持的平台:

本文档介绍了如何通过设置 Google Cloud Run 函数来收集 Druva Backup 日志,该函数可从 Druva REST API 中检索事件并将其写入 Google Cloud Storage 存储桶,然后使用 Google Cloud Storage V2 配置 Google Security Operations Feed。

Druva 是一个云原生数据保护和管理平台,可为端点、SaaS 应用和企业工作负载提供备份、灾难恢复和归档服务。该平台会生成全面的审核轨迹、备份事件、恢复活动和安全提醒,这些信息可以与 SIEM 解决方案集成,以实现监控和合规性。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 启用了结算功能的 Google Cloud 项目
  • 已启用以下 Google Cloud API:
    • Cloud Run Functions API
    • Cloud Scheduler API
    • Cloud Storage API
    • Pub/Sub API
    • IAM API
  • Druva Cloud 管理员对 Druva Cloud Platform 控制台的访问权限
  • 访问 Druva 集成中心以创建 API 凭据

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储桶命名 输入一个全局唯一的名称(例如 druva-backup-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择离您的 Google SecOps 实例最近的位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 Druva API 凭据

如需让 Cloud Run 函数能够从 Druva 检索事件,您需要创建具有 OAuth 2.0 身份验证的 API 凭据。

创建 API 凭据

  1. 登录 Druva Cloud Platform Console
  2. 全局导航菜单中,选择集成中心
  3. 在左侧面板中,点击 API 凭据
  4. 点击新凭据
  5. 新凭据窗口中,提供以下详细信息: 名称:输入一个描述性名称(例如 Google SecOps Cloud Storage Integration)。
  6. 如需应用授权限制,请执行以下操作:
    1. 选择 Druva Cloud Administrator 以允许完全访问数据检索和修改权限。
    2. 或者,选择产品管理员,然后选择:Cloud Admin(只读)角色:限制对数据检索的访问权限,但不授予修改权限(建议用于 SIEM 集成)
  7. 点击保存

记录 API 凭据

创建 API 凭据后,系统会显示凭据详细信息窗口:

  1. 点击客户端 ID 旁边的复制图标,将该值复制到剪贴板。
  2. 安全地保存客户端 ID(例如 McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx)。
  3. 点击密钥旁边的复制图标,将相应值复制到剪贴板。
  4. 妥善保存 Secret Key(例如 Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt)。

创建服务账号

为 Cloud Run 函数创建专用服务账号,以访问 Google Cloud Storage。

  1. Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 druva-backup-function(或一个描述性名称)
    • 服务账号说明:输入 Service account for Druva Backup Cloud Run function
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色,然后选择 Storage Object Admin
    2. 点击添加其他角色,然后选择 Cloud Run Invoker
  6. 点击继续
  7. 点击完成
  8. 记录服务账号电子邮件地址(例如 druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)。

创建 Pub/Sub 主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 用来触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 druva-backup-trigger
  4. 取消选中添加默认订阅
  5. 点击创建

创建 Cloud Run 函数

准备函数代码

创建一个 Cloud Run 函数,该函数使用 OAuth 2.0 客户端凭据向 Druva API 进行身份验证,通过事件端点检索事件(带分页),并将结果以 NDJSON 格式写入 GCS 存储桶。

部署 Cloud Run 函数

  1. Google Cloud 控制台中,前往 Cloud Run 函数
  2. 点击创建函数
  3. 提供以下配置详细信息:

    • 环境:选择第 2 代
    • 函数名称:输入 druva-backup-to-gcs
    • 区域:选择距离您的 GCS 存储桶最近的区域(例如 us-central1
    • 触发器类型:选择 Cloud Pub/Sub
    • Cloud Pub/Sub 主题:选择 druva-backup-trigger
    • 服务账号:选择 druva-backup-function@PROJECT_ID.iam.gserviceaccount.com
    • 分配的内存512 MiB
    • 超时540
    • 实例数上限1
  4. 点击下一步

  5. 选择 Python 3.11 作为运行时

  6. 入口点设置为 main

  7. 源代码编辑器中,将 main.py 的内容替换为以下内容:

    import base64
    import json
    import os
    import time
    from datetime import datetime, timezone, timedelta
    
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup")
    STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json")
    DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com")
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    def get_oauth_token():
        """Obtain OAuth 2.0 access token using client credentials grant."""
        token_url = f"https://{DRUVA_BASE_URL}/token"
        payload = {
            "grant_type": "client_credentials",
            "scope": "read",
        }
        resp = requests.post(
            token_url,
            data=payload,
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def load_state(storage_client):
        """Load the persisted state (last event time and tracker) from GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def save_state(storage_client, state):
        """Persist state to GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def fetch_events(token, state):
        """Fetch events from Druva API with pagination via nextPageToken."""
        events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
    
        params = {"pageSize": PAGE_SIZE}
    
        tracker = state.get("tracker")
        last_event_time = state.get("last_event_time")
    
        if tracker:
            params["tracker"] = tracker
        elif last_event_time:
            params["fromTime"] = last_event_time
        else:
            lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS)
            params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        all_events = []
        total_fetched = 0
    
        while total_fetched < MAX_RECORDS:
            resp = requests.get(
                events_url,
                headers=headers,
                params=params,
                timeout=60,
            )
            resp.raise_for_status()
            data = resp.json()
    
            events = data.get("events", [])
            all_events.extend(events)
            total_fetched += len(events)
    
            new_tracker = data.get("tracker")
            next_page_token = data.get("nextPageToken")
    
            if new_tracker:
                state["tracker"] = new_tracker
    
            if next_page_token:
                params["nextPageToken"] = next_page_token
                params.pop("tracker", None)
                params.pop("fromTime", None)
            else:
                break
    
        if all_events:
            last_ts = all_events[-1].get("eventTime", "")
            if last_ts:
                state["last_event_time"] = last_ts
    
        return all_events, state
    
    def write_events_to_gcs(storage_client, events):
        """Write events as NDJSON to GCS."""
        if not events:
            return
    
        now = datetime.now(timezone.utc)
        filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson"
        blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}"
    
        ndjson_lines = "\n".join(json.dumps(event) for event in events)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(blob_path)
        blob.upload_from_string(
            ndjson_lines,
            content_type="application/x-ndjson",
        )
        print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        storage_client = storage.Client()
    
        token = get_oauth_token()
    
        state = load_state(storage_client)
    
        events, updated_state = fetch_events(token, state)
    
        write_events_to_gcs(storage_client, events)
    
        save_state(storage_client, updated_state)
    
        print(f"Completed: fetched {len(events)} events")
        return f"OK: {len(events)} events"
    
  8. 使用以下内容替换 requirements.txt 的内容:

    requests>=2.31.0
    google-cloud-storage>=2.14.0
    

配置环境变量

  1. 在 Cloud Run 函数配置中,前往运行时、构建、连接和安全设置部分。
  2. 运行时环境变量下,添加以下变量:

    • GCS_BUCKET:您的 GCS 存储桶的名称(例如 druva-backup-logs
    • GCS_PREFIX:日志文件的前缀路径(例如 druva_backup
    • STATE_KEY:状态文件名(例如 druva_state.json
    • DRUVA_BASE_URL:Druva API 基本网址:
      • apis.druva.com(适用于 Druva Cloud [标准])
      • govcloudapis.druva.com(适用于 Druva GovCloud)
    • CLIENT_ID:Druva API 凭据中的客户端 ID
    • CLIENT_SECRET:Druva API 凭据中的 Secret Key
    • MAX_RECORDS:每次调用时要提取的记录数上限(例如 10000
    • PAGE_SIZE:每个 API 页面的事件数(最多 500
    • LOOKBACK_HOURS:首次运行的回溯小时数(例如,24
  3. 点击部署

  4. 等待部署成功完成。

创建 Cloud Scheduler 作业

创建一个 Cloud Scheduler 作业,以定期触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    • 名称:输入 druva-backup-scheduler
    • 区域:选择与 Cloud Run 函数相同的区域(例如 us-central1
    • 说明:输入 Triggers Druva Backup log collection every 30 minutes
    • 频次:输入 */30 * * * *(每 30 分钟)
    • 时区:选择您的首选时区(例如 UTC
  4. 点击继续

  5. 配置目标

    • 目标类型:选择 Pub/Sub
    • Cloud Pub/Sub 主题:选择 druva-backup-trigger
    • 消息正文:输入 {"trigger": "scheduled"}
  6. 点击创建

测试 Cloud Scheduler 作业

  1. Cloud Scheduler 列表中,找到 druva-backup-scheduler
  2. 点击强制运行以立即触发该函数。
  3. 通过以下方式验证执行情况:
    • Cloud Run functions > druva-backup-to-gcs > 日志中的 Cloud Run 函数日志
    • Cloud Storage> druva-backup-logs 中新 NDJSON 文件的 GCS 存储桶

检索 Google SecOps 服务账号并配置 Feed

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Druva Backup Events)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Druva Backup 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

配置 Feed

  1. 点击下一步
  2. 为以下输入参数指定值:

    • 存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:

      gs://druva-backup-logs/druva_backup/
      
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:转移后永不删除任何文件(建议用于测试)
      • 删除已转移的文件:成功转移后删除文件
      • 删除已转移的文件和空目录:成功转移后删除文件和空目录
    • 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签

  3. 点击下一步

  4. 最终确定界面中查看新的 Feed 配置,然后点击提交

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储桶具有 Storage Object Viewer 角色,才能读取 Cloud Run 函数写入的日志文件。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 druva-backup-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如 chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

UDM 映射表

日志字段 UDM 映射 逻辑
inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type additional.fields 如果非空,则与根据每个字段创建的标签合并
发起方 extensions.auth.type 如果发起者与电子邮件正则表达式匹配,则设置为“AUTHTYPE_UNSPECIFIED”
metadata.event_type 如果 has_target_user 为 true 且 has_principal 为 true,则设置为“USER_LOGIN”;如果 has_principal 为 true 且 has_target 为 false,则设置为“STATUS_UPDATE”;否则设置为“GENERIC_EVENT”
eventID metadata.product_log_id 已转换为字符串
metadata.product_name 设置为“DRUVA_BACKUP”
clientVersion metadata.product_version 直接复制值
inSyncDataSourceName principal.asset.hostname 直接复制值
ip principal.asset.ip 合并自 IP
inSyncDataSourceName principal.hostname 直接复制值
ip principal.ip 合并自 IP
clientOS principal.platform 如果匹配 (?i)Linux,则设置为“LINUX”;如果匹配 (?i)windows,则设置为“WINDOWS”;如果匹配 (?i)mac,则设置为“MAC”
profileName principal.resource.name 直接复制值
profileID principal.resource.product_object_id 已转换为字符串
eventState security_result.action 如果与 (?i)Success 匹配,则设置为“ALLOW”,否则设置为“BLOCK”
eventState security_result.action_details 直接复制值
和程度上减少 security_result.severity 如果值在 [0,1,2,3,LOW] 范围内,则设置为“LOW”;如果值在 [4,5,6,MEDIUM,SUBSTANTIAL,INFO] 范围内,则设置为“MEDIUM”;如果值在 [7,8,HIGH,SEVERE] 范围内,则设置为“HIGH”;如果值在 [9,10,VERY-HIGH,CRITICAL] 范围内,则设置为“CRITICAL”
inSyncUserEmail,启动器 target.user.email_addresses 从 inSyncUserEmail 合并;如果与电子邮件正则表达式匹配,则也从发起者合并
inSyncUserName target.user.userid 直接复制值
metadata.vendor_name 设置为“DRUVA_BACKUP”

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。