收集 Elastic Defend 日志
本文档介绍了如何使用 Google Cloud Storage V2 和 Cloud Run 函数将 Elastic Defend 日志注入到 Google Security Operations 中。
Elastic Defend 是 Elastic Security 中的一种端点检测和响应 (EDR) 解决方案,可提供防护、检测和响应功能,并可深入了解 Windows、macOS 和 Linux 操作系统。它会监控进程执行、文件活动、网络连接、注册表修改和库加载,以在端点级别检测和防范威胁。数据存储在 Elasticsearch 中,可以使用 Elasticsearch Search API 进行检索。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 已启用以下 API 的 Google Cloud 项目:
- Cloud Storage
- Cloud Run functions
- Cloud Scheduler
- Pub/Sub
- IAM
- 对已部署 Elastic Defend 的 Elasticsearch 集群的访问权限
- 在 Elasticsearch 中创建 API 密钥的权限(
manage_security、manage_api_key或manage_own_api_key集群权限) - 从 Cloud Run 函数到 Elasticsearch 集群的网络连接
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储桶命名 输入一个全局唯一的名称(例如 elastic-defend-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
收集 Elastic Defend 凭据
如需让 Cloud Run 函数能够检索 Elastic Defend 事件,您需要创建一个对 logs-endpoint 数据流具有读取权限的 API 密钥。
使用 Kibana 创建 API 密钥
- 登录 Kibana。
- 在导航菜单或全局搜索字段中,依次前往堆栈管理 > API 密钥。
- 点击创建 API 密钥。
- 在名称字段中,输入
Google SecOps Cloud Storage Integration。 - 在失效字段中,可选择性地设置失效日期。默认情况下,API 密钥不会过期。
- 点击控制安全权限。
- 在指数部分中,点击添加指数权限。
- 配置索引权限:
- 指数:输入
logs-endpoint.* - 权限:选择读取
- 指数:输入
- 将集群权限部分留空(无需任何集群权限)。
- 点击创建 API 密钥。
记录 API 凭据
创建 API 密钥后,系统会显示一个对话框,其中包含您的凭据:
编码:经过 base64 编码的 API 密钥(例如
VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw==)您还需要记录 Elasticsearch 端点网址:
对于 Elastic Cloud:端点显示在 Cloud 控制台中部署的 Elasticsearch 部分下(例如
https://my-deployment.es.us-central1.gcp.cloud.es.io:443)对于自行管理的 Elasticsearch:使用 Elasticsearch 集群的主机名或 IP 地址以及端口(例如
https://elasticsearch.example.com:9200)
使用开发者工具创建 API 密钥(替代方法)
或者,您也可以使用 Kibana 开发工具创建 API 密钥:
- 登录 Kibana。
- 前往管理 > 开发者工具。
在控制台中,运行以下命令:
POST /_security/api_key { "name": "Google SecOps Cloud Storage Integration", "role_descriptors": { "chronicle_reader": { "indices": [ { "names": ["logs-endpoint.*"], "privileges": ["read"] } ] } }, "metadata": { "application": "google-chronicle-gcs", "environment": "production" } }响应包含您的 API 密钥凭据:
{ "id": "VuaCfGcBCdbkQm-e5aOx", "name": "Google SecOps Cloud Storage Integration", "api_key": "ui2lp2axTNmsyakw9tvNnw", "encoded": "VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw==" }复制并保存编码后的值。这是您将用于身份验证的 base64 编码 API 密钥。
创建服务账号
为 Cloud Run 函数创建专用服务账号。
- 在 Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
elastic-defend-collector - 服务账号说明:输入
Service account for Elastic Defend log collection to GCS
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色,然后选择 Storage Object Admin (
roles/storage.objectAdmin)。 - 点击添加其他角色,然后选择 Cloud Run Invoker (
roles/run.invoker)。
- 点击选择角色,然后选择 Storage Object Admin (
- 点击继续。
- 点击完成。
创建发布/订阅主题
创建一个 Pub/Sub 主题,以便从 Cloud Scheduler 触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
elastic-defend-trigger - 添加默认订阅:保持选中状态
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数
创建 Cloud Run 函数,用于从 Elasticsearch 中检索事件并将其写入 GCS。
创建函数
- 在 Google Cloud 控制台中,前往 Cloud Run 函数。
- 点击创建函数。
提供以下配置详细信息:
设置 值 环境 第 2 代 函数名称 elastic-defend-to-gcs区域 选择与您的 GCS 存储桶相同的区域 触发器类型 Cloud Pub/Sub Cloud Pub/Sub 主题 选择 elastic-defend-trigger分配的内存 512 MiB(如果数据量较大,可增加此值) 超时 540 秒 运行时服务账号 选择 elastic-defend-collector点击下一步。
添加环境变量
在运行时、构建、连接和安全设置部分中添加以下环境变量:
变量 值 GCS_BUCKETGCS 存储桶的名称(例如 elastic-defend-logs)GCS_PREFIX日志文件的前缀(例如 elastic-defend)STATE_KEY状态文件的名称(例如, state.json)ES_HOSTElasticsearch 网址(例如 https://my-deployment.es.us-central1.gcp.cloud.es.io:443)ES_API_KEY凭据创建步骤中的编码 API 密钥 MAX_RECORDS每次执行的记录数上限(例如, 100000)PAGE_SIZE每次搜索请求的记录数(例如 1000)LOOKBACK_HOURS首次运行时的回溯小时数(例如 24)
添加函数代码
- 选择 Python 3.11 作为运行时。
- 将入口点设置为
main。 - 在源代码部分中,选择内嵌编辑器。
将
main.py的内容替换为以下代码:import os import json import datetime import base64 import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "elastic-defend") STATE_KEY = os.environ.get("STATE_KEY", "state.json") ES_HOST = os.environ["ES_HOST"] ES_API_KEY = os.environ["ES_API_KEY"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "100000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "1000")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) INDEX_PATTERN = "logs-endpoint.*" SEARCH_PATH = f"/{INDEX_PATTERN}/_search" def _gcs_client(): return storage.Client() def _load_state(bucket): blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def _save_state(bucket, state): blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def _build_query(gte_ts, sort_after=None): body = { "size": PAGE_SIZE, "query": { "range": { "@timestamp": { "gte": gte_ts, "format": "strict_date_optional_time", } } }, "sort": [ {"@timestamp": {"order": "asc"}}, {"_shard_doc": "asc"}, ], } if sort_after: body["search_after"] = sort_after return body def _search(session, body): url = f"{ES_HOST.rstrip('/')}{SEARCH_PATH}" resp = session.post( url, json=body, headers={ "Authorization": f"ApiKey {ES_API_KEY}", "Content-Type": "application/json", }, timeout=120, ) resp.raise_for_status() return resp.json() def _write_ndjson(bucket, records, ts_label): if not records: return now = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") blob_name = f"{GCS_PREFIX}/{ts_label}/{now}.ndjson" blob = bucket.blob(blob_name) ndjson = "\n".join(json.dumps(r, separators=(",", ":")) for r in records) blob.upload_from_string(ndjson, content_type="application/x-ndjson") print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{blob_name}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" client = _gcs_client() bucket = client.bucket(GCS_BUCKET) state = _load_state(bucket) sort_after = state.get("sort_after") if state.get("last_timestamp"): gte_ts = state["last_timestamp"] else: gte_ts = ( datetime.datetime.utcnow() - datetime.timedelta(hours=LOOKBACK_HOURS) ).strftime("%Y-%m-%dT%H:%M:%S.%fZ") session = requests.Session() total = 0 batch = [] last_ts = gte_ts ts_label = datetime.datetime.utcnow().strftime("%Y/%m/%d/%H") while total < MAX_RECORDS: body = _build_query(gte_ts, sort_after) result = _search(session, body) hits = result.get("hits", {}).get("hits", []) if not hits: break for hit in hits: doc = hit.get("_source", {}) doc["_id"] = hit.get("_id") doc["_index"] = hit.get("_index") batch.append(doc) hit_ts = doc.get("@timestamp", last_ts) if hit_ts > last_ts: last_ts = hit_ts sort_after = hits[-1].get("sort") total += len(hits) if len(batch) >= PAGE_SIZE: _write_ndjson(bucket, batch, ts_label) batch = [] if len(hits) < PAGE_SIZE: break if batch: _write_ndjson(bucket, batch, ts_label) new_state = { "last_timestamp": last_ts, "sort_after": sort_after, } _save_state(bucket, new_state) print(f"Done. Fetched {total} records. State: {json.dumps(new_state)}") return f"OK: {total} records"使用以下内容替换
requirements.txt的内容:functions-framework==3.* google-cloud-storage==2.* requests==2.*点击部署。
等待部署成功完成。
创建 Cloud Scheduler 作业
创建一个 Cloud Scheduler 作业,以便按常规时间表触发 Cloud Run 函数。
- 在 Google Cloud 控制台中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 elastic-defend-scheduler区域 选择与 Cloud Run 函数相同的区域 频率 */5 * * * *(每 5 分钟)时区 选择您所在的时区(例如 UTC)点击继续。
在配置执行部分中:
- 目标类型:选择 Pub/Sub
- Cloud Pub/Sub 主题:选择
elastic-defend-trigger - 消息正文:输入
{"run": true}
点击创建。
检索 Google SecOps 服务账号并配置 Feed
Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须向此服务账号授予对您的存储桶的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在 Feed 名称字段中,输入 Feed 的名称(例如
Elastic Defend Events)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Elastic Defend 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
配置 Feed
- 点击下一步。
为以下输入参数指定值:
存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:
gs://elastic-defend-logs/elastic-defend/将
elastic-defend-logs替换为您的 GCS 存储桶名称。来源删除选项:根据您的偏好选择删除选项:
- 永不:转移后永不删除任何文件(建议用于测试)。
- 删除已转移的文件:成功转移后删除文件。
删除已转移的文件和空目录:成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)
资产命名空间:资产命名空间
注入标签:要应用于此 Feed 中事件的标签
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储桶具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储桶名称(例如
elastic-defend-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址
- 分配角色:选择 Storage Object Viewer
点击保存。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| _source.agent.id、_source.agent.type、_source.agent.version、_source.host.architecture、_source.event.agent_id_status、_source.event.id、_source.user.id、_source.group.id、_source.data_stream.type、_source.agent.build.original | additional.fields | 与包含所列字段值的标签对象合并 |
| _source.process.Ext.session_info.logon_type | extensions.auth.auth_details | 直接复制值 |
| _source.host.os.full | hardware.cpu_platform | 直接复制值 |
| _source.host.id | hardware.serial_number | 直接复制值 |
| _source.rule.description | metadata.description | 直接复制值 |
| _source.@timestamp | metadata.event_timestamp | 使用日期过滤器转换,格式为 ISO8601、yyyy-MM-ddTHH:mm:ss.SSSSSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSZ、yyyy-MM-ddTHH:mm:ssZ |
| metadata.event_type | 根据索引、event.action 和 has* 条件设置 | |
| metadata.log_type | 设置为“ELASTIC_DEFEND” | |
| metadata.product_event_type | 如果 _index ~ events.file,则设置为“文件事件”;如果 events.library,则设置为“库事件”;如果 events.network,则设置为“网络事件”;如果 events.process,则设置为“进程事件”;如果 events.registry,则设置为“注册表事件”;如果 events.security,则设置为“安全事件”;如果 events.api,则设置为“API 事件”;如果 .alert,则设置为“提醒” | |
| _id | metadata.product_log_id | 如果 _id 中的值不在 ["", " ", "null", "N/A"] 中,则使用该值 |
| _source.ecs.version | metadata.product_version | 直接复制值 |
| _source.network.type | network.application_protocol_version | 直接复制值 |
| _source.network.transport | network.ip_protocol | 如果匹配 (?i)tcp,则设置为“TCP”;如果匹配 (?i)udp,则设置为“UDP”;如果匹配 (?i)icmp,则设置为“ICMP”;否则设置为“UNKNOWN_IP_PROTOCOL” |
| _source.destination.as.organization.name | network.organization_name | 直接复制值 |
| _source.Endpoint.policy.applied.artifacts.global.identifiers | observer.file.names | 从 _source.Endpoint.policy.applied.artifacts.global.identifiers 合并 |
| _source.Endpoint.policy.applied.artifacts.global.version, _source.Endpoint.policy.applied.artifacts.global.snapshot | observer.resource.attribute.labels | 与包含所列字段值的标签对象合并 |
| _source.Endpoint.policy.applied.artifacts.user.version | observer.user.attribute.labels | 与包含来自 _source.Endpoint.policy.applied.artifacts.user.version 的值的标签对象合并 |
| _source.host.os.full | principal.asset.hardware.cpu_platform | 直接复制值 |
| _source.host.id | principal.asset.hardware.serial_number | 直接复制值 |
| _source.host.name | principal.asset.hostname | 直接复制值 |
| _source.host.ip | principal.asset.ip | 从 _source.host.ip 合并 |
| _source.host.os.type | principal.asset.platform_software.platform | 如果匹配 (?i)windows,则设置为“WINDOWS”;如果匹配 (?i)linux,则设置为“LINUX”;如果匹配 (?i)mac,则设置为“MAC”;如果匹配 (?i)ios,则设置为“IOS”;否则设置为“UNKNOWN_PLATFORM” |
| _source.host.os.kernel | principal.asset.platform_software.platform_patch_level | 直接复制值 |
| _source.event.created | principal.domain.creation_time | 使用日期过滤器转换,格式为 ISO8601、yyyy-MM-ddTHH:mm:ss.SSSSSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSZ、yyyy-MM-ddTHH:mm:ssZ |
| _source.user.domain | principal.domain.name | 直接复制值 |
| _source.process.thread.capabilities.effective | principal.file.capabilities_tags | 从 _source.process.thread.capabilities.effective 合并 |
| _source.process.executable | principal.file.full_path | 直接复制值 |
| _source.process.hash.md5 | principal.file.md5 | 直接复制值 |
| _source.file.name | principal.file.names | 从 _source.file.name 合并 |
| _source.process.hash.sha1 | principal.file.sha1 | 直接复制值 |
| _source.process.hash.sha256 | principal.file.sha256 | 直接复制值 |
| _source.host.hostname | principal.hostname | 直接复制值 |
| _source.host.ip | principal.ip | 从 _source.host.ip 合并 |
| _source.host.mac | principal.mac | 在将 - 替换为 : 后,从 _source.host.mac 合并 |
| _source.host.os.Ext.variant | principal.platform_version | 直接复制值 |
| _source.source.port | principal.port | 转换为字符串,然后再转换为整数 |
| _source.process.command_line, _source.process.name | principal.process.command_line | 如果 _source.process.command_line 不为空,则取自该字段,否则取自 _source.process.name |
| _source.process.thread.capabilities.permitted | principal.process.file.capabilities_tags | 从 _source.process.thread.capabilities.permitted 合并 |
| _source.process.executable | principal.process.file.full_path | 直接复制值 |
| _source.process.hash.md5 | principal.process.file.md5 | 直接复制值 |
| _source.process.hash.sha1 | principal.process.file.sha1 | 直接复制值 |
| _source.process.hash.sha256 | principal.process.file.sha256 | 直接复制值 |
| _source.process.parent.executable | principal.process.parent_process.file.full_path | 直接复制值 |
| _source.process.pid | principal.process.pid | 转换为字符串,然后复制 |
| _source.process.Ext.api.name | principal.resource.attribute.labels | 与包含来自 _source.process.Ext.api.name 的值的标签对象合并 |
| _source.event.code | principal.resource.product_object_id | 直接复制值 |
| _source.group.name | principal.user.group_identifiers | 从 _source.group.name 合并 |
| _source.user.name | principal.user.userid | 直接复制值 |
| _source.user.id | principal.user.windows_sid | 如果与正则表达式 ^S-\\d-(\\\\d+-){1,14}\\\\d+$ 匹配,则为 _source.user.id 中的值 |
| _source.file.Ext.malware_signature.primary.signature.hash.sha256 | security_result.about.file.sha256 | 直接复制值 |
| _source.event.outcome | security_result.action | 来自 _source.event.outcome 的值(大写),然后设置为 ALLOW(如果位于 [SUCCESS, ALLOW] 中)、BLOCK(如果位于 [FAILURE, DENY, SKIPPED, RATE_LIMIT] 中)、UNKNOWN_ACTION(如果为 UNKNOWN) |
| _source.event.action | security_result.action_details | 直接复制值 |
| _source.destination.geo.region_iso_code | security_result.associations | 与包含来自 _source.destination.geo.region_iso_code 的名称的对象合并 |
| _source.kibana.alert.rule.parameters.threat.tactic.id, _source.kibana.alert.rule.parameters.threat.tactic.name | security_result.attack_details.tactics | 与包含所列字段中的 ID 和名称的对象合并 |
| _source.kibana.alert.rule.parameters.threat.technique.id, _source.kibana.alert.rule.parameters.threat.technique.name, _source.kibana.alert.rule.parameters.threat.technique.subtechnique.id, _source.kibana.alert.rule.parameters.threat.technique.subtechnique.name | security_result.attack_details.techniques | 与包含以下字段的对象合并:id、name、subtechnique_id、subtechnique_name |
| _source.event.category | security_result.category_details | 从 _source.event.category 合并 |
| _source.kibana.alert.rule.description | security_result.description | 直接复制值 |
| _source.event.kind, _source.file.Ext.malware_signature.all_names, _source.file.Ext.malware_signature.identifier, _source.event.risk_score, _source.threat.tactic.reference, _source.threat.technique.reference, _source.threat.technique.subtechnique.reference | security_result.detection_fields | 与包含所列字段值的标签对象合并 |
| _source.rule.id, _source.kibana.alert.rule.rule_id | security_result.rule_id | 如果 _source.rule.id 不为空,则返回该值;否则,返回 _source.kibana.alert.rule.rule_id |
| _source.rule.name, _source.kibana.alert.rule.name | security_result.rule_name | 如果 _source.rule.name 不为空,则为该值;否则为 _source.kibana.alert.rule.name 中的值 |
| _source.rule.ruleset | security_result.rule_set | 直接复制值 |
| security_result.severity | 设置为“LOW”;如果 _index 与 .alert 匹配,则设置为“HIGH”;如果 _source.kibana.alert.rule.parameters.severity 与 (?i)LOW 匹配,则设置为“LOW” | |
| _source.message | security_result.summary | 直接复制值 |
| _source.file.Ext.malware_signature.primary.signature.id | security_result.threat_id | 直接复制值 |
| _source.file.Ext.malware_signature.primary.signature.name | security_result.threat_name | 直接复制值 |
| _source.source.address, _source.source.ip | src.asset.ip | 从 _source.source.address 和 _source.source.ip 合并而来 |
| _source.source.address, _source.source.ip | src.ip | 从 _source.source.address 和 _source.source.ip 合并而来 |
| _source.host.name | target.asset.hostname | 直接复制值 |
| _source.destination.address, _source.destination.ip | target.asset.ip | 从 _source.destination.address 和 _source.destination.ip 合并而来 |
| _source.file.path、_source.dll.path、_source.process.executable、_source.Target.process.executable | target.file.full_path | 如果为 events.file,则为 _source.file.path 中的值;如果为 events.library,则为 _source.dll.path 中的值;如果为 events.process 或 events.api,则为 _source.process.executable 中的值;如果为 events.api,则为 _source.Target.process.executable 中的值 |
| _source.dll.hash.md5、_source.process.hash.md5 | target.file.md5 | 如果为 events.library,则值为 _source.dll.hash.md5;如果为.alert,则值为 _source.process.hash .md5 |
| _source.dll.name, _source.process.name | target.file.names | 如果为 events.library,则从 _source.dll.name 合并;如果为.alert,则从 _source.process .name 合并 |
| _source.dll.hash.sha1, _source.process.hash.sha1 | target.file.sha1 | 如果为 events.library,则值为 _source.dll.hash.sha1;如果为.alert,则值为 _source.process.hash .sha1 |
| _source.dll.hash.sha256、_source.process.hash.sha256 | target.file.sha256 | 如果为 events.library,则为 _source.dll.hash.sha256 中的值;如果为.alert,则为 _source.process.hash .sha256 中的值 |
| _source.host.name | target.hostname | 直接复制值 |
| _source.destination.address, _source.destination.ip | target.ip | 从 _source.destination.address 和 _source.destination.ip 合并而来 |
| _source.destination.geo.city_name | target.location.city | 直接复制值 |
| _source.destination.geo.country_name | target.location.country_or_region | 直接复制值 |
| _source.destination.geo.continent_name | target.location.name | 直接复制值 |
| _source.destination.geo.location.lat | target.location.region_coordinates.latitude | 转换为字符串,然后再转换为浮点数 |
| _source.destination.geo.location.lon | target.location.region_coordinates.longitude | 转换为字符串,然后再转换为浮点数 |
| _source.destination.geo.region_name | target.location.state | 直接复制值 |
| _source.data_stream.namespace | target.namespace | 直接复制值 |
| _source.destination.port | target.port | 转换为字符串,然后再转换为整数 |
| _source.process.command_line | target.process.command_line | 直接复制值 |
| _source.process.executable | target.process.file.full_path | 直接复制值 |
| _source.process.hash.md5 | target.process.file.md5 | 直接复制值 |
| _source.process.hash.sha1 | target.process.file.sha1 | 直接复制值 |
| _source.process.hash.sha256 | target.process.file.sha256 | 直接复制值 |
| _source.process.name | target.process.file.names | 从 _source.process.name 合并 |
| _source.registry.key | target.registry.registry_key | 直接复制值 |
| _source.registry.path | target.registry.registry_value_data | 直接复制值 |
| _source.registry.value | target.registry.registry_value_name | 直接复制值 |
| _source.data_stream.dataset | target.resource.name | 直接复制值 |
| _source.process.entity_id | target.user.userid | 直接复制值 |
| metadata.product_name | 设置为“Elastic Defend” | |
| metadata.vendor_name | 设置为“弹性” |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。