收集 DomainTools Iris Investigate 结果

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 DomainTools Iris Investigate 结果提取到 Google Security Operations。解析器会将 DomainTools 的 Iris API 中的原始 JSON 数据转换为符合 Google SecOps 的统一数据模型 (UDM) 的结构化格式。它会提取与网域详细信息、联系信息、安全风险、SSL 证书和其他相关属性有关的信息,并将这些信息映射到相应的 UDM 字段,以便进行一致的分析和威胁情报。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 对 DomainTools 企业账号的特权访问权限(对 Iris Investigate 的 API 访问权限)
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限

获取 DomainTools API 密钥和端点

  1. 登录 DomainTools API 信息中心(只有 API 所有者账号才能重置 API 密钥)。
  2. 我的账号部分中,选择账号摘要标签页中的查看 API 信息中心链接。
  3. 前往 API 用户名部分,获取您的用户名。
  4. 在同一标签页中,找到您的 API 密钥
  5. 复制密钥并将其保存在安全的位置。如果您需要新密钥,请选择 Reset API Key

  6. 请注意 Iris Investigate 端点:https://api.domaintools.com/v1/iris-investigate/

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 domaintools-iris
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 domaintools-iris-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect DomainTools Iris Investigate logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 domaintools-iris-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 domaintools-iris-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 DomainTools Iris Investigate API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 domaintools-iris-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (domaintools-iris-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 (domaintools-iris-collector-sa)。
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET domaintools-iris 将存储数据的 GCS 存储分区名称。
    GCS_PREFIX domaintools/iris/ 对象的可选 GCS 前缀(子文件夹)。
    STATE_KEY domaintools/iris/state.json 可选的状态/检查点文件键。
    DT_API_KEY DT-XXXXXXXXXXXXXXXXXXXX DomainTools API 密钥。
    USE_MODE HASH 选择要使用的模式:HASHDOMAINSQUERY(一次只能启用一种模式)。
    SEARCH_HASHES hash1;hash2;hash3 如果 USE_MODE=HASH,则为必需。以英文分号分隔的 Iris 界面中的已保存搜索哈希列表。
    DOMAINS example.com;domaintools.com 如果 USE_MODE=DOMAINS,则为必需。以英文分号分隔的网域列表。
    QUERY_LIST ip=1.1.1.1;ip=8.8.8.8;domain=example.org 如果 USE_MODE=QUERY,则为必需。以英文分号分隔的查询字符串列表 (k=v&k2=v2)。
    PAGE_SIZE 500 每页行数(默认值为 500)。
    MAX_PAGES 20 每个请求的最大页面数。
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 900 秒(15 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import time
    from datetime import datetime, timezone
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get("GCS_BUCKET", "").strip()
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "domaintools/iris/").strip()
    STATE_KEY = os.environ.get("STATE_KEY", "domaintools/iris/state.json").strip()
    DT_API_KEY = os.environ.get("DT_API_KEY", "").strip()
    USE_MODE = os.environ.get("USE_MODE", "HASH").strip().upper()
    SEARCH_HASHES = [h.strip() for h in os.environ.get("SEARCH_HASHES", "").split(";") if h.strip()]
    DOMAINS = [d.strip() for d in os.environ.get("DOMAINS", "").split(";") if d.strip()]
    QUERY_LIST = [q.strip() for q in os.environ.get("QUERY_LIST", "").split(";") if q.strip()]
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    USE_NEXT = os.environ.get("USE_NEXT", "true").lower() == "true"
    HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
    RETRIES = int(os.environ.get("HTTP_RETRIES", "2"))
    
    BASE_URL = "https://api.domaintools.com/v1/iris-investigate/"
    HDRS = {
        "X-Api-Key": DT_API_KEY,
        "Accept": "application/json",
    }
    
    def _http_post(url: str, body: dict) -> dict:
        """Make HTTP POST request with form-encoded body."""
        req = Request(url, method="POST")
        for k, v in HDRS.items():
            req.add_header(k, v)
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
    
        encoded_body = urllib.parse.urlencode(body, doseq=True).encode('utf-8')
    
        attempt = 0
        while True:
            try:
                with urlopen(req, data=encoded_body, timeout=HTTP_TIMEOUT) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if e.code in (429, 500, 502, 503, 504) and attempt < RETRIES:
                    delay = int(e.headers.get("Retry-After", "2"))
                    time.sleep(max(1, delay))
                    attempt += 1
                    continue
                raise
    
    def _write_page(bucket, obj: dict, label: str, page: int) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{GCS_PREFIX.rstrip('/')}/{ts}-{label}-p{page:05d}.json"
        blob = bucket.blob(key)
        blob.upload_from_string(
            json.dumps(obj, separators=(",", ":")),
            content_type="application/json"
        )
        return key
    
    def _first_page_params() -> dict:
        params = {"page_size": str(PAGE_SIZE)}
        if USE_NEXT:
            params["next"] = "true"
        return params
    
    def _paginate(bucket, label: str, params: dict) -> tuple:
        pages = 0
        total = 0
    
        while pages < MAX_PAGES:
            data = _http_post(BASE_URL, params)
            _write_page(bucket, data, label, pages)
            resp = data.get("response") or {}
            results = resp.get("results") or []
            total += len(results)
            pages += 1
    
            next_url = resp.get("next") if isinstance(resp, dict) else None
            if next_url:
                parsed = urllib.parse.urlparse(next_url)
                params = dict(urllib.parse.parse_qsl(parsed.query))
                continue
    
            if resp.get("has_more_results") and resp.get("position"):
                base = _first_page_params()
                base.pop("next", None)
                base["position"] = resp["position"]
                params = base
                continue
            break
        return pages, total
    
    def run_hashes(bucket, hashes: list) -> dict:
        agg_pages = agg_results = 0
        for h in hashes:
            params = _first_page_params()
            params["search_hash"] = h
            p, r = _paginate(bucket, f"hash-{h}", params)
            agg_pages += p
            agg_results += r
        return {"pages": agg_pages, "results": agg_results}
    
    def run_domains(bucket, domains: list) -> dict:
        agg_pages = agg_results = 0
        for d in domains:
            params = _first_page_params()
            params["domain"] = d
            p, r = _paginate(bucket, f"domain-{d}", params)
            agg_pages += p
            agg_results += r
        return {"pages": agg_pages, "results": agg_results}
    
    def run_queries(bucket, queries: list) -> dict:
        agg_pages = agg_results = 0
        for q in queries:
            base = _first_page_params()
            for k, v in urllib.parse.parse_qsl(q, keep_blank_values=True):
                base.setdefault(k, v)
            p, r = _paginate(bucket, f"query-{q.replace('=', '-')}", base)
            agg_pages += p
            agg_results += r
        return {"pages": agg_pages, "results": agg_results}
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch DomainTools Iris Investigate results and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not GCS_BUCKET:
            print("Error: GCS_BUCKET environment variable not set")
            return
    
        try:
            bucket = storage_client.bucket(GCS_BUCKET)
    
            if USE_MODE == "HASH" and SEARCH_HASHES:
                res = run_hashes(bucket, SEARCH_HASHES)
            elif USE_MODE == "DOMAINS" and DOMAINS:
                res = run_domains(bucket, DOMAINS)
            elif USE_MODE == "QUERY" and QUERY_LIST:
                res = run_queries(bucket, QUERY_LIST)
            else:
                raise ValueError(
                    "Invalid USE_MODE or missing parameters. Set USE_MODE to HASH | DOMAINS | QUERY "
                    "and provide SEARCH_HASHES | DOMAINS | QUERY_LIST accordingly."
                )
    
            print(f"Successfully processed: {json.dumps({'ok': True, 'mode': USE_MODE, **res})}")
    
        except Exception as e:
            print(f"Error processing DomainTools Iris data: {str(e)}")
            raise
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 domaintools-iris-1h
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (domaintools-iris-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 搜索量中等
    每小时 0 * * * * 标准(推荐)
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 (domaintools-iris-collector)。
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Successfully processed: {"ok": true, "mode": "HASH", "pages": X, "results": Y}
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称。

  10. 前往前缀文件夹 (domaintools/iris/)。

  11. 验证是否已创建具有当前时间戳的新 .json 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 DomainTools API 凭据
  • HTTP 403:验证账号是否具有 Iris Investigate API 所需的权限
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已设置所有必需的变量
  • USE_MODE 无效:验证 USE_MODE 是否已设置为 HASH、DOMAINS 或 QUERY,并提供了相应的参数

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 DomainTools Iris Investigate)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 DomainTools Threat Intelligence 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 DomainTools Iris Investigate 结果

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 DomainTools Iris Investigate)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 DomainTools Threat Intelligence 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://domaintools-iris/domaintools/iris/
      
        • domaintools-iris:您的 GCS 存储分区名称。
        • domaintools/iris/:存储日志的可选前缀/文件夹路径(留空表示根目录)。
      • 示例

        • 根存储分区:gs://domaintools-iris/
        • 带前缀:gs://domaintools-iris/domaintools/iris/
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资源命名空间domaintools.threat_intel

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
有效 principal.domain.status 直接从原始日志中的有效字段映射。
additional_whois_email.[].value about.labels.additional_whois_email 从 additional_whois_email 数组中提取,并作为标签添加到 about 对象中。
adsense.value about.labels.adsense 从 adsense.value 中提取并作为标签添加到 about 对象中。
admin_contact.city.value principal.domain.admin.office_address.city 直接从原始日志中的 admin_contact.city.value 字段映射。
admin_contact.country.value principal.domain.admin.office_address.country_or_region 直接从原始日志中的 admin_contact.country.value 字段映射。
admin_contact.email.[].value principal.domain.admin.email_addresses 从 admin_contact.email 数组中提取并添加到 email_addresses 字段。
admin_contact.fax.value principal.domain.admin.attribute.labels.fax 从 admin_contact.fax.value 中提取,并作为标签添加到 admin 属性中,键为“fax”。
admin_contact.name.value principal.domain.admin.user_display_name 直接从原始日志中的 admin_contact.name.value 字段映射。
admin_contact.org.value principal.domain.admin.company_name 直接从原始日志中的 admin_contact.org.value 字段映射。
admin_contact.phone.value principal.domain.admin.phone_numbers 直接从原始日志中的 admin_contact.phone.value 字段映射。
admin_contact.postal.value principal.domain.admin.attribute.labels.postal 从 admin_contact.postal.value 中提取,并作为具有键“postal”的标签添加到 admin 属性中。
admin_contact.state.value principal.domain.admin.office_address.state 直接从原始日志中的 admin_contact.state.value 字段映射。
admin_contact.street.value principal.domain.admin.office_address.name 直接从原始日志中的 admin_contact.street.value 字段映射。
alexa about.labels.alexa 直接从原始日志中的 alexa 字段映射,并作为标签添加到 about 对象中。
baidu_codes.[].value about.labels.baidu_codes 从 baidu_codes 数组中提取,并作为标签添加到 about 对象中。
billing_contact.city.value principal.domain.billing.office_address.city 直接从原始日志中的 billing_contact.city.value 字段映射。
billing_contact.country.value principal.domain.billing.office_address.country_or_region 直接从原始日志中的 billing_contact.country.value 字段映射。
billing_contact.email.[].value principal.domain.billing.email_addresses 从 billing_contact.email 数组中提取并添加到 email_addresses 字段中。
billing_contact.fax.value principal.domain.billing.attribute.labels.fax 从 billing_contact.fax.value 中提取,并作为标签添加到结算属性中,键为“传真”。
billing_contact.name.value principal.domain.billing.user_display_name 直接从原始日志中的 billing_contact.name.value 字段映射。
billing_contact.org.value principal.domain.billing.company_name 直接从原始日志中的 billing_contact.org.value 字段映射。
billing_contact.phone.value principal.domain.billing.phone_numbers 直接从原始日志中的 billing_contact.phone.value 字段映射。
billing_contact.postal.value principal.domain.billing.attribute.labels.postal 从 billing_contact.postal.value 中提取,并作为具有“postal”键的标签添加到结算属性中。
billing_contact.state.value principal.domain.billing.office_address.state 直接从原始日志中的 billing_contact.state.value 字段映射。
billing_contact.street.value principal.domain.billing.office_address.name 直接从原始日志中的 billing_contact.street.value 字段映射。
create_date.value principal.domain.creation_time 根据原始日志中的 create_date.value 字段转换为时间戳格式。
data_updated_timestamp principal.domain.audit_update_time 从原始日志中的 data_updated_timestamp 字段转换为时间戳格式。
域名 principal.hostname 直接从原始日志中的网域字段映射。
domain_risk.components.[].evidence security_result.detection_fields.evidence 从 domain_risk.components.[].evidence 数组中提取,并作为检测字段添加到 security_result 对象中,键为“evidence”。
domain_risk.components.[].name security_result.category_details 直接从原始日志中的 domain_risk.components.[].name 字段映射。
domain_risk.components.[].risk_score security_result.risk_score 直接从原始日志中的 domain_risk.components.[].risk_score 字段映射。
domain_risk.components.[].threats security_result.threat_name domain_risk.components.[].threats 数组的第一个元素会映射到 security_result.threat_name。
domain_risk.components.[].threats security_result.detection_fields.threats domain_risk.components.[].threats 数组的其余元素将作为检测字段添加到 security_result 对象中,键为“threats”。
domain_risk.risk_score security_result.risk_score 直接从原始日志中的 domain_risk.risk_score 字段映射。
email_domain.[].value about.labels.email_domain 从 email_domain 数组中提取,并作为标签添加到 about 对象中。
expiration_date.value principal.domain.expiration_time 从原始日志中的 expiration_date.value 字段转换为时间戳格式。
fb_codes.[].value about.labels.fb_codes 从 fb_codes 数组中提取并作为标签添加到 about 对象中。
first_seen.value principal.domain.first_seen_time 根据原始日志中的 first_seen.value 字段转换为时间戳格式。
ga4.[].value about.labels.ga4 从 GA4 数组中提取,并作为标签添加到 about 对象中。
google_analytics.value about.labels.google_analytics 从 google_analytics.value 中提取,并作为标签添加到 about 对象中。
gtm_codes.[].value about.labels.gtm_codes 从 gtm_codes 数组中提取并作为标签添加到 about 对象中。
hotjar_codes.[].value about.labels.hotjar_codes 从 hotjar_codes 数组中提取,并作为标签添加到 about 对象中。
ip.[].address.value principal.ip ip 数组的第一个元素会映射到 principal.ip。
ip.[].address.value about.labels.ip_address ip 数组的其余元素将作为标签添加到 about 对象中,键为“ip_address”。
ip.[].asn.[].value network.asn 第一个 ip.asn 数组的第一个元素会映射到 network.asn。
ip.[].asn.[].value about.labels.asn ip.asn 数组的其余元素将作为标签添加到 about 对象中,键为“asn”。
ip.[].country_code.value principal.location.country_or_region ip 数组中第一个元素的 country_code.value 会映射到 principal.location.country_or_region。
ip.[].country_code.value about.location.country_or_region ip 数组中其余元素的 country_code.value 会映射到 about.location.country_or_region。
ip.[].isp.value principal.labels.isp ip 数组中第一个元素的 isp.value 会映射到 principal.labels.isp。
ip.[].isp.value about.labels.isp ip 数组中其余元素的 isp.value 会映射到 about.labels.isp。
matomo_codes.[].value about.labels.matomo_codes 从 matomo_codes 数组中提取并作为标签添加到 about 对象中。
monitor_domain about.labels.monitor_domain 直接从原始日志中的 monitor_domain 字段映射,并作为标签添加到 about 对象中。
monitoring_domain_list_name about.labels.monitoring_domain_list_name 直接从原始日志中的 monitoring_domain_list_name 字段映射,并作为标签添加到 about 对象中。
mx.[].domain.value about.domain.name 直接从原始日志中的 mx.[].domain.value 字段映射。
mx.[].host.value about.hostname 直接从原始日志中的 mx.[].host.value 字段映射。
mx.[].ip.[].value about.ip 从 mx.[].ip 数组中提取并添加到 ip 字段。
mx.[].priority about.security_result.priority_details 直接从原始日志中的 mx.[].priority 字段映射。
name_server.[].domain.value about.labels.name_server_domain 从 name_server.[].domain.value 中提取,并作为标签添加到 about 对象中,键为“name_server_domain”。
name_server.[].host.value principal.domain.name_server 从 name_server.[].host.value 中提取并添加到 name_server 字段。
name_server.[].host.value about.domain.name_server 从 name_server.[].host.value 中提取并添加到 name_server 字段。
name_server.[].ip.[].value about.labels.ip 从 name_server.[].ip 数组中提取,并作为标签添加到 about 对象中,键为“ip”。
popularity_rank about.labels.popularity_rank 直接从原始日志中的 popularity_rank 字段映射,并作为标签添加到 about 对象中。
redirect.value about.labels.redirect 从 redirect.value 中提取,并作为标签添加到 about 对象中。
redirect_domain.value about.labels.redirect_domain 从 redirect_domain.value 中提取,并作为标签添加到 about 对象中。
registrant_contact.city.value principal.domain.registrant.office_address.city 直接从原始日志中的 registrant_contact.city.value 字段映射。
registrant_contact.country.value principal.domain.registrant.office_address.country_or_region 直接从原始日志中的 registrant_contact.country.value 字段映射。
registrant_contact.email.[].value principal.domain.registrant.email_addresses 从 registrant_contact.email 数组中提取并添加到 email_addresses 字段。
registrant_contact.fax.value principal.domain.registrant.attribute.labels.fax 从 registrant_contact.fax.value 中提取,并作为标签添加到注册人属性中,键为“传真”。
registrant_contact.name.value principal.domain.registrant.user_display_name 直接从原始日志中的 registrant_contact.name.value 字段映射。
registrant_contact.org.value principal.domain.registrant.company_name 直接从原始日志中的 registrant_contact.org.value 字段映射。
registrant_contact.phone.value principal.domain.registrant.phone_numbers 直接从原始日志中的 registrant_contact.phone.value 字段映射。
registrant_contact.postal.value principal.domain.registrant.attribute.labels.postal 从 registrant_contact.postal.value 中提取,并作为注册人属性中键为“postal”的标签添加。
registrant_contact.state.value principal.domain.registrant.office_address.state 直接从原始日志中的 registrant_contact.state.value 字段映射。
registrant_contact.street.value principal.domain.registrant.office_address.name 直接从原始日志中的 registrant_contact.street.value 字段映射。
registrant_name.value about.labels.registrant_name 从 registrant_name.value 中提取,并作为标签添加到 about 对象中。
registrant_org.value about.labels.registrant_org 从 registrant_org.value 中提取,并作为标签添加到 about 对象中。
registrar.value principal.domain.registrar 直接从原始日志中的 registrar.value 字段映射。
registrar_status about.labels.registrar_status 从 registrar_status 数组中提取,并作为标签添加到 about 对象中。
server_type network.tls.client.server_name 直接从原始日志中的 server_type 字段映射。
soa_email.[].value principal.user.email_addresses 从 soa_email 数组中提取并添加到 email_addresses 字段。
spf_info about.labels.spf_info 直接从原始日志中的 spf_info 字段映射,并作为标签添加到 about 对象中。
ssl_email.[].value about.labels.ssl_email 从 ssl_email 数组中提取,并作为标签添加到 about 对象中。
ssl_info.[].alt_names.[].value about.labels.alt_names 从 ssl_info.[].alt_names 数组中提取,并作为标签添加到 about 对象中。
ssl_info.[].common_name.value about.labels.common_name 从 ssl_info.[].common_name.value 中提取,并作为标签添加到 about 对象中。
ssl_info.[].duration.value about.labels.duration 从 ssl_info.[].duration.value 中提取,并作为标签添加到 about 对象中。
ssl_info.[].email.[].value about.labels.ssl_info_email 从 ssl_info.[].email 数组中提取,并作为标签添加到 about 对象中,键为“ssl_info_email”。
ssl_info.[].hash.value network.tls.server.certificate.sha1 ssl_info 数组中第一个元素的 hash.value 会映射到 network.tls.server.certificate.sha1。
ssl_info.[].hash.value about.labels.hash ssl_info 数组中其余元素的 hash.value 会映射到 about.labels.hash。
ssl_info.[].issuer_common_name.value network.tls.server.certificate.issuer ssl_info 数组中第一个元素的 issuer_common_name.value 会映射到 network.tls.server.certificate.issuer。
ssl_info.[].issuer_common_name.value about.labels.issuer_common_name ssl_info 数组中其余元素的 issuer_common_name.value 会映射到 about.labels.issuer_common_name。
ssl_info.[].not_after.value network.tls.server.certificate.not_after ssl_info 数组中第一个元素的 not_after.value 会转换为时间戳格式,并映射到 network.tls.server.certificate.not_after。
ssl_info.[].not_after.value about.labels.not_after ssl_info 数组中其余元素的 not_after.value 会映射到 about.labels.not_after。
ssl_info.[].not_before.value network.tls.server.certificate.not_before ssl_info 数组中第一个元素的 not_before.value 转换为时间戳格式,并映射到 network.tls.server.certificate.not_before。
ssl_info.[].not_before.value about.labels.not_before ssl_info 数组中其余元素的 not_before.value 会映射到 about.labels.not_before。
ssl_info.[].organization.value network.organization_name ssl_info 数组中第一个元素的 organization.value 会映射到 network.organization_name。
ssl_info.[].organization.value about.labels.organization ssl_info 数组中其余元素的 organization.value 会映射到 about.labels.organization。
ssl_info.[].subject.value about.labels.subject 从 ssl_info.[].subject.value 中提取,并作为标签添加到 about 对象中。
statcounter_project_codes.[].value about.labels.statcounter_project_codes 从 statcounter_project_codes 数组中提取,并作为标签添加到 about 对象中。
statcounter_security_codes.[].value about.labels.statcounter_security_codes 从 statcounter_security_codes 数组中提取,并作为标签添加到 about 对象中。
tags.[].label about.file.tags 从 [].label 标记中提取并添加到标记字段。
tags.[].scope security_result.detection_fields.scope 从标记 [].scope 中提取,并作为检测字段添加到 security_result 对象中,键为“scope”。
tags.[].tagged_at security_result.detection_fields.tagged_at 从标记中提取。[].tagged_at 并作为检测字段添加到 security_result 对象中,键为“tagged_at”。
technical_contact.city.value principal.domain.tech.office_address.city 直接从原始日志中的 technical_contact.city.value 字段映射。
technical_contact.country.value principal.domain.tech.office_address.country_or_region 直接从原始日志中的 technical_contact.country.value 字段映射。
technical_contact.email.[].value principal.domain.tech.email_addresses 从 technical_contact.email 数组中提取并添加到 email_addresses 字段中。
technical_contact.fax.value principal.domain.tech.attribute.labels.fax 从 technical_contact.fax.value 中提取,并作为具有键“fax”的标签添加到技术属性中。
technical_contact.name.value principal.domain.tech.user_display_name 直接从原始日志中的 technical_contact.name.value 字段映射。
technical_contact.org.value principal.domain.tech.company_name 直接从原始日志中的 technical_contact.org.value 字段映射。
technical_contact.phone.value principal.domain.tech.phone_numbers 直接从原始日志中的 technical_contact.phone.value 字段映射。
technical_contact.postal.value principal.domain.tech.attribute.labels.postal 从 technical_contact.postal.value 中提取,并作为键为“postal”的标签添加到技术属性中。
technical_contact.state.value principal.domain.tech.office_address.state 直接从原始日志中的 technical_contact.state.value 字段映射。
technical_contact.street.value principal.domain.tech.office_address.name 直接从原始日志中的 technical_contact.street.value 字段映射。
tld about.labels.tld 直接从原始日志中的 tld 字段映射,并作为标签添加到 about 对象中。
时间戳 about.labels.timestamp 直接从原始日志中的时间戳字段映射,并作为标签添加到 about 对象中。
website_response principal.network.http.response_code 直接从原始日志中的 website_response 字段映射。
website_title about.labels.website_title 直接从原始日志中的 website_title 字段映射,并作为标签添加到 about 对象中。
whois_url principal.domain.whois_server 直接从原始日志中的 whois_url 字段映射。
yandex_codes.[].value about.labels.yandex_codes 从 yandex_codes 数组中提取并作为标签添加到 about 对象中。
edr.client.hostname 设置为网域字段的值。
edr.client.ip_addresses 设置为 IP 数组中第一个元素的值,具体为 ip.[0].address.value。
edr.raw_event_name 如果存在 principal.hostname,则设置为“STATUS_UPDATE”,否则设置为“GENERIC_EVENT”。
metadata.event_timestamp 从原始日志中的顶级 create_time 字段复制。
metadata.event_type 如果存在 principal.hostname,则设置为“STATUS_UPDATE”,否则设置为“GENERIC_EVENT”。
metadata.log_type 设置为“DOMAINTOOLS_THREATINTEL”。
metadata.product_name 设置为“DOMAINTOOLS”。
metadata.vendor_name 设置为“DOMAINTOOLS”。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。