收集 ESET Threat Intelligence 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage V2、Cloud Run 函数和 Cloud Scheduler 将 ESET 威胁情报日志注入到 Google Security Operations 中。

ESET Threat Intelligence (ETI) 可提供基于证据的信息和有关现有或新兴威胁的实用建议。ETI 服务会针对可能威胁贵组织或其客户的恶意软件或活动发出警告。该服务通过 STIX 2.1 格式的 TAXII 2.1 Feed 提供威胁情报数据,包括 APT IoC、僵尸网络 C&C 和目标、恶意网域、IP、网址、文件、钓鱼网址、勒索软件和 Android 威胁。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 已启用以下 API 的 Google Cloud 项目:
    • Cloud Storage API
    • Cloud Run Functions API
    • Cloud Scheduler API
    • Cloud Pub/Sub API
  • 创建和管理 Google Cloud Storage 存储分区、Cloud Run 函数、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 用于管理 Google Cloud Storage 存储分区的 IAM 政策的权限
  • 有效的 ESET Threat Intelligence 订阅
  • 访问 ESET Threat Intelligence 门户 (https://eti.eset.com)

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储桶命名 输入一个全局唯一的名称(例如 eset-ti-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 ESET Threat Intelligence TAXII 凭据

如需让 Cloud Run 函数能够检索威胁情报数据,您需要从 ETI 门户网站激活 TAXII Feed 并生成 TAXII 凭据。

激活 TAXII Feed

  1. 访问 https://eti.eset.com,登录 ESET 威胁情报门户
  2. 前往主菜单中的数据 Feed
  3. 点击要激活的数据 Feed 旁边的三点状图标
  4. 选择启用 Feed
  5. 针对要注入到 Google SecOps 中的每个 Feed 重复执行第 3 步和第 4 步。

生成 TAXII 凭据

  1. 在 ESET Threat Intelligence 门户中,依次前往 Admin Settings > Access Credentials
  2. 点击 Generate TAXII Credentials(生成 TAXII 凭据)。
  3. 在显示的对话框中,复制并保存以下值:

    • 用户名:您的 TAXII 用户名
    • 密码:您的 TAXII 密码

记录 TAXII Feed 详细信息

激活 Feed 并生成凭据后,请记录您要注入的每个 Feed 的以下信息:

  1. 在 ESET Threat Intelligence 门户中,前往数据 Feed
  2. 点击已启用的 Feed 旁边的三点状图标。
  3. 选择 Show Data Feed detail
  4. 在侧边栏中,记下以下值:

    • TAXII Feed 名称:Feed 标识符(例如 botnet stix 2.1
    • TAXII 2 ID:集合 ID(例如 0abb06690b0b47e49cd7794396b76b20
    • TAXII 2 Feed 网址:完整的集合网址

可用的 TAXII Feed

  • ESET Threat Intelligence 提供以下 TAXII 2.1 Feed:

    Feed 名称 TAXII Feed 名称 集合 ID
    Android 窃取信息恶意软件 Feed androidinfostealer stix 2.1 9ee501cde0c44d6db4ae995fead1a7c8
    Android 威胁 Feed androidthreats stix 2.1 daf3de8fab144552a1cb5af054ed07ee
    APT IoC apt stix 2.1 97e3eb74ae5f46dd9e22f677a6938ee7
    僵尸网络 Feed 僵尸网络 STIX 2.1 0abb06690b0b47e49cd7794396b76b20
    僵尸网络 - C&C botnet.cc STIX 2.1 d1923a526e8f400dbb301259240ee3d5
    僵尸网络 - 目标 botnet.target stix 2.1 61b6e4f9153e411ca7a9982a2c6ae788
    加密货币诈骗信息流 cryptoscam stix 2.1 2c183ce9551a43338c6cc2ed7c2a704d
    网域 Feed domain stix 2.1 a34aa0a4f9de419582a883863503f9c4
    eCrime IoC Feed ecrime stix 2.1 08059376eac84ec4a076cfd682493f91
    IP Feed ip stix 2.1 baaed2a92335418aa753fe944e13c23a
    恶意电子邮件附件 电子邮件附件 STIX 2.1 c0d56cf7f81d482eb97fd46beaa4bae0
    恶意文件 Feed 文件 stix 2.1 ee6a153ed77e4ec3ab21e76cc2074b9f
    钓鱼网址 Feed phishingurl STIX 2.1 d0a6c0f962dd4dd2b3eeb96b18612584
    PUA 广告软件文件 Feed puaadware stix 2.1 d1bfc81202fc4c6599326771ec2da41d
    PUA 双重用途应用文件 Feed puadualapps stix 2.1 970a7d0039ac4668addf058cd9feb953
    勒索软件 Feed 勒索软件 STIX 2.1 8d3490d688ce4a989aee9af5c680d8bf
    欺诈性网址 Feed scamurl stix 2.1 2130adc3c67c43f9a3664b187931375e
    网络钓鱼短信 Feed smishing stix 2.1 330ad7d0c736476babe5e49077b96c95
    短信诈骗信息流 smsscam stix 2.1 6e20217a2e1246b8ab11be29f759f716
    网址 Feed url stix 2.1 1d3208c143be49da8130f5a66fd3a0fa

为 Cloud Run 函数创建服务账号

  1. Google Cloud 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:

    • 服务账号名称:输入 eset-ti-collector
    • 服务账号说明:输入 Service account for ESET Threat Intelligence Cloud Run function to write STIX objects to GCS
  4. 点击创建并继续

  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:

    1. 点击选择角色,然后搜索并选择 Storage Object Admin
    2. 点击添加其他角色,然后搜索并选择 Cloud Run Invoker
  6. 点击继续

  7. 点击完成

授予对 Google Cloud Storage 存储桶的 IAM 权限

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 eset-ti-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:

    • 添加主账号:输入服务账号电子邮件地址(例如 eset-ti-collector@PROJECT_ID.iam.gserviceaccount.com
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

当 Cloud Scheduler 发布消息时,Pub/Sub 主题会触发 Cloud Run 函数。

  1. Google Cloud 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 eset-ti-trigger
    • 添加默认订阅:保持选中状态
  4. 点击创建

创建 Cloud Run 函数

  1. Google Cloud 控制台中,前往 Cloud Run 函数
  2. 点击创建函数
  3. 提供以下配置详细信息:

    设置
    环境 第 2 代
    函数名称 eset-ti-collector
    区域 选择与您的 GCS 存储桶相同的区域
    触发器类型 Cloud Pub/Sub
    发布/订阅主题 eset-ti-trigger
    分配的内存 512 MiB
    超时 540 秒
    运行时服务账号 eset-ti-collector
  4. 点击下一步

  5. 运行时设置为 Python 3.12

  6. 入口点设置为 main

  7. requirements.txt 文件中,添加以下依赖项:

    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3==2.*
    
  8. main.py 文件中,粘贴以下代码:

    import functions_framework
    import json
    import os
    import logging
    import time
    import urllib3
    from datetime import datetime, timedelta, timezone
    from google.cloud import storage
    
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = storage.Client()
    
    API_ROOT = "https://taxii.eset.com/taxii2/643f4eb5-f8b7-46a3-a606-6d61d5ce223a"
    TAXII_CONTENT_TYPE = "application/taxii+json;version=2.1"
    
    def _load_state(bucket_name: str, state_key: str, lookback_hours: int) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                ts = state.get("last_poll_time")
                if ts:
                    logger.info(f"Loaded state: {ts}")
                    return ts
        except Exception as e:
            logger.warning(f"State read error: {e}")
        default_ts = (
            datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
        ).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        logger.info(f"No previous state found, using lookback: {default_ts}")
        return default_ts
    
    def _save_state(bucket_name: str, state_key: str, ts: str) -> None:
        """Persist the checkpoint to GCS."""
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(state_key)
        blob.upload_from_string(
            json.dumps({"last_poll_time": ts}),
            content_type="application/json",
        )
        logger.info(f"Saved state: {ts}")
    
    def _fetch_objects(
        username: str,
        password: str,
        collection_id: str,
        added_after: str,
        max_records: int,
    ) -> list:
        """Query TAXII 2.1 collection objects with pagination."""
        url = f"{API_ROOT}/collections/{collection_id}/objects/"
        headers = urllib3.make_headers(basic_auth=f"{username}:{password}")
        headers["Accept"] = TAXII_CONTENT_TYPE
        headers["User-Agent"] = "Chronicle-ESET-TI-GCS/1.0"
    
        all_objects = []
        params = {"added_after": added_after}
    
        while True:
            qs = "&".join(f"{k}={v}" for k, v in params.items())
            request_url = f"{url}?{qs}" if qs else url
    
            for attempt in range(3):
                try:
                    resp = HTTP.request("GET", request_url, headers=headers)
                    break
                except Exception as e:
                    wait = 2 ** (attempt + 1)
                    logger.warning(f"Request error: {e}, retrying in {wait}s")
                    time.sleep(wait)
            else:
                raise RuntimeError("Exceeded retry budget for TAXII API")
    
            if resp.status == 401:
                raise RuntimeError("Authentication failed: check TAXII credentials")
            if resp.status == 404:
                raise RuntimeError(
                    f"Collection not found: {collection_id}"
                )
            if resp.status not in (200, 206):
                raise RuntimeError(
                    f"TAXII API error {resp.status}: {resp.data[:500]}"
                )
    
            body = json.loads(resp.data.decode("utf-8"))
            objects = body.get("objects", [])
            all_objects.extend(objects)
            logger.info(
                f"Fetched {len(objects)} objects (total: {len(all_objects)})"
            )
    
            if len(all_objects) >= max_records:
                logger.info(f"Reached max_records limit: {max_records}")
                all_objects = all_objects[:max_records]
                break
    
            more = body.get("more", False)
            next_param = body.get("next")
            if more and next_param:
                params = {"added_after": added_after, "next": next_param}
            else:
                break
    
        return all_objects
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """Cloud Run function entry point triggered by Pub/Sub."""
        bucket_name = os.environ["GCS_BUCKET"]
        prefix = os.environ.get("GCS_PREFIX", "eset-ti")
        state_key = os.environ.get("STATE_KEY", "eset-ti/state.json")
        username = os.environ["TAXII_USERNAME"]
        password = os.environ["TAXII_PASSWORD"]
        collection_id = os.environ["COLLECTION_ID"]
        max_records = int(os.environ.get("MAX_RECORDS", "10000"))
        lookback_hours = int(os.environ.get("LOOKBACK_HOURS", "48"))
    
        try:
            last_poll = _load_state(bucket_name, state_key, lookback_hours)
            objects = _fetch_objects(
                username, password, collection_id, last_poll, max_records
            )
    
            if not objects:
                logger.info("No new STIX objects found")
                return "No new objects", 200
    
            now_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            blob_path = (
                f"{prefix}/eset_ti_{collection_id}_{now_str}.json"
            )
            ndjson_body = "\n".join(
                json.dumps(obj, separators=(",", ":")) for obj in objects
            )
    
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(blob_path)
            blob.upload_from_string(
                ndjson_body, content_type="application/x-ndjson"
            )
    
            new_poll_time = datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%S.000Z"
            )
            _save_state(bucket_name, state_key, new_poll_time)
    
            msg = (
                f"Wrote {len(objects)} STIX objects to "
                f"gs://{bucket_name}/{blob_path}"
            )
            logger.info(msg)
            return msg, 200
    
        except Exception as e:
            logger.error(f"Error collecting ESET TI: {e}")
            raise
    
  9. 点击部署

  10. 等待函数部署完毕。部署完成后,状态会变为绿色对勾标记。

配置环境变量

  1. 部署函数后,依次前往 Cloud Run functions > eset-ti-collector
  2. 点击修改和部署新的修订版本
  3. 点击变量和密钥标签页(或展开运行时、构建、连接和安全设置 [第 1 代])。
  4. 添加以下环境变量:

    示例值
    GCS_BUCKET eset-ti-logs
    GCS_PREFIX eset-ti
    STATE_KEY eset-ti/state.json
    TAXII_USERNAME 您在 ETI 门户中的 TAXII 用户名
    TAXII_PASSWORD ETI 门户中的 TAXII 密码
    COLLECTION_ID 0abb06690b0b47e49cd7794396b76b20
    MAX_RECORDS 10000
    LOOKBACK_HOURS 48
  5. 点击部署

创建 Cloud Scheduler 作业

Cloud Scheduler 会按计划将消息发布到 Pub/Sub 主题,从而触发 Cloud Run 函数轮询 ESET 威胁情报,以获取新的 STIX 对象。

  1. Google Cloud 控制台中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 eset-ti-poll
    区域 选择与函数相同的区域
    频率 0 */1 * * *(每小时)
    时区 选择您所在的时区(例如 UTC
  4. 点击继续

  5. 配置执行部分中:

    • 目标类型:选择 Pub/Sub
    • 主题:选择 eset-ti-trigger
    • 消息正文:输入 {"poll": true}
  6. 点击创建

验证 Cloud Run 函数

  1. Cloud Scheduler 中,找到 eset-ti-poll 作业。
  2. 点击 Force Run 以触发立即执行。
  3. 依次前往 Cloud Run 函数> eset-ti-collector > 日志
  4. 通过检查日志条目(例如以下条目)来验证函数是否已成功执行:

    Fetched 250 objects (total: 250)
    Wrote 250 STIX objects to gs://eset-ti-logs/eset-ti/eset_ti_0abb06690b0b47e49cd7794396b76b20_20250115_103000.json
    
  5. 依次前往 Cloud Storage > 存储分区 > eset-ti-logs

  6. 前往 eset-ti/ 前缀。

  7. 验证是否正在创建包含 STIX 对象的 NDJSON 文件。

检索 Google SecOps 服务账号并配置 Feed

Google SecOps 使用唯一的服务账号从您的 GCS 存储桶中读取数据。您必须向此服务账号授予对您的存储桶的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 ESET Threat Intelligence - Botnet)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 ESET 威胁情报作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

  9. 点击下一步

  10. 为以下输入参数指定值:

    • 存储桶网址:输入带有前缀路径的 GCS 存储桶 URI:

      gs://eset-ti-logs/eset-ti/
      
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:转移后永不删除任何文件(建议用于测试)。
      • 删除已转移的文件:成功转移后删除文件。
      • 删除已转移的文件和空目录:成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件(默认值为 180 天)

    • 资产命名空间资产命名空间

    • 注入标签:要应用于相应 Feed 中事件的标签(例如 ESET_IOC

  11. 点击下一步

  12. 最终确定界面中查看新的 Feed 配置,然后点击提交

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 Google Cloud Storage 存储桶具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储桶名称(例如 eset-ti-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

UDM 映射表

日志字段 UDM 映射 逻辑
when metadata.event_timestamp 事件发生时的时间戳
metadata.event_type 事件类型(例如 USER_LOGIN、NETWORK_CONNECTION)
messageid metadata.id 事件的唯一标识符
协议 network.ip_protocol IP 协议(例如,TCP、UDP)
deviceName principal.hostname 来源主机名
srcAddr principal.ip 连接的来源 IP 地址
srcPort principal.port 来源端口号
操作 security_result.action 安全产品采取的操作(例如“允许”“阻止”)
dstAddr target.ip 目标 IP 地址
dstPort target.port 目标端口号
metadata.product_name 产品名称
metadata.vendor_name 供应商/公司名称

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。