收集 Digital Guardian EDR 日志
本文档介绍了如何使用 Google Cloud Storage V2 通过 Cloud Run 函数将 Digital Guardian EDR 日志注入到 Google Security Operations 中。
Fortra 的 Digital Guardian(以前称为 Digital Guardian)是一个全面的数据丢失防护和端点检测与响应平台,可让您了解端点、网络和云应用中的系统、用户和数据事件。Analytics & Reporting Cloud (ARC) 服务提供高级分析、工作流和报告功能,可实现全面的数据保护。Cloud Run 函数使用 OAuth 2.0 向 ARC Export API 进行身份验证,检索导出数据,确认书签以推进到下一个块,将结果以 NDJSON 格式写入 GCS 存储桶,然后 Google SecOps 通过 GCS V2 Feed 提取这些结果。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 已启用以下 API 的 Google Cloud 项目:
- Cloud Storage
- Cloud Run functions
- Cloud Scheduler
- Pub/Sub
- Cloud Build
- 创建和管理 Cloud Storage 存储分区、Cloud Run 函数、Pub/Sub 主题和 Cloud Scheduler 作业的权限
- 对 Digital Guardian 管理控制台 (DGMC) 的特权访问权限
- 对 Digital Guardian Analytics & Reporting Cloud (ARC) 租户设置的访问权限
- 在 DGMC 中配置 Cloud 服务的管理员权限
- 在 DGMC 中创建的具有有效 GUID 的导出配置文件
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储桶命名 输入一个全局唯一的名称(例如 digitalguardian-edr-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择离您的 Google SecOps 实例最近的位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
收集 Digital Guardian API 凭据
如需使 Cloud Run 函数能够从 Digital Guardian ARC 检索导出数据,您需要获取 API 凭据并配置导出配置文件。
从 DGMC 获取 API 凭据
- 登录 Digital Guardian Management Console (DGMC)。
- 依次前往系统 > 配置 > 云服务。
在 API 访问权限部分中,找到并记录以下值:
- API 访问 ID:这是用于 OAuth 2.0 身份验证的客户端 ID。
- API 访问密钥:这是用于 OAuth 2.0 身份验证的客户端密钥。
- 访问网关基础网址:API 网关端点(例如
https://accessgw-usw.msp.digitalguardian.com)。 - 授权服务器网址:OAuth 2.0 令牌端点(例如
https://authsrv.msp.digitalguardian.com/as/token.oauth2)。
创建和配置导出配置文件
- 在 Digital Guardian 管理控制台 (DGMC) 中,依次前往 Admin > Reports > Export Profiles。
- 点击创建导出配置或选择现有导出配置。
- 使用以下设置配置导出配置文件:
- 配置文件名称:输入一个描述性名称(例如
Google SecOps SIEM Integration)。 - 数据源:根据要导出的数据,选择事件或提醒。
- 导出格式:选择 JSON 平展表(建议用于 SIEM 集成)。
- 字段:选择要包含在导出内容中的字段。
- 过滤条件:配置任何过滤条件以限制导出的数据(可选)。
- 配置文件名称:输入一个描述性名称(例如
- 点击保存以创建导出配置文件。
保存后,在列表中找到导出配置文件,然后从导出配置文件网址或详情页面中复制 GUID。
记录凭据摘要
保存以下信息,以便配置 Cloud Run 函数环境变量:
- 客户端 ID(API 访问 ID):来自 DGMC Cloud Services
- 客户端密钥 (API 访问密钥):来自 DGMC Cloud Services
- 授权服务器网址:例如,
https://authsrv.msp.digitalguardian.com/as/token.oauth2 - Access Gateway 基准网址:例如
https://accessgw-usw.msp.digitalguardian.com - 导出配置文件 GUID:来自在 DGMC 中创建的导出配置文件
测试 API 访问权限
运行以下命令,验证您的凭据是否有效:
# Step 1: Obtain OAuth 2.0 access token curl -s -X POST \ -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \ "https://authsrv.msp.digitalguardian.com/as/token.oauth2"# Step 2: Test export endpoint with the access token curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"如果响应成功,则会返回包含导出数据的 JSON 文档。如果您收到身份验证错误,请验证 DGMC Cloud Services 中的 API 访问 ID 和密钥。
为 Cloud Run 函数创建服务账号
- 在 Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
digitalguardian-ingestion(或一个描述性名称)。 - 服务账号说明:输入
Service account for Digital Guardian EDR Cloud Run function to write logs to GCS。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- Storage Object Admin(用于读取/写入 Cloud Storage 存储桶中的对象)
- Cloud Run Invoker(允许 Cloud Scheduler 调用该函数)
- 点击继续。
点击完成。
创建 Pub/Sub 主题
Cloud Scheduler 通过 Pub/Sub 主题触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 在主题 ID 字段中,输入
digitalguardian-edr-trigger。 - 保留默认设置。
点击创建。
创建 Cloud Run 函数
创建一个 Cloud Run 函数,该函数使用 OAuth 2.0 客户端凭据向 Digital Guardian ARC 进行身份验证,检索导出数据,确认书签以推进到下一个块,并将结果以 NDJSON 格式写入 GCS。
准备函数源文件
为 Cloud Run 函数部署创建以下两个文件。
requirements.txt
functions-framework==3.* google-cloud-storage==2.* urllib3==2.*main.py
"""Cloud Run function to ingest Digital Guardian EDR logs into GCS.""" import json import os import time import urllib.parse from datetime import datetime, timezone import functions_framework import urllib3 from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr") STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json") AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"] ARC_SERVER_URL = os.environ["ARC_SERVER_URL"] CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) http = urllib3.PoolManager() gcs = storage.Client() def _get_access_token() -> str: """Obtain an OAuth 2.0 access token using client credentials grant.""" body = urllib.parse.urlencode({ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "client", }) resp = http.request( "POST", AUTH_SERVER_URL, body=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if resp.status != 200: raise RuntimeError( f"OAuth token request failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) token_data = json.loads(resp.data.decode("utf-8")) return token_data["access_token"] def _arc_get(token: str, path: str, retries: int = 5) -> dict: """Execute a GET request against the ARC API with retry on 429.""" url = f"{ARC_SERVER_URL}{path}" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } backoff = 2 for attempt in range(retries): resp = http.request("GET", url, headers=headers) if resp.status == 200: return json.loads(resp.data.decode("utf-8")) if resp.status == 429: wait = backoff * (2 ** attempt) print( f"Rate limited (429). Retrying in {wait}s " f"(attempt {attempt + 1}/{retries})." ) time.sleep(wait) continue raise RuntimeError( f"ARC API error: {resp.status} — {resp.data.decode('utf-8')}" ) raise RuntimeError( "ARC API rate limit exceeded after maximum retries." ) def _arc_acknowledge(token: str) -> None: """POST to the acknowledge endpoint to advance the export bookmark.""" url = ( f"{ARC_SERVER_URL}/rest/1.0/export/" f"{EXPORT_PROFILE_GUID}/acknowledge" ) headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } resp = http.request("POST", url, headers=headers) if resp.status not in (200, 204): raise RuntimeError( f"ARC acknowledge failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) print("Export bookmark acknowledged successfully.") def _load_state() -> dict: """Load the last run state from GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.downlodownload_as_text return {} def _save_state(state: dict) -> None: """Persist run state to GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.uploadupload_from_string json.dumps(state), content_type="application/json" ) def _fetch_export(token: str) -> list: """Fetch export data from the ARC Export API.""" path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}" data = _arc_get(token, path) records = data if isinstance(data, list) else data.get("data", []) return records[:MAX_RECORDS] def _write_ndjson(records: list, run_ts: str) -> str: """Write records as NDJSON to GCS and return the blob path.""" bucket = gcs.bugcs.bucketUCKET) blob_path = ( f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/" f"day={run_ts[8:10]}/{run_ts}_export.ndjson" ) blob = bucket.blob(blob_path) ndjson = "\n".join( json.dumps(r, separators=(",", ":")) for r in records ) blob.uploadupload_from_stringn, content_type="application/x-ndjson") return blob_path @functions_framework.cloud_event def main(cloud_event): """Entry point triggered by Pub/Sub via Cloud Scheduler.""" state = _load_state() now = datetime.now(timezone.utc) print("Authenticating to Digital Guardian ARC.") token = _get_access_token() print( f"Fetching export data for profile {EXPORT_PROFILE_GUID}." ) records = _fetch_export(token) if not records: print("No new export data found.") return "OK" run_ts = now.strftime("%Y-%m-%dT%H%M%SZ") blob_path = _write_ndjson(records, run_ts) print( f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{blob_path}." ) _arc_acknowledge(token) state["last_run"] = now.isoformat() state["records_written"] = len(records) _save_state(state) print(f"State updated. last_run={now.isoformat()}.") return "OK"
部署 Cloud Run 函数
- 将这两个文件(
main.py和requirements.txt)保存到本地目录(例如digitalguardian-function/)中。 - 打开 Cloud Shell 或安装了
gcloudCLI 的终端。 运行以下命令以部署函数:
gcloud functions deploy digitalguardian-edr-to-gcs \ --gen2 \ --region=us-central1 \ --runtime=python312 \ --trigger-topic=digitalguardian-edr-trigger \ --entry-point=main \ --memory=512MB \ --timeout=540s \ --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \ --set-env-vars=\ "GCS_BUCKET=digitalguardian-edr-logs",\ "GCS_PREFIX=digitalguardian_edr",\ "STATE_KEY=digitalguardian_edr_state.json",\ "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\ "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\ "CLIENT_ID=YOUR_CLIENT_ID",\ "CLIENT_SECRET=YOUR_CLIENT_SECRET",\ "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\ "MAX_RECORDS=10000"替换以下占位值:
PROJECT_ID:您的 Google Cloud 项目 ID。digitalguardian-edr-logs:您的 GCS 存储桶名称。YOUR_CLIENT_ID:您的 Digital Guardian API 访问 ID。YOUR_CLIENT_SECRET:您的 Digital Guardian API 访问密钥。YOUR_EXPORT_PROFILE_GUID:DGMC 中的导出配置文件 GUID。
通过检查函数状态来验证部署:
gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
环境变量参考文档
| 变量 | 必需 | 默认 | 说明 |
|---|---|---|---|
GCS_BUCKET |
是 | 用于存储 NDJSON 输出的 GCS 存储桶名称 | |
GCS_PREFIX |
否 | digitalguardian_edr |
存储桶中的对象前缀(文件夹路径) |
STATE_KEY |
否 | digitalguardian_edr_state.json |
前缀内状态文件的 blob 名称 |
AUTH_SERVER_URL |
是 | OAuth 2.0 授权服务器网址 | |
ARC_SERVER_URL |
是 | ARC Access Gateway 基准网址 | |
CLIENT_ID |
是 | 来自 DGMC 的 API 访问 ID | |
CLIENT_SECRET |
是 | 来自 DGMC 的 API 访问密钥 | |
EXPORT_PROFILE_GUID |
是 | 从 DGMC 导出个人资料 GUID | |
MAX_RECORDS |
否 | 10000 |
每次执行要写入的记录数上限 |
创建 Cloud Scheduler 作业
Cloud Scheduler 通过 Pub/Sub 主题定期触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
- 名称:输入
digitalguardian-edr-ingestion-schedule。 - 区域:选择与 Cloud Run 函数相同的区域(例如
us-central1)。 频次:输入
*/5 * * * *(每 5 分钟)。时区:选择您的首选时区(例如
UTC)。
- 名称:输入
点击继续。
在配置执行部分中:
- 目标类型:选择 Pub/Sub。
- 主题:选择
digitalguardian-edr-trigger。 - 消息正文:输入
{"run": true}。
点击继续。
在配置可选设置部分中:
- 重试次数上限:输入
3。 - 退避时长下限:输入
5s。 - 退避时长上限:输入
60s。
- 重试次数上限:输入
点击创建。
如需立即运行测试,请点击作业名称旁边的三点状图标 (...),然后选择强制运行。
检索 Google SecOps 服务账号并配置 Feed
Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须向此服务账号授予对您的存储桶的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在 Feed 名称字段中,输入 Feed 的名称(例如
Digital Guardian EDR Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Digital Guardian EDR 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
点击下一步。
为以下输入参数指定值:
存储桶网址:输入 GCS 存储桶 URI:
gs://digitalguardian-edr-logs/digitalguardian_edr/- 将
digitalguardian-edr-logs替换为您的 GCS 存储桶名称。 - 将
digitalguardian_edr替换为您配置的GCS_PREFIX值。
- 将
来源删除选项:根据您的偏好选择删除选项:
- 永不:转移后永不删除任何文件(建议用于测试)。
- 删除已转移的文件:成功转移后删除文件。
删除已转移的文件和空目录:成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)。
资产命名空间:资产命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 Cloud Storage 存储桶具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储桶名称(例如
digitalguardian-edr-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Viewer。
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址(例如
点击保存。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| 应用 | target.application | 直接复制值 |
| 应用 | target.process.command_line | 如果规则与 Printer 匹配,则设置为 %{Application}。 |
| Bytes_Written | network.sent_bytes | 直接复制并转换为 uinteger 的值 |
| 类别、Computer_Name、详细信息 | metadata.description | 如果 Category == "Policies" 且 Computer_Name 为空,则设置为 %{Detail};否则,在 grok_parse_failure 时设置为 %{message} |
| Command_Line、Command_Line1 | principal.process.command_line | 如果 Command_Line 中的值不为空,则为该值(移除尾随引号),否则为 Command_Line1 中的值(移除尾随引号) |
| Computer_Name,来源 | principal.hostname | 如果 computerName 不为空,则使用该值;否则,设置为 %{source} |
| Destination_Device_Serial_Number, Destination_Device_Serial_Number1 | 使用用于处理引号的 grok 模式提取 | |
| Destination_Directory、Destination_File | target.file.full_path | 如果 Destination_Directory 和 Destination_File 均不为空,则将两者串联 |
| Destination_Drive_Type | security_result.detection_fields | 与 destination_drive_type_label 合并(键:Destination_Drive_Type,值:%{Destination_Drive_Type}) |
| Destination_File | target.file.names | 从 Destination_File 合并 |
| Destination_File_Extension | target.file.mime_type | 直接复制值 |
| Dll_SHA1_Hash | target.process.file.sha1 | 直接复制的值(在转换为小写后) |
| Email_Address | principal.user.email_addresses | 从 Email_Address 合并 |
| Email_Sender、Email_Subject | network.email.from | 如果不为空,则设置为 %{Email_Sender} |
| Email_Sender、Email_Subject | network.email.subject | 如果 Email_Sender 不为空,则从主题 (%{Email_Subject}) 合并 |
| File_Extension | principal.process.file.mime_type | 直接复制值 |
| IP_Address, source_ip | principal.ip | 如果 source_ip 不为空,则从 source_ip 合并,否则从 IP_Address 合并 |
| Local_Port、source_port | principal.port | 如果 source_port 不为空,则使用 source_port 中的值并将其转换为整数;否则,使用 Local_Port 中的值并将其转换为整数 |
| MD5_Checksum | target.process.file.md5 | 直接复制的值(在转换为小写后) |
| Network_Direction | network.direction | 如果为 True,则设置为 INBOUND;如果为 False,则设置为 OUTBOUND |
| Process_PID | principal.process.pid | 直接复制值 |
| Process_SHA256_Hash | target.process.file.sha256 | 直接复制的值(在转换为小写后) |
| Product_Version | metadata.product_version | 直接复制值 |
| 协议 | network.ip_protocol | 如果值为“1”,则设置为 ICMP |
| Remote_Port | target.port | 直接复制并转换为整数的值 |
| 规则 | security_result.rule_name | 直接复制值 |
| 规则 | metadata.event_type | 如果匹配 .Printer.,则设置为 PROCESS_UNCATEGORIZED;如果匹配 DLP.*,则设置为 FILE_MOVE |
| 严重程度 | security_result.severity | 如果转换为整数后 <=3,则设置为 LOW;如果 <=6,则设置为 MEDIUM;如果 <=8,则设置为 HIGH;如果 <=10,则设置为 CRITICAL |
| 严重程度 | security_result.severity_details | 直接复制值 |
| Source_Directory、Source_File | src.file.full_path | 如果 Source_Directory 和 Source_File 均不为空,则将两者串联 |
| Source_Drive_Type | security_result.detection_fields | 与 source_drive_type_label 合并(键:Source_Drive_Type,值:%{Source_Drive_Type}) |
| Source_File | src.file.names | 从 Source_File 合并 |
| Source_File_Extension | src.file.mime_type | 直接复制值 |
| 网址_Path、http_url | target.url | 如果 http_url 不为空,则取自 http_url,否则取自 网址_Path |
| User_Name | principal.user.userid | 从 userName 中提取的 Grok 值 |
| User_Name | principal.administrative_domain | 从 domainName 中提取 grok 后的值 |
| Was_Removable | security_result.detection_fields | 已与 was_removable_label 合并(键:Was_Removable,值:%{Was_Removable}) |
| Was_Source_Removable | security_result.detection_fields | 已与 was_source_removable_label 合并(键:Was_Source_Removable,值:%{Was_Source_Removable}) |
| computerName、destination_ip、protocol、source_ip、IP_Address、destination、userName、Process_PID、Category、Computer_Name | metadata.event_type | 初始设置为 GENERIC_EVENT;如果协议为 HTTPS 且有 destination_ip 或 computerName,则设置为 NETWORK_HTTP;如果有 source_ip 或 IP_Address 且有 destination_ip,则设置为 NETWORK_CONNECTION;如果 userName 不为空,则设置为 USER_UNCATEGORIZED;如果 Process_PID 不为空,则设置为 SCAN_PROCESS |
| destination_ip | target.ip | 从 destination_ip 合并 |
| incidents_url, matched_policies_by_severity | security_result | 已与 _sr 合并(rule_name: %{matched_policies_by_severity}, url_back_to_product: %{incidents_url}) |
| 协议 | network.application_protocol | 如果协议为 HTTP 或 HTTPS,则设置为 HTTPS |
| security_action | security_result.action | 从 security_action 合并 |
| metadata.product_name | 设置为“企业 DLP 平台” | |
| metadata.vendor_name | 设置为“DigitalGuardian” |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。