收集 ESET Threat Intelligence 記錄

支援的國家/地區:

本文說明如何使用 Google Cloud Storage V2、Cloud Run 函式和 Cloud Scheduler,將 ESET Threat Intelligence 記錄檔擷取至 Google Security Operations。

ESET Threat Intelligence (ETI) 提供現有或新興威脅的實證資訊和實用建議。ETI 服務會針對可能威脅貴機構或客戶的惡意軟體或活動發出警告。這項服務會透過 TAXII 2.1 資訊提供 STIX 2.1 格式的威脅情報資料,包括 APT IoC、殭屍網路 C&C 和目標、惡意網域、IP、網址、檔案、網路釣魚網址、勒索軟體和 Android 威脅。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用下列 API 的 Google Cloud 專案:
    • Cloud Storage API
    • Cloud Run 函式 API
    • Cloud Scheduler API
    • Cloud Pub/Sub API
  • 建立及管理 Google Cloud Storage 值區、Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 有效的 ESET Threat Intelligence 訂閱方案
  • 存取 ESET 威脅情報入口網站 (https://eti.eset.com)

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 eset-ti-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取地點 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 ESET Threat Intelligence TAXII 憑證

如要讓 Cloud Run 函式擷取威脅情資資料,您必須啟用 TAXII 動態饋給,並從 ETI 入口網站產生 TAXII 憑證。

啟用 TAXII 動態饋給

  1. 前往 https://eti.eset.com 登入 ESET Threat Intelligence 入口網站
  2. 前往主選單中的「資料動態饋給」
  3. 找出要啟用的資料動態饋給,然後按一下旁邊的三點圖示。
  4. 選取「啟用動態消息」
  5. 針對要匯入 Google SecOps 的每個動態饋給,重複執行步驟 3 和 4。

產生 TAXII 憑證

  1. 在 ESET Threat Intelligence 入口網站中,依序前往「Admin Settings」>「Access Credentials」
  2. 按一下「產生 TAXII 憑證」
  3. 在隨即顯示的對話方塊中,複製並儲存下列值:

    • 使用者名稱:您的 TAXII 使用者名稱
    • 密碼:您的 TAXII 密碼

記錄 TAXII 動態饋給詳細資料

啟用動態饋給並產生憑證後,請記錄要擷取的每個動態饋給的下列資訊:

  1. 在 ESET Threat Intelligence 入口網站中,前往「Data Feeds」(資料動態饋給)
  2. 按一下已啟用動態饋給旁的三點圖示。
  3. 選取「顯示資料動態饋給詳細資料」
  4. 在側邊面板中,記下下列值:

    • TAXII 動態饋給名稱:動態饋給 ID (例如 botnet stix 2.1)
    • TAXII 2 ID:集合 ID (例如 0abb06690b0b47e49cd7794396b76b20)
    • TAXII 2 動態消息網址:完整集合網址

可用的 TAXII 動態饋給

  • ESET Threat Intelligence 提供下列 TAXII 2.1 動態消息:

    動態饋給名稱 TAXII 動態饋給名稱 集合 ID
    Android 竊資軟體動態消息 androidinfostealer stix 2.1 9ee501cde0c44d6db4ae995fead1a7c8
    Android 威脅動態消息 androidthreats stix 2.1 daf3de8fab144552a1cb5af054ed07ee
    APT IoC apt stix 2.1 97e3eb74ae5f46dd9e22f677a6938ee7
    殭屍網路動態消息 botnet stix 2.1 0abb06690b0b47e49cd7794396b76b20
    殭屍網路 - C&C botnet.cc stix 2.1 d1923a526e8f400dbb301259240ee3d5
    殭屍網路 - 目標 botnet.target stix 2.1 61b6e4f9153e411ca7a9982a2c6ae788
    加密貨幣詐騙動態消息 cryptoscam stix 2.1 2c183ce9551a43338c6cc2ed7c2a704d
    網域動態消息 網域 STIX 2.1 a34aa0a4f9de419582a883863503f9c4
    eCrime IoC 動態饋給 ecrime stix 2.1 08059376eac84ec4a076cfd682493f91
    IP 動態饋給 ip stix 2.1 baaed2a92335418aa753fe944e13c23a
    惡意電子郵件附件 emailattachments stix 2.1 c0d56cf7f81d482eb97fd46beaa4bae0
    惡意檔案動態饋給 檔案 stix 2.1 ee6a153ed77e4ec3ab21e76cc2074b9f
    網路釣魚網址動態饋給 phishingurl stix 2.1 d0a6c0f962dd4dd2b3eeb96b18612584
    PUA 廣告軟體檔案動態饋給 puaadware stix 2.1 d1bfc81202fc4c6599326771ec2da41d
    PUA 雙用途應用程式檔案動態饋給 puadualapps stix 2.1 970a7d0039ac4668addf058cd9feb953
    勒索軟體動態消息 勒索軟體 stix 2.1 8d3490d688ce4a989aee9af5c680d8bf
    詐騙網址動態饋給 scamurl stix 2.1 2130adc3c67c43f9a3664b187931375e
    網路釣魚動態消息 smishing stix 2.1 330ad7d0c736476babe5e49077b96c95
    簡訊詐騙動態消息 smsscam stix 2.1 6e20217a2e1246b8ab11be29f759f716
    網址動態饋給 url stix 2.1 1d3208c143be49da8130f5a66fd3a0fa

為 Cloud Run 函式建立服務帳戶

  1. Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:

    • 服務帳戶名稱:輸入 eset-ti-collector
    • 服務帳戶說明:輸入 Service account for ESET Threat Intelligence Cloud Run function to write STIX objects to GCS
  4. 按一下「建立並繼續」

  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:

    1. 按一下「選取角色」,然後搜尋並選取「Storage 物件管理員」
    2. 按一下「新增其他角色」,然後搜尋並選取「Cloud Run Invoker」
  6. 按一下「繼續」

  7. 按一下 [完成]

授予 Google Cloud Storage bucket 的 IAM 權限

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 eset-ti-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:

    • 新增主體:輸入服務帳戶電子郵件地址 (例如 eset-ti-collector@PROJECT_ID.iam.gserviceaccount.com)
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

當 Cloud Scheduler 發布訊息時,Pub/Sub 主題會觸發 Cloud Run 函式。

  1. Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)
  2. 按一下 [Create Topic] (建立主題)
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 eset-ti-trigger
    • 新增預設訂閱項目:保持選取狀態
  4. 點選「建立」

