收集 Digital Guardian EDR 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage V2 通过 Cloud Run 函数将 Digital Guardian EDR 日志注入到 Google Security Operations 中。

Fortra 的 Digital Guardian(以前称为 Digital Guardian)是一个全面的数据丢失防护和端点检测与响应平台,可让您了解端点、网络和云应用中的系统、用户和数据事件。Analytics & Reporting Cloud (ARC) 服务提供高级分析、工作流和报告功能,可实现全面的数据保护。Cloud Run 函数使用 OAuth 2.0 向 ARC Export API 进行身份验证,检索导出数据,确认书签以推进到下一个块,将结果以 NDJSON 格式写入 GCS 存储桶,然后 Google SecOps 通过 GCS V2 Feed 提取这些结果。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 已启用以下 API 的 Google Cloud 项目:
    • Cloud Storage
    • Cloud Run functions
    • Cloud Scheduler
    • Pub/Sub
    • Cloud Build
  • 创建和管理 Cloud Storage 存储分区、Cloud Run 函数、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 对 Digital Guardian 管理控制台 (DGMC) 的特权访问权限
  • 对 Digital Guardian Analytics & Reporting Cloud (ARC) 租户设置的访问权限
  • 在 DGMC 中配置 Cloud 服务的管理员权限
  • 在 DGMC 中创建的具有有效 GUID 的导出配置文件

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储桶命名 输入一个全局唯一的名称(例如 digitalguardian-edr-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择离您的 Google SecOps 实例最近的位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 Digital Guardian API 凭据

  • 如需使 Cloud Run 函数能够从 Digital Guardian ARC 检索导出数据,您需要获取 API 凭据并配置导出配置文件。

从 DGMC 获取 API 凭据

  1. 登录 Digital Guardian Management Console (DGMC)
  2. 依次前往系统 > 配置 > 云服务
  3. API 访问权限部分中,找到并记录以下值:

    • API 访问 ID:这是用于 OAuth 2.0 身份验证的客户端 ID。
    • API 访问密钥:这是用于 OAuth 2.0 身份验证的客户端密钥。
    • 访问网关基础网址:API 网关端点(例如 https://accessgw-usw.msp.digitalguardian.com)。
    • 授权服务器网址:OAuth 2.0 令牌端点(例如 https://authsrv.msp.digitalguardian.com/as/token.oauth2)。

创建和配置导出配置文件

  1. Digital Guardian 管理控制台 (DGMC) 中,依次前往 Admin > Reports > Export Profiles
  2. 点击创建导出配置或选择现有导出配置。
  3. 使用以下设置配置导出配置文件:
    • 配置文件名称:输入一个描述性名称(例如 Google SecOps SIEM Integration)。
    • 数据源:根据要导出的数据,选择事件提醒
    • 导出格式:选择 JSON 平展表(建议用于 SIEM 集成)。
    • 字段:选择要包含在导出内容中的字段。
    • 过滤条件:配置任何过滤条件以限制导出的数据(可选)。
  4. 点击保存以创建导出配置文件。
  5. 保存后,在列表中找到导出配置文件,然后从导出配置文件网址或详情页面中复制 GUID

记录凭据摘要

保存以下信息,以便配置 Cloud Run 函数环境变量:

  • 客户端 ID(API 访问 ID):来自 DGMC Cloud Services
  • 客户端密钥 (API 访问密钥):来自 DGMC Cloud Services
  • 授权服务器网址:例如,https://authsrv.msp.digitalguardian.com/as/token.oauth2
  • Access Gateway 基准网址:例如 https://accessgw-usw.msp.digitalguardian.com
  • 导出配置文件 GUID:来自在 DGMC 中创建的导出配置文件

测试 API 访问权限

  1. 运行以下命令,验证您的凭据是否有效:

    # Step 1: Obtain OAuth 2.0 access token
    curl -s -X POST \
      -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \
      "https://authsrv.msp.digitalguardian.com/as/token.oauth2"
    
    # Step 2: Test export endpoint with the access token
    curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"
    
  2. 如果响应成功,则会返回包含导出数据的 JSON 文档。如果您收到身份验证错误,请验证 DGMC Cloud Services 中的 API 访问 ID 和密钥。

为 Cloud Run 函数创建服务账号

  1. Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 digitalguardian-ingestion(或一个描述性名称)。
    • 服务账号说明:输入 Service account for Digital Guardian EDR Cloud Run function to write logs to GCS
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    • Storage Object Admin(用于读取/写入 Cloud Storage 存储桶中的对象)
    • Cloud Run Invoker(允许 Cloud Scheduler 调用该函数)
  6. 点击继续
  7. 点击完成

创建 Pub/Sub 主题

Cloud Scheduler 通过 Pub/Sub 主题触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 主题 ID 字段中,输入 digitalguardian-edr-trigger
  4. 保留默认设置。
  5. 点击创建

创建 Cloud Run 函数

创建一个 Cloud Run 函数,该函数使用 OAuth 2.0 客户端凭据向 Digital Guardian ARC 进行身份验证,检索导出数据,确认书签以推进到下一个块,并将结果以 NDJSON 格式写入 GCS。

准备函数源文件

为 Cloud Run 函数部署创建以下两个文件。

  • requirements.txt

    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3==2.*
    
  • main.py

    """Cloud Run function to ingest Digital Guardian EDR logs into GCS."""
    
    import json
    import os
    import time
    import urllib.parse
    from datetime import datetime, timezone
    
    import functions_framework
    import urllib3
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr")
    STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json")
    AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"]
    ARC_SERVER_URL = os.environ["ARC_SERVER_URL"]
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    
    http = urllib3.PoolManager()
    gcs = storage.Client()
    
    def _get_access_token() -> str:
        """Obtain an OAuth 2.0 access token using client credentials grant."""
        body = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "scope": "client",
        })
        resp = http.request(
            "POST",
            AUTH_SERVER_URL,
            body=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        if resp.status != 200:
            raise RuntimeError(
                f"OAuth token request failed: {resp.status} — "
                f"{resp.data.decode('utf-8')}"
            )
        token_data = json.loads(resp.data.decode("utf-8"))
        return token_data["access_token"]
    
    def _arc_get(token: str, path: str, retries: int = 5) -> dict:
        """Execute a GET request against the ARC API with retry on 429."""
        url = f"{ARC_SERVER_URL}{path}"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        backoff = 2
        for attempt in range(retries):
            resp = http.request("GET", url, headers=headers)
            if resp.status == 200:
                return json.loads(resp.data.decode("utf-8"))
            if resp.status == 429:
                wait = backoff * (2 ** attempt)
                print(
                    f"Rate limited (429). Retrying in {wait}s "
                    f"(attempt {attempt + 1}/{retries})."
                )
                time.sleep(wait)
                continue
            raise RuntimeError(
                f"ARC API error: {resp.status}{resp.data.decode('utf-8')}"
            )
        raise RuntimeError(
            "ARC API rate limit exceeded after maximum retries."
        )
    
    def _arc_acknowledge(token: str) -> None:
        """POST to the acknowledge endpoint to advance the export bookmark."""
        url = (
            f"{ARC_SERVER_URL}/rest/1.0/export/"
            f"{EXPORT_PROFILE_GUID}/acknowledge"
        )
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        resp = http.request("POST", url, headers=headers)
        if resp.status not in (200, 204):
            raise RuntimeError(
                f"ARC acknowledge failed: {resp.status} — "
                f"{resp.data.decode('utf-8')}"
            )
        print("Export bookmark acknowledged successfully.")
    
    def _load_state() -> dict:
        """Load the last run state from GCS."""
        bucket = gcs.bugcs.bucketUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.downlodownload_as_text  return {}
    
    def _save_state(state: dict) -> None:
        """Persist run state to GCS."""
        bucket = gcs.bugcs.bucketUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.uploadupload_from_string    json.dumps(state), content_type="application/json"
        )
    
    def _fetch_export(token: str) -> list:
        """Fetch export data from the ARC Export API."""
        path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}"
        data = _arc_get(token, path)
        records = data if isinstance(data, list) else data.get("data", [])
        return records[:MAX_RECORDS]
    
    def _write_ndjson(records: list, run_ts: str) -> str:
        """Write records as NDJSON to GCS and return the blob path."""
        bucket = gcs.bugcs.bucketUCKET)
        blob_path = (
            f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/"
            f"day={run_ts[8:10]}/{run_ts}_export.ndjson"
        )
        blob = bucket.blob(blob_path)
        ndjson = "\n".join(
            json.dumps(r, separators=(",", ":")) for r in records
        )
        blob.uploadupload_from_stringn, content_type="application/x-ndjson")
        return blob_path
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """Entry point triggered by Pub/Sub via Cloud Scheduler."""
        state = _load_state()
        now = datetime.now(timezone.utc)
    
        print("Authenticating to Digital Guardian ARC.")
        token = _get_access_token()
    
        print(
            f"Fetching export data for profile {EXPORT_PROFILE_GUID}."
        )
        records = _fetch_export(token)
    
        if not records:
            print("No new export data found.")
            return "OK"
    
        run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
        blob_path = _write_ndjson(records, run_ts)
        print(
            f"Wrote {len(records)} records to "
            f"gs://{GCS_BUCKET}/{blob_path}."
        )
    
        _arc_acknowledge(token)
    
        state["last_run"] = now.isoformat()
        state["records_written"] = len(records)
        _save_state(state)
        print(f"State updated. last_run={now.isoformat()}.")
        return "OK"
    

