收集 Elastic Defend 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage V2 和 Cloud Run 函数将 Elastic Defend 日志注入到 Google Security Operations 中。

Elastic Defend 是 Elastic Security 中的一种端点检测和响应 (EDR) 解决方案,可提供防护、检测和响应功能,并可深入了解 Windows、macOS 和 Linux 操作系统。它会监控进程执行、文件活动、网络连接、注册表修改和库加载,以在端点级别检测和防范威胁。数据存储在 Elasticsearch 中,可以使用 Elasticsearch Search API 进行检索。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 已启用以下 API 的 Google Cloud 项目:
    • Cloud Storage
    • Cloud Run functions
    • Cloud Scheduler
    • Pub/Sub
    • IAM
  • 对已部署 Elastic Defend 的 Elasticsearch 集群的访问权限
  • 在 Elasticsearch 中创建 API 密钥的权限(manage_securitymanage_api_keymanage_own_api_key 集群权限)
  • 从 Cloud Run 函数到 Elasticsearch 集群的网络连接

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储桶命名 输入一个全局唯一的名称(例如 elastic-defend-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 Elastic Defend 凭据

如需让 Cloud Run 函数能够检索 Elastic Defend 事件,您需要创建一个对 logs-endpoint 数据流具有读取权限的 API 密钥。

使用 Kibana 创建 API 密钥

  1. 登录 Kibana
  2. 在导航菜单或全局搜索字段中,依次前往堆栈管理 > API 密钥
  3. 点击创建 API 密钥
  4. 名称字段中,输入 Google SecOps Cloud Storage Integration
  5. 失效字段中,可选择性地设置失效日期。默认情况下,API 密钥不会过期。
  6. 点击控制安全权限
  7. 指数部分中,点击添加指数权限
  8. 配置索引权限:
    • 指数:输入 logs-endpoint.*
    • 权限:选择读取
  9. 集群权限部分留空(无需任何集群权限)。
  10. 点击创建 API 密钥

记录 API 凭据

创建 API 密钥后,系统会显示一个对话框,其中包含您的凭据:

  • 编码:经过 base64 编码的 API 密钥(例如 VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw==

    您还需要记录 Elasticsearch 端点网址:

  • 对于 Elastic Cloud:端点显示在 Cloud 控制台中部署的 Elasticsearch 部分下(例如 https://my-deployment.es.us-central1.gcp.cloud.es.io:443

  • 对于自行管理的 Elasticsearch:使用 Elasticsearch 集群的主机名或 IP 地址以及端口(例如 https://elasticsearch.example.com:9200

使用开发者工具创建 API 密钥(替代方法)

或者,您也可以使用 Kibana 开发工具创建 API 密钥:

  1. 登录 Kibana
  2. 前往管理 > 开发者工具
  3. 在控制台中,运行以下命令:

    POST /_security/api_key
    {
      "name": "Google SecOps Cloud Storage Integration",
      "role_descriptors": {
        "chronicle_reader": {
          "indices": [
            {
              "names": ["logs-endpoint.*"],
              "privileges": ["read"]
            }
          ]
        }
      },
      "metadata": {
        "application": "google-chronicle-gcs",
        "environment": "production"
      }
    }
    
  4. 响应包含您的 API 密钥凭据:

    {
      "id": "VuaCfGcBCdbkQm-e5aOx",
      "name": "Google SecOps Cloud Storage Integration",
      "api_key": "ui2lp2axTNmsyakw9tvNnw",
      "encoded": "VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw=="
    }
    
  5. 复制并保存编码后的值。这是您将用于身份验证的 base64 编码 API 密钥。

创建服务账号

为 Cloud Run 函数创建专用服务账号。

  1. Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 elastic-defend-collector
    • 服务账号说明:输入 Service account for Elastic Defend log collection to GCS
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色,然后选择 Storage Object Admin (roles/storage.objectAdmin)。
    2. 点击添加其他角色,然后选择 Cloud Run Invoker (roles/run.invoker)。
  6. 点击继续
  7. 点击完成

创建发布/订阅主题

创建一个 Pub/Sub 主题,以便从 Cloud Scheduler 触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 elastic-defend-trigger
    • 添加默认订阅:保持选中状态
  4. 点击创建

创建 Cloud Run 函数

创建 Cloud Run 函数,用于从 Elasticsearch 中检索事件并将其写入 GCS。

创建函数

  1. Google Cloud 控制台中,前往 Cloud Run 函数
  2. 点击创建函数
  3. 提供以下配置详细信息:

    设置
    环境 第 2 代
    函数名称 elastic-defend-to-gcs
    区域 选择与您的 GCS 存储桶相同的区域
    触发器类型 Cloud Pub/Sub
    Cloud Pub/Sub 主题 选择elastic-defend-trigger
    分配的内存 512 MiB(如果数据量较大,可增加此值)
    超时 540 秒
    运行时服务账号 选择elastic-defend-collector
  4. 点击下一步

添加环境变量

  • 运行时、构建、连接和安全设置部分中添加以下环境变量:

    变量
    GCS_BUCKET GCS 存储桶的名称(例如 elastic-defend-logs
    GCS_PREFIX 日志文件的前缀(例如 elastic-defend
    STATE_KEY 状态文件的名称(例如,state.json
    ES_HOST Elasticsearch 网址(例如 https://my-deployment.es.us-central1.gcp.cloud.es.io:443
    ES_API_KEY 凭据创建步骤中的编码 API 密钥
    MAX_RECORDS 每次执行的记录数上限(例如,100000
    PAGE_SIZE 每次搜索请求的记录数(例如 1000
    LOOKBACK_HOURS 首次运行时的回溯小时数(例如 24

添加函数代码

  1. 选择 Python 3.11 作为运行时
  2. 入口点设置为 main
  3. 源代码部分中,选择内嵌编辑器
  4. main.py 的内容替换为以下代码:

    import os
    import json
    import datetime
    import base64
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "elastic-defend")
    STATE_KEY = os.environ.get("STATE_KEY", "state.json")
    ES_HOST = os.environ["ES_HOST"]
    ES_API_KEY = os.environ["ES_API_KEY"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "100000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "1000"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    INDEX_PATTERN = "logs-endpoint.*"
    SEARCH_PATH = f"/{INDEX_PATTERN}/_search"
    
    def _gcs_client():
        return storage.Client()
    
    def _load_state(bucket):
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def _save_state(bucket, state):
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def _build_query(gte_ts, sort_after=None):
        body = {
            "size": PAGE_SIZE,
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": gte_ts,
                        "format": "strict_date_optional_time",
                    }
                }
            },
            "sort": [
                {"@timestamp": {"order": "asc"}},
                {"_shard_doc": "asc"},
            ],
        }
        if sort_after:
            body["search_after"] = sort_after
        return body
    
    def _search(session, body):
        url = f"{ES_HOST.rstrip('/')}{SEARCH_PATH}"
        resp = session.post(
            url,
            json=body,
            headers={
                "Authorization": f"ApiKey {ES_API_KEY}",
                "Content-Type": "application/json",
            },
            timeout=120,
        )
        resp.raise_for_status()
        return resp.json()
    
    def _write_ndjson(bucket, records, ts_label):
        if not records:
            return
        now = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
        blob_name = f"{GCS_PREFIX}/{ts_label}/{now}.ndjson"
        blob = bucket.blob(blob_name)
        ndjson = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
        blob.upload_from_string(ndjson, content_type="application/x-ndjson")
        print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{blob_name}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        client = _gcs_client()
        bucket = client.bucket(GCS_BUCKET)
    
        state = _load_state(bucket)
        sort_after = state.get("sort_after")
    
        if state.get("last_timestamp"):
            gte_ts = state["last_timestamp"]
        else:
            gte_ts = (
                datetime.datetime.utcnow()
                - datetime.timedelta(hours=LOOKBACK_HOURS)
            ).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    
        session = requests.Session()
        total = 0
        batch = []
        last_ts = gte_ts
        ts_label = datetime.datetime.utcnow().strftime("%Y/%m/%d/%H")
    
        while total < MAX_RECORDS:
            body = _build_query(gte_ts, sort_after)
            result = _search(session, body)
            hits = result.get("hits", {}).get("hits", [])
    
            if not hits:
                break
    
            for hit in hits:
                doc = hit.get("_source", {})
                doc["_id"] = hit.get("_id")
                doc["_index"] = hit.get("_index")
                batch.append(doc)
    
                hit_ts = doc.get("@timestamp", last_ts)
                if hit_ts > last_ts:
                    last_ts = hit_ts
    
            sort_after = hits[-1].get("sort")
            total += len(hits)
    
            if len(batch) >= PAGE_SIZE:
                _write_ndjson(bucket, batch, ts_label)
                batch = []
    
            if len(hits) < PAGE_SIZE:
                break
    
        if batch:
            _write_ndjson(bucket, batch, ts_label)
    
        new_state = {
            "last_timestamp": last_ts,
            "sort_after": sort_after,
        }
        _save_state(bucket, new_state)
        print(f"Done. Fetched {total} records. State: {json.dumps(new_state)}")
        return f"OK: {total} records"
    
  5. 使用以下内容替换 requirements.txt 的内容:

    functions-framework==3.*
    google-cloud-storage==2.*
    requests==2.*
    
  6. 点击部署

  7. 等待部署成功完成。

创建 Cloud Scheduler 作业

创建一个 Cloud Scheduler 作业,以便按常规时间表触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 elastic-defend-scheduler
    区域 选择与 Cloud Run 函数相同的区域
    频率 */5 * * * *(每 5 分钟)
    时区 选择您所在的时区(例如 UTC
  4. 点击继续

  5. 配置执行部分中:

    • 目标类型:选择 Pub/Sub
    • Cloud Pub/Sub 主题:选择 elastic-defend-trigger
    • 消息正文:输入 {"run": true}
  6. 点击创建

检索 Google SecOps 服务账号并配置 Feed

Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须向此服务账号授予对您的存储桶的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Elastic Defend Events)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Elastic Defend 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

配置 Feed

  1. 点击下一步
  2. 为以下输入参数指定值:

    • 存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:

      gs://elastic-defend-logs/elastic-defend/
      

      elastic-defend-logs 替换为您的 GCS 存储桶名称。

    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:转移后永不删除任何文件(建议用于测试)。
      • 删除已转移的文件:成功转移后删除文件。
      • 删除已转移的文件和空目录:成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签

  3. 点击下一步

  4. 最终确定界面中查看新的 Feed 配置,然后点击提交

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储桶具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 elastic-defend-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

UDM 映射表

日志字段 UDM 映射 逻辑
_source.agent.id、_source.agent.type、_source.agent.version、_source.host.architecture、_source.event.agent_id_status、_source.event.id、_source.user.id、_source.group.id、_source.data_stream.type、_source.agent.build.original additional.fields 与包含所列字段值的标签对象合并
_source.process.Ext.session_info.logon_type extensions.auth.auth_details 直接复制值
_source.host.os.full hardware.cpu_platform 直接复制值
_source.host.id hardware.serial_number 直接复制值
_source.rule.description metadata.description 直接复制值
_source.@timestamp metadata.event_timestamp 使用日期过滤器转换,格式为 ISO8601、yyyy-MM-ddTHH:mm:ss.SSSSSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSZ、yyyy-MM-ddTHH:mm:ssZ
metadata.event_type 根据索引、event.action 和 has* 条件设置
metadata.log_type 设置为“ELASTIC_DEFEND”
metadata.product_event_type 如果 _index ~ events.file,则设置为“文件事件”;如果 events.library,则设置为“库事件”;如果 events.network,则设置为“网络事件”;如果 events.process,则设置为“进程事件”;如果 events.registry,则设置为“注册表事件”;如果 events.security,则设置为“安全事件”;如果 events.api,则设置为“API 事件”;如果 .alert,则设置为“提醒”
_id metadata.product_log_id 如果 _id 中的值不在 ["", " ", "null", "N/A"] 中,则使用该值
_source.ecs.version metadata.product_version 直接复制值
_source.network.type network.application_protocol_version 直接复制值
_source.network.transport network.ip_protocol 如果匹配 (?i)tcp,则设置为“TCP”;如果匹配 (?i)udp,则设置为“UDP”;如果匹配 (?i)icmp,则设置为“ICMP”;否则设置为“UNKNOWN_IP_PROTOCOL”
_source.destination.as.organization.name network.organization_name 直接复制值
_source.Endpoint.policy.applied.artifacts.global.identifiers observer.file.names 从 _source.Endpoint.policy.applied.artifacts.global.identifiers 合并
_source.Endpoint.policy.applied.artifacts.global.version, _source.Endpoint.policy.applied.artifacts.global.snapshot observer.resource.attribute.labels 与包含所列字段值的标签对象合并
_source.Endpoint.policy.applied.artifacts.user.version observer.user.attribute.labels 与包含来自 _source.Endpoint.policy.applied.artifacts.user.version 的值的标签对象合并
_source.host.os.full principal.asset.hardware.cpu_platform 直接复制值
_source.host.id principal.asset.hardware.serial_number 直接复制值
_source.host.name principal.asset.hostname 直接复制值
_source.host.ip principal.asset.ip 从 _source.host.ip 合并
_source.host.os.type principal.asset.platform_software.platform 如果匹配 (?i)windows,则设置为“WINDOWS”;如果匹配 (?i)linux,则设置为“LINUX”;如果匹配 (?i)mac,则设置为“MAC”;如果匹配 (?i)ios,则设置为“IOS”;否则设置为“UNKNOWN_PLATFORM”
_source.host.os.kernel principal.asset.platform_software.platform_patch_level 直接复制值
_source.event.created principal.domain.creation_time 使用日期过滤器转换,格式为 ISO8601、yyyy-MM-ddTHH:mm:ss.SSSSSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSSSSZ、yyyy-MM-ddTHH:mm:ss.SSSZ、yyyy-MM-ddTHH:mm:ssZ
_source.user.domain principal.domain.name 直接复制值
_source.process.thread.capabilities.effective principal.file.capabilities_tags 从 _source.process.thread.capabilities.effective 合并
_source.process.executable principal.file.full_path 直接复制值
_source.process.hash.md5 principal.file.md5 直接复制值
_source.file.name principal.file.names 从 _source.file.name 合并
_source.process.hash.sha1 principal.file.sha1 直接复制值
_source.process.hash.sha256 principal.file.sha256 直接复制值
_source.host.hostname principal.hostname 直接复制值
_source.host.ip principal.ip 从 _source.host.ip 合并
_source.host.mac principal.mac 在将 - 替换为 : 后,从 _source.host.mac 合并
_source.host.os.Ext.variant principal.platform_version 直接复制值
_source.source.port principal.port 转换为字符串,然后再转换为整数
_source.process.command_line, _source.process.name principal.process.command_line 如果 _source.process.command_line 不为空,则取自该字段,否则取自 _source.process.name
_source.process.thread.capabilities.permitted principal.process.file.capabilities_tags 从 _source.process.thread.capabilities.permitted 合并
_source.process.executable principal.process.file.full_path 直接复制值
_source.process.hash.md5 principal.process.file.md5 直接复制值
_source.process.hash.sha1 principal.process.file.sha1 直接复制值
_source.process.hash.sha256 principal.process.file.sha256 直接复制值
_source.process.parent.executable principal.process.parent_process.file.full_path 直接复制值
_source.process.pid principal.process.pid 转换为字符串,然后复制
_source.process.Ext.api.name principal.resource.attribute.labels 与包含来自 _source.process.Ext.api.name 的值的标签对象合并
_source.event.code principal.resource.product_object_id 直接复制值
_source.group.name principal.user.group_identifiers 从 _source.group.name 合并
_source.user.name principal.user.userid 直接复制值
_source.user.id principal.user.windows_sid 如果与正则表达式 ^S-\\d-(\\\\d+-){1,14}\\\\d+$ 匹配,则为 _source.user.id 中的值
_source.file.Ext.malware_signature.primary.signature.hash.sha256 security_result.about.file.sha256 直接复制值
_source.event.outcome security_result.action 来自 _source.event.outcome 的值(大写),然后设置为 ALLOW(如果位于 [SUCCESS, ALLOW] 中)、BLOCK(如果位于 [FAILURE, DENY, SKIPPED, RATE_LIMIT] 中)、UNKNOWN_ACTION(如果为 UNKNOWN)
_source.event.action security_result.action_details 直接复制值
_source.destination.geo.region_iso_code security_result.associations 与包含来自 _source.destination.geo.region_iso_code 的名称的对象合并
_source.kibana.alert.rule.parameters.threat.tactic.id, _source.kibana.alert.rule.parameters.threat.tactic.name security_result.attack_details.tactics 与包含所列字段中的 ID 和名称的对象合并
_source.kibana.alert.rule.parameters.threat.technique.id, _source.kibana.alert.rule.parameters.threat.technique.name, _source.kibana.alert.rule.parameters.threat.technique.subtechnique.id, _source.kibana.alert.rule.parameters.threat.technique.subtechnique.name security_result.attack_details.techniques 与包含以下字段的对象合并:id、name、subtechnique_id、subtechnique_name
_source.event.category security_result.category_details 从 _source.event.category 合并
_source.kibana.alert.rule.description security_result.description 直接复制值
_source.event.kind, _source.file.Ext.malware_signature.all_names, _source.file.Ext.malware_signature.identifier, _source.event.risk_score, _source.threat.tactic.reference, _source.threat.technique.reference, _source.threat.technique.subtechnique.reference security_result.detection_fields 与包含所列字段值的标签对象合并
_source.rule.id, _source.kibana.alert.rule.rule_id security_result.rule_id 如果 _source.rule.id 不为空,则返回该值;否则,返回 _source.kibana.alert.rule.rule_id
_source.rule.name, _source.kibana.alert.rule.name security_result.rule_name 如果 _source.rule.name 不为空,则为该值;否则为 _source.kibana.alert.rule.name 中的值
_source.rule.ruleset security_result.rule_set 直接复制值
security_result.severity 设置为“LOW”;如果 _index 与 .alert 匹配,则设置为“HIGH”;如果 _source.kibana.alert.rule.parameters.severity 与 (?i)LOW 匹配,则设置为“LOW”
_source.message security_result.summary 直接复制值
_source.file.Ext.malware_signature.primary.signature.id security_result.threat_id 直接复制值
_source.file.Ext.malware_signature.primary.signature.name security_result.threat_name 直接复制值
_source.source.address, _source.source.ip src.asset.ip 从 _source.source.address 和 _source.source.ip 合并而来
_source.source.address, _source.source.ip src.ip 从 _source.source.address 和 _source.source.ip 合并而来
_source.host.name target.asset.hostname 直接复制值
_source.destination.address, _source.destination.ip target.asset.ip 从 _source.destination.address 和 _source.destination.ip 合并而来
_source.file.path、_source.dll.path、_source.process.executable、_source.Target.process.executable target.file.full_path 如果为 events.file,则为 _source.file.path 中的值;如果为 events.library,则为 _source.dll.path 中的值;如果为 events.process 或 events.api,则为 _source.process.executable 中的值;如果为 events.api,则为 _source.Target.process.executable 中的值
_source.dll.hash.md5、_source.process.hash.md5 target.file.md5 如果为 events.library,则值为 _source.dll.hash.md5;如果为.alert,则值为 _source.process.hash .md5
_source.dll.name, _source.process.name target.file.names 如果为 events.library,则从 _source.dll.name 合并;如果为.alert,则从 _source.process .name 合并
_source.dll.hash.sha1, _source.process.hash.sha1 target.file.sha1 如果为 events.library,则值为 _source.dll.hash.sha1;如果为.alert,则值为 _source.process.hash .sha1
_source.dll.hash.sha256、_source.process.hash.sha256 target.file.sha256 如果为 events.library,则为 _source.dll.hash.sha256 中的值;如果为.alert,则为 _source.process.hash .sha256 中的值
_source.host.name target.hostname 直接复制值
_source.destination.address, _source.destination.ip target.ip 从 _source.destination.address 和 _source.destination.ip 合并而来
_source.destination.geo.city_name target.location.city 直接复制值
_source.destination.geo.country_name target.location.country_or_region 直接复制值
_source.destination.geo.continent_name target.location.name 直接复制值
_source.destination.geo.location.lat target.location.region_coordinates.latitude 转换为字符串,然后再转换为浮点数
_source.destination.geo.location.lon target.location.region_coordinates.longitude 转换为字符串,然后再转换为浮点数
_source.destination.geo.region_name target.location.state 直接复制值
_source.data_stream.namespace target.namespace 直接复制值
_source.destination.port target.port 转换为字符串,然后再转换为整数
_source.process.command_line target.process.command_line 直接复制值
_source.process.executable target.process.file.full_path 直接复制值
_source.process.hash.md5 target.process.file.md5 直接复制值
_source.process.hash.sha1 target.process.file.sha1 直接复制值
_source.process.hash.sha256 target.process.file.sha256 直接复制值
_source.process.name target.process.file.names 从 _source.process.name 合并
_source.registry.key target.registry.registry_key 直接复制值
_source.registry.path target.registry.registry_value_data 直接复制值
_source.registry.value target.registry.registry_value_name 直接复制值
_source.data_stream.dataset target.resource.name 直接复制值
_source.process.entity_id target.user.userid 直接复制值
metadata.product_name 设置为“Elastic Defend”
metadata.vendor_name 设置为“弹性”

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。