建立 Cloud Run 函式

  1. 前往 Google Cloud 控制台中的「Cloud Run functions」(Cloud Run 函式)
  2. 按一下「Create function」(建立函式)
  3. 請提供下列設定詳細資料:

    設定
    環境 第 2 代
    函式名稱 eset-ti-collector
    區域 選取與 GCS 值區相同的區域
    觸發條件類型 Cloud Pub/Sub
    Pub/Sub 主題 eset-ti-trigger
    分配的記憶體 512 MiB
    逾時 540 秒
    執行階段服務帳戶 eset-ti-collector
  4. 點選「下一步」

  5. 將「執行階段」設為「Python 3.12」

  6. 將「Entry point」(進入點) 設為 main

  7. requirements.txt 檔案中新增下列依附元件:

    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3==2.*
    
  8. main.py 檔案中貼上下列程式碼:

    import functions_framework
    import json
    import os
    import logging
    import time
    import urllib3
    from datetime import datetime, timedelta, timezone
    from google.cloud import storage
    
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = storage.Client()
    
    API_ROOT = "https://taxii.eset.com/taxii2/643f4eb5-f8b7-46a3-a606-6d61d5ce223a"
    TAXII_CONTENT_TYPE = "application/taxii+json;version=2.1"
    
    def _load_state(bucket_name: str, state_key: str, lookback_hours: int) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                ts = state.get("last_poll_time")
                if ts:
                    logger.info(f"Loaded state: {ts}")
                    return ts
        except Exception as e:
            logger.warning(f"State read error: {e}")
        default_ts = (
            datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
        ).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        logger.info(f"No previous state found, using lookback: {default_ts}")
        return default_ts
    
    def _save_state(bucket_name: str, state_key: str, ts: str) -> None:
        """Persist the checkpoint to GCS."""
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(state_key)
        blob.upload_from_string(
            json.dumps({"last_poll_time": ts}),
            content_type="application/json",
        )
        logger.info(f"Saved state: {ts}")
    
    def _fetch_objects(
        username: str,
        password: str,
        collection_id: str,
        added_after: str,
        max_records: int,
    ) -> list:
        """Query TAXII 2.1 collection objects with pagination."""
        url = f"{API_ROOT}/collections/{collection_id}/objects/"
        headers = urllib3.make_headers(basic_auth=f"{username}:{password}")
        headers["Accept"] = TAXII_CONTENT_TYPE
        headers["User-Agent"] = "Chronicle-ESET-TI-GCS/1.0"
    
        all_objects = []
        params = {"added_after": added_after}
    
        while True:
            qs = "&".join(f"{k}={v}" for k, v in params.items())
            request_url = f"{url}?{qs}" if qs else url
    
            for attempt in range(3):
                try:
                    resp = HTTP.request("GET", request_url, headers=headers)
                    break
                except Exception as e:
                    wait = 2 ** (attempt + 1)
                    logger.warning(f"Request error: {e}, retrying in {wait}s")
                    time.sleep(wait)
            else:
                raise RuntimeError("Exceeded retry budget for TAXII API")
    
            if resp.status == 401:
                raise RuntimeError("Authentication failed: check TAXII credentials")
            if resp.status == 404:
                raise RuntimeError(
                    f"Collection not found: {collection_id}"
                )
            if resp.status not in (200, 206):
                raise RuntimeError(
                    f"TAXII API error {resp.status}: {resp.data[:500]}"
                )
    
            body = json.loads(resp.data.decode("utf-8"))
            objects = body.get("objects", [])
            all_objects.extend(objects)
            logger.info(
                f"Fetched {len(objects)} objects (total: {len(all_objects)})"
            )
    
            if len(all_objects) >= max_records:
                logger.info(f"Reached max_records limit: {max_records}")
                all_objects = all_objects[:max_records]
                break
    
            more = body.get("more", False)
            next_param = body.get("next")
            if more and next_param:
                params = {"added_after": added_after, "next": next_param}
            else:
                break
    
        return all_objects
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """Cloud Run function entry point triggered by Pub/Sub."""
        bucket_name = os.environ["GCS_BUCKET"]
        prefix = os.environ.get("GCS_PREFIX", "eset-ti")
        state_key = os.environ.get("STATE_KEY", "eset-ti/state.json")
        username = os.environ["TAXII_USERNAME"]
        password = os.environ["TAXII_PASSWORD"]
        collection_id = os.environ["COLLECTION_ID"]
        max_records = int(os.environ.get("MAX_RECORDS", "10000"))
        lookback_hours = int(os.environ.get("LOOKBACK_HOURS", "48"))
    
        try:
            last_poll = _load_state(bucket_name, state_key, lookback_hours)
            objects = _fetch_objects(
                username, password, collection_id, last_poll, max_records
            )
    
            if not objects:
                logger.info("No new STIX objects found")
                return "No new objects", 200
    
            now_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            blob_path = (
                f"{prefix}/eset_ti_{collection_id}_{now_str}.json"
            )
            ndjson_body = "\n".join(
                json.dumps(obj, separators=(",", ":")) for obj in objects
            )
    
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(blob_path)
            blob.upload_from_string(
                ndjson_body, content_type="application/x-ndjson"
            )
    
            new_poll_time = datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%S.000Z"
            )
            _save_state(bucket_name, state_key, new_poll_time)
    
            msg = (
                f"Wrote {len(objects)} STIX objects to "
                f"gs://{bucket_name}/{blob_path}"
            )
            logger.info(msg)
            return msg, 200
    
        except Exception as e:
            logger.error(f"Error collecting ESET TI: {e}")
            raise
    
  9. 點選「Deploy」(部署)

  10. 等待函式部署完成。部署完成後,狀態會變更為綠色勾號。