部署 Cloud Run 函数

  1. 将这两个文件(main.pyrequirements.txt)保存到本地目录(例如 digitalguardian-function/)中。
  2. 打开 Cloud Shell 或安装了 gcloud CLI 的终端。
  3. 运行以下命令以部署函数:

    gcloud functions deploy digitalguardian-edr-to-gcs \
      --gen2 \
      --region=us-central1 \
      --runtime=python312 \
      --trigger-topic=digitalguardian-edr-trigger \
      --entry-point=main \
      --memory=512MB \
      --timeout=540s \
      --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \
      --set-env-vars=\
      "GCS_BUCKET=digitalguardian-edr-logs",\
      "GCS_PREFIX=digitalguardian_edr",\
      "STATE_KEY=digitalguardian_edr_state.json",\
      "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\
      "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\
      "CLIENT_ID=YOUR_CLIENT_ID",\
      "CLIENT_SECRET=YOUR_CLIENT_SECRET",\
      "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\
      "MAX_RECORDS=10000"
    
  4. 替换以下占位值:

    • PROJECT_ID:您的 Google Cloud 项目 ID。
    • digitalguardian-edr-logs:您的 GCS 存储桶名称。
    • YOUR_CLIENT_ID:您的 Digital Guardian API 访问 ID。
    • YOUR_CLIENT_SECRET:您的 Digital Guardian API 访问密钥。
    • YOUR_EXPORT_PROFILE_GUID:DGMC 中的导出配置文件 GUID。
  5. 通过检查函数状态来验证部署:

    gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
    