設定環境變數

  1. 函式部署完畢後,請前往「Cloud Run functions」> eset-ti-collector
  2. 按一下「編輯及部署新的修訂版本」
  3. 按一下「變數和密鑰」分頁標籤 (或展開「執行階段、建構作業、連線和安全性設定」,適用於第 1 代)。
  4. 新增下列環境變數:

    範例值
    GCS_BUCKET eset-ti-logs
    GCS_PREFIX eset-ti
    STATE_KEY eset-ti/state.json
    TAXII_USERNAME ETI 入口網站的 TAXII 使用者名稱
    TAXII_PASSWORD ETI 入口網站的 TAXII 密碼
    COLLECTION_ID 0abb06690b0b47e49cd7794396b76b20
    MAX_RECORDS 10000
    LOOKBACK_HOURS 48
  5. 點選「Deploy」(部署)

建立 Cloud Scheduler 工作

Cloud Scheduler 會依排程將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式輪詢 ESET Threat Intelligence,找出新的 STIX 物件。

  1. 前往 Google Cloud 控制台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 eset-ti-poll
    區域 選取與函式相同的區域
    頻率 0 */1 * * * (每小時)
    時區 選取時區 (例如 UTC)
  4. 按一下「繼續」

  5. 在「設定執行作業」部分:

    • 目標類型:選取「Pub/Sub」
    • 主題:選取 eset-ti-trigger
    • 訊息內文:輸入 {"poll": true}
  6. 點選「建立」

驗證 Cloud Run 函式

  1. Cloud Scheduler 中,找到 eset-ti-poll 工作。
  2. 按一下「強制執行」,即可立即執行。
  3. 依序前往「Cloud Run functions」>「eset-ti-collector」>「Logs」
  4. 檢查是否有下列記錄項目,確認函式是否成功執行:

    Fetched 250 objects (total: 250)
    Wrote 250 STIX objects to gs://eset-ti-logs/eset-ti/eset_ti_0abb06690b0b47e49cd7794396b76b20_20250115_103000.json
    
  5. 依序前往「Cloud Storage」>「Buckets」>「eset-ti-logs」

  6. 前往 eset-ti/ 前置字元。

  7. 確認系統是否正在建立含有 STIX 物件的 NDJSON 檔案。

擷取 Google SecOps 服務帳戶並設定動態饋給

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 ESET Threat Intelligence - Botnet)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「ESET Threat Intelligence」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

  9. 點選「下一步」

  10. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://eset-ti-logs/eset-ti/
      
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給中事件的標籤 (例如 ESET_IOC)

  11. 點選「下一步」

  12. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶必須具備 Google Cloud Storage bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 eset-ti-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
時間 metadata.event_timestamp 事件發生的時間戳記
metadata.event_type 事件類型 (例如 USER_LOGIN、NETWORK_CONNECTION)
messageid metadata.id 事件的專屬 ID
通訊協定 network.ip_protocol IP 通訊協定 (例如 TCP、UDP)
deviceName principal.hostname 來源主機名稱
srcAddr principal.ip 連線的來源 IP 位址
srcPort principal.port 來源通訊埠號碼
動作 security_result.action 安全性產品採取的動作 (例如 ALLOW、BLOCK)
dstAddr target.ip 目的地 IP 位址
dstPort target.port 目的地通訊埠號碼
metadata.product_name 產品名稱
metadata.vendor_name 供應商/公司名稱

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。