环境变量参考文档

变量 必需 默认 说明
GCS_BUCKET 用于存储 NDJSON 输出的 GCS 存储桶名称
GCS_PREFIX digitalguardian_edr 存储桶中的对象前缀(文件夹路径)
STATE_KEY digitalguardian_edr_state.json 前缀内状态文件的 blob 名称
AUTH_SERVER_URL OAuth 2.0 授权服务器网址
ARC_SERVER_URL ARC Access Gateway 基准网址
CLIENT_ID 来自 DGMC 的 API 访问 ID
CLIENT_SECRET 来自 DGMC 的 API 访问密钥
EXPORT_PROFILE_GUID 从 DGMC 导出个人资料 GUID
MAX_RECORDS 10000 每次执行要写入的记录数上限

创建 Cloud Scheduler 作业

Cloud Scheduler 通过 Pub/Sub 主题定期触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    • 名称:输入 digitalguardian-edr-ingestion-schedule
    • 区域:选择与 Cloud Run 函数相同的区域(例如 us-central1)。
    • 频次:输入 */5 * * * *(每 5 分钟)。

    • 时区:选择您的首选时区(例如 UTC)。

  4. 点击继续

  5. 配置执行部分中:

    • 目标类型:选择 Pub/Sub
    • 主题:选择 digitalguardian-edr-trigger
    • 消息正文:输入 {"run": true}
  6. 点击继续

  7. 配置可选设置部分中:

    • 重试次数上限:输入 3
    • 退避时长下限:输入 5s
    • 退避时长上限:输入 60s
  8. 点击创建

  9. 如需立即运行测试,请点击作业名称旁边的三点状图标 (...),然后选择强制运行

检索 Google SecOps 服务账号并配置 Feed

Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须向此服务账号授予对您的存储桶的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Digital Guardian EDR Logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Digital Guardian EDR 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

  9. 点击下一步

  10. 为以下输入参数指定值:

    • 存储桶网址:输入 GCS 存储桶 URI:

      gs://digitalguardian-edr-logs/digitalguardian_edr/
      
      • digitalguardian-edr-logs 替换为您的 GCS 存储桶名称。
      • digitalguardian_edr 替换为您配置的 GCS_PREFIX 值。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:转移后永不删除任何文件(建议用于测试)。
      • 删除已转移的文件:成功转移后删除文件。
      • 删除已转移的文件和空目录:成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  11. 点击下一步

  12. 最终确定界面中查看新的 Feed 配置,然后点击提交

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 Cloud Storage 存储桶具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 digitalguardian-edr-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如 chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

UDM 映射表

日志字段 UDM 映射 逻辑
应用 target.application 直接复制值
应用 target.process.command_line 如果规则与 Printer 匹配,则设置为 %{Application}。
Bytes_Written network.sent_bytes 直接复制并转换为 uinteger 的值
类别、Computer_Name、详细信息 metadata.description 如果 Category == "Policies" 且 Computer_Name 为空,则设置为 %{Detail};否则,在 grok_parse_failure 时设置为 %{message}
Command_Line、Command_Line1 principal.process.command_line 如果 Command_Line 中的值不为空,则为该值(移除尾随引号),否则为 Command_Line1 中的值(移除尾随引号)
Computer_Name,来源 principal.hostname 如果 computerName 不为空,则使用该值;否则,设置为 %{source}
Destination_Device_Serial_Number, Destination_Device_Serial_Number1 使用用于处理引号的 grok 模式提取
Destination_Directory、Destination_File target.file.full_path 如果 Destination_Directory 和 Destination_File 均不为空,则将两者串联
Destination_Drive_Type security_result.detection_fields 与 destination_drive_type_label 合并(键:Destination_Drive_Type,值:%{Destination_Drive_Type})
Destination_File target.file.names 从 Destination_File 合并
Destination_File_Extension target.file.mime_type 直接复制值
Dll_SHA1_Hash target.process.file.sha1 直接复制的值(在转换为小写后)
Email_Address principal.user.email_addresses 从 Email_Address 合并
Email_Sender、Email_Subject network.email.from 如果不为空,则设置为 %{Email_Sender}
Email_Sender、Email_Subject network.email.subject 如果 Email_Sender 不为空,则从主题 (%{Email_Subject}) 合并
File_Extension principal.process.file.mime_type 直接复制值
IP_Address, source_ip principal.ip 如果 source_ip 不为空,则从 source_ip 合并,否则从 IP_Address 合并
Local_Port、source_port principal.port 如果 source_port 不为空,则使用 source_port 中的值并将其转换为整数;否则,使用 Local_Port 中的值并将其转换为整数
MD5_Checksum target.process.file.md5 直接复制的值(在转换为小写后)
Network_Direction network.direction 如果为 True,则设置为 INBOUND;如果为 False,则设置为 OUTBOUND
Process_PID principal.process.pid 直接复制值
Process_SHA256_Hash target.process.file.sha256 直接复制的值(在转换为小写后)
Product_Version metadata.product_version 直接复制值
协议 network.ip_protocol 如果值为“1”,则设置为 ICMP
Remote_Port target.port 直接复制并转换为整数的值
规则 security_result.rule_name 直接复制值
规则 metadata.event_type 如果匹配 .Printer.,则设置为 PROCESS_UNCATEGORIZED;如果匹配 DLP.*,则设置为 FILE_MOVE
严重程度 security_result.severity 如果转换为整数后 <=3,则设置为 LOW;如果 <=6,则设置为 MEDIUM;如果 <=8,则设置为 HIGH;如果 <=10,则设置为 CRITICAL
严重程度 security_result.severity_details 直接复制值
Source_Directory、Source_File src.file.full_path 如果 Source_Directory 和 Source_File 均不为空,则将两者串联
Source_Drive_Type security_result.detection_fields 与 source_drive_type_label 合并(键:Source_Drive_Type,值:%{Source_Drive_Type})
Source_File src.file.names 从 Source_File 合并
Source_File_Extension src.file.mime_type 直接复制值
网址_Path、http_url target.url 如果 http_url 不为空,则取自 http_url,否则取自 网址_Path
User_Name principal.user.userid 从 userName 中提取的 Grok 值
User_Name principal.administrative_domain 从 domainName 中提取 grok 后的值
Was_Removable security_result.detection_fields 已与 was_removable_label 合并(键:Was_Removable,值:%{Was_Removable})
Was_Source_Removable security_result.detection_fields 已与 was_source_removable_label 合并(键:Was_Source_Removable,值:%{Was_Source_Removable})
computerName、destination_ip、protocol、source_ip、IP_Address、destination、userName、Process_PID、Category、Computer_Name metadata.event_type 初始设置为 GENERIC_EVENT;如果协议为 HTTPS 且有 destination_ip 或 computerName,则设置为 NETWORK_HTTP;如果有 source_ip 或 IP_Address 且有 destination_ip,则设置为 NETWORK_CONNECTION;如果 userName 不为空,则设置为 USER_UNCATEGORIZED;如果 Process_PID 不为空,则设置为 SCAN_PROCESS
destination_ip target.ip 从 destination_ip 合并
incidents_url, matched_policies_by_severity security_result 已与 _sr 合并(rule_name: %{matched_policies_by_severity}, url_back_to_product: %{incidents_url})
协议 network.application_protocol 如果协议为 HTTP 或 HTTPS,则设置为 HTTPS
security_action security_result.action 从 security_action 合并
metadata.product_name 设置为“企业 DLP 平台”
metadata.vendor_name 设置为“DigitalGuardian”

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。