收集 ESET Threat Intelligence 記錄
本文說明如何使用 Google Cloud Storage V2、Cloud Run 函式和 Cloud Scheduler,將 ESET Threat Intelligence 記錄檔擷取至 Google Security Operations。
ESET Threat Intelligence (ETI) 提供現有或新興威脅的實證資訊和實用建議。ETI 服務會針對可能威脅貴機構或客戶的惡意軟體或活動發出警告。這項服務會透過 TAXII 2.1 資訊提供 STIX 2.1 格式的威脅情報資料,包括 APT IoC、殭屍網路 C&C 和目標、惡意網域、IP、網址、檔案、網路釣魚網址、勒索軟體和 Android 威脅。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用下列 API 的 Google Cloud 專案:
- Cloud Storage API
- Cloud Run 函式 API
- Cloud Scheduler API
- Cloud Pub/Sub API
- 建立及管理 Google Cloud Storage 值區、Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
- 管理 Google Cloud Storage 值區 IAM 政策的權限
- 有效的 ESET Threat Intelligence 訂閱方案
- 存取 ESET 威脅情報入口網站 (https://eti.eset.com)
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 eset-ti-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取地點 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
收集 ESET Threat Intelligence TAXII 憑證
如要讓 Cloud Run 函式擷取威脅情資資料,您必須啟用 TAXII 動態饋給,並從 ETI 入口網站產生 TAXII 憑證。
啟用 TAXII 動態饋給
- 前往 https://eti.eset.com 登入 ESET Threat Intelligence 入口網站。
- 前往主選單中的「資料動態饋給」。
- 找出要啟用的資料動態饋給,然後按一下旁邊的三點圖示。
- 選取「啟用動態消息」。
針對要匯入 Google SecOps 的每個動態饋給,重複執行步驟 3 和 4。
產生 TAXII 憑證
- 在 ESET Threat Intelligence 入口網站中,依序前往「Admin Settings」>「Access Credentials」。
- 按一下「產生 TAXII 憑證」。
在隨即顯示的對話方塊中,複製並儲存下列值:
- 使用者名稱:您的 TAXII 使用者名稱
- 密碼:您的 TAXII 密碼
記錄 TAXII 動態饋給詳細資料
啟用動態饋給並產生憑證後,請記錄要擷取的每個動態饋給的下列資訊:
- 在 ESET Threat Intelligence 入口網站中,前往「Data Feeds」(資料動態饋給)。
- 按一下已啟用動態饋給旁的三點圖示。
- 選取「顯示資料動態饋給詳細資料」。
在側邊面板中,記下下列值:
- TAXII 動態饋給名稱:動態饋給 ID (例如
botnet stix 2.1) - TAXII 2 ID:集合 ID (例如
0abb06690b0b47e49cd7794396b76b20) - TAXII 2 動態消息網址:完整集合網址
- TAXII 動態饋給名稱:動態饋給 ID (例如
可用的 TAXII 動態饋給
ESET Threat Intelligence 提供下列 TAXII 2.1 動態消息:
動態饋給名稱 TAXII 動態饋給名稱 集合 ID Android 竊資軟體動態消息 androidinfostealer stix 2.1 9ee501cde0c44d6db4ae995fead1a7c8 Android 威脅動態消息 androidthreats stix 2.1 daf3de8fab144552a1cb5af054ed07ee APT IoC apt stix 2.1 97e3eb74ae5f46dd9e22f677a6938ee7 殭屍網路動態消息 botnet stix 2.1 0abb06690b0b47e49cd7794396b76b20 殭屍網路 - C&C botnet.cc stix 2.1 d1923a526e8f400dbb301259240ee3d5 殭屍網路 - 目標 botnet.target stix 2.1 61b6e4f9153e411ca7a9982a2c6ae788 加密貨幣詐騙動態消息 cryptoscam stix 2.1 2c183ce9551a43338c6cc2ed7c2a704d 網域動態消息 網域 STIX 2.1 a34aa0a4f9de419582a883863503f9c4 eCrime IoC 動態饋給 ecrime stix 2.1 08059376eac84ec4a076cfd682493f91 IP 動態饋給 ip stix 2.1 baaed2a92335418aa753fe944e13c23a 惡意電子郵件附件 emailattachments stix 2.1 c0d56cf7f81d482eb97fd46beaa4bae0 惡意檔案動態饋給 檔案 stix 2.1 ee6a153ed77e4ec3ab21e76cc2074b9f 網路釣魚網址動態饋給 phishingurl stix 2.1 d0a6c0f962dd4dd2b3eeb96b18612584 PUA 廣告軟體檔案動態饋給 puaadware stix 2.1 d1bfc81202fc4c6599326771ec2da41d PUA 雙用途應用程式檔案動態饋給 puadualapps stix 2.1 970a7d0039ac4668addf058cd9feb953 勒索軟體動態消息 勒索軟體 stix 2.1 8d3490d688ce4a989aee9af5c680d8bf 詐騙網址動態饋給 scamurl stix 2.1 2130adc3c67c43f9a3664b187931375e 網路釣魚動態消息 smishing stix 2.1 330ad7d0c736476babe5e49077b96c95 簡訊詐騙動態消息 smsscam stix 2.1 6e20217a2e1246b8ab11be29f759f716 網址動態饋給 url stix 2.1 1d3208c143be49da8130f5a66fd3a0fa
為 Cloud Run 函式建立服務帳戶
- 在 Google Cloud 控制台中,依序前往「IAM 與管理」>「服務帳戶」。
- 按一下「Create Service Account」(建立服務帳戶)。
請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
eset-ti-collector - 服務帳戶說明:輸入
Service account for ESET Threat Intelligence Cloud Run function to write STIX objects to GCS
- 服務帳戶名稱:輸入
按一下「建立並繼續」。
在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
- 按一下「選取角色」,然後搜尋並選取「Storage 物件管理員」。
- 按一下「新增其他角色」,然後搜尋並選取「Cloud Run Invoker」。
按一下「繼續」。
按一下 [完成]。
授予 Google Cloud Storage bucket 的 IAM 權限
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
eset-ti-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
請提供下列設定詳細資料:
- 新增主體:輸入服務帳戶電子郵件地址 (例如
eset-ti-collector@PROJECT_ID.iam.gserviceaccount.com) - 指派角色:選取「Storage 物件管理員」
- 新增主體:輸入服務帳戶電子郵件地址 (例如
按一下 [儲存]。
建立 Pub/Sub 主題
當 Cloud Scheduler 發布訊息時,Pub/Sub 主題會觸發 Cloud Run 函式。
- 在 Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」(主題)。
- 按一下 [Create Topic] (建立主題)。
- 請提供下列設定詳細資料:
- 主題 ID:輸入
eset-ti-trigger - 新增預設訂閱項目:保持選取狀態
- 主題 ID:輸入
- 點選「建立」。
建立 Cloud Run 函式
- 前往 Google Cloud 控制台中的「Cloud Run functions」(Cloud Run 函式)。
- 按一下「Create function」(建立函式)。
請提供下列設定詳細資料:
設定 值 環境 第 2 代 函式名稱 eset-ti-collector區域 選取與 GCS 值區相同的區域 觸發條件類型 Cloud Pub/Sub Pub/Sub 主題 eset-ti-trigger分配的記憶體 512 MiB 逾時 540 秒 執行階段服務帳戶 eset-ti-collector點選「下一步」。
將「執行階段」設為「Python 3.12」。
將「Entry point」(進入點) 設為
main。在
requirements.txt檔案中新增下列依附元件:functions-framework==3.* google-cloud-storage==2.* urllib3==2.*在
main.py檔案中貼上下列程式碼:import functions_framework import json import os import logging import time import urllib3 from datetime import datetime, timedelta, timezone from google.cloud import storage logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) HTTP = urllib3.PoolManager(retries=False) storage_client = storage.Client() API_ROOT = "https://taxii.eset.com/taxii2/643f4eb5-f8b7-46a3-a606-6d61d5ce223a" TAXII_CONTENT_TYPE = "application/taxii+json;version=2.1" def _load_state(bucket_name: str, state_key: str, lookback_hours: int) -> str: """Return ISO8601 checkpoint (UTC).""" try: bucket = storage_client.bucket(bucket_name) blob = bucket.blob(state_key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) ts = state.get("last_poll_time") if ts: logger.info(f"Loaded state: {ts}") return ts except Exception as e: logger.warning(f"State read error: {e}") default_ts = ( datetime.now(timezone.utc) - timedelta(hours=lookback_hours) ).strftime("%Y-%m-%dT%H:%M:%S.000Z") logger.info(f"No previous state found, using lookback: {default_ts}") return default_ts def _save_state(bucket_name: str, state_key: str, ts: str) -> None: """Persist the checkpoint to GCS.""" bucket = storage_client.bucket(bucket_name) blob = bucket.blob(state_key) blob.upload_from_string( json.dumps({"last_poll_time": ts}), content_type="application/json", ) logger.info(f"Saved state: {ts}") def _fetch_objects( username: str, password: str, collection_id: str, added_after: str, max_records: int, ) -> list: """Query TAXII 2.1 collection objects with pagination.""" url = f"{API_ROOT}/collections/{collection_id}/objects/" headers = urllib3.make_headers(basic_auth=f"{username}:{password}") headers["Accept"] = TAXII_CONTENT_TYPE headers["User-Agent"] = "Chronicle-ESET-TI-GCS/1.0" all_objects = [] params = {"added_after": added_after} while True: qs = "&".join(f"{k}={v}" for k, v in params.items()) request_url = f"{url}?{qs}" if qs else url for attempt in range(3): try: resp = HTTP.request("GET", request_url, headers=headers) break except Exception as e: wait = 2 ** (attempt + 1) logger.warning(f"Request error: {e}, retrying in {wait}s") time.sleep(wait) else: raise RuntimeError("Exceeded retry budget for TAXII API") if resp.status == 401: raise RuntimeError("Authentication failed: check TAXII credentials") if resp.status == 404: raise RuntimeError( f"Collection not found: {collection_id}" ) if resp.status not in (200, 206): raise RuntimeError( f"TAXII API error {resp.status}: {resp.data[:500]}" ) body = json.loads(resp.data.decode("utf-8")) objects = body.get("objects", []) all_objects.extend(objects) logger.info( f"Fetched {len(objects)} objects (total: {len(all_objects)})" ) if len(all_objects) >= max_records: logger.info(f"Reached max_records limit: {max_records}") all_objects = all_objects[:max_records] break more = body.get("more", False) next_param = body.get("next") if more and next_param: params = {"added_after": added_after, "next": next_param} else: break return all_objects @functions_framework.cloud_event def main(cloud_event): """Cloud Run function entry point triggered by Pub/Sub.""" bucket_name = os.environ["GCS_BUCKET"] prefix = os.environ.get("GCS_PREFIX", "eset-ti") state_key = os.environ.get("STATE_KEY", "eset-ti/state.json") username = os.environ["TAXII_USERNAME"] password = os.environ["TAXII_PASSWORD"] collection_id = os.environ["COLLECTION_ID"] max_records = int(os.environ.get("MAX_RECORDS", "10000")) lookback_hours = int(os.environ.get("LOOKBACK_HOURS", "48")) try: last_poll = _load_state(bucket_name, state_key, lookback_hours) objects = _fetch_objects( username, password, collection_id, last_poll, max_records ) if not objects: logger.info("No new STIX objects found") return "No new objects", 200 now_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") blob_path = ( f"{prefix}/eset_ti_{collection_id}_{now_str}.json" ) ndjson_body = "\n".join( json.dumps(obj, separators=(",", ":")) for obj in objects ) bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_body, content_type="application/x-ndjson" ) new_poll_time = datetime.now(timezone.utc).strftime( "%Y-%m-%dT%H:%M:%S.000Z" ) _save_state(bucket_name, state_key, new_poll_time) msg = ( f"Wrote {len(objects)} STIX objects to " f"gs://{bucket_name}/{blob_path}" ) logger.info(msg) return msg, 200 except Exception as e: logger.error(f"Error collecting ESET TI: {e}") raise點選「Deploy」(部署)。
等待函式部署完成。部署完成後,狀態會變更為綠色勾號。
設定環境變數
- 函式部署完畢後,請前往「Cloud Run functions」> eset-ti-collector。
- 按一下「編輯及部署新的修訂版本」。
- 按一下「變數和密鑰」分頁標籤 (或展開「執行階段、建構作業、連線和安全性設定」,適用於第 1 代)。
新增下列環境變數:
鍵 範例值 GCS_BUCKETeset-ti-logsGCS_PREFIXeset-tiSTATE_KEYeset-ti/state.jsonTAXII_USERNAMEETI 入口網站的 TAXII 使用者名稱 TAXII_PASSWORDETI 入口網站的 TAXII 密碼 COLLECTION_ID0abb06690b0b47e49cd7794396b76b20MAX_RECORDS10000LOOKBACK_HOURS48點選「Deploy」(部署)。
建立 Cloud Scheduler 工作
Cloud Scheduler 會依排程將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式輪詢 ESET Threat Intelligence,找出新的 STIX 物件。
- 前往 Google Cloud 控制台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
設定 值 名稱 eset-ti-poll區域 選取與函式相同的區域 頻率 0 */1 * * *(每小時)時區 選取時區 (例如 UTC)按一下「繼續」。
在「設定執行作業」部分:
- 目標類型:選取「Pub/Sub」
- 主題:選取
eset-ti-trigger - 訊息內文:輸入
{"poll": true}
點選「建立」。
驗證 Cloud Run 函式
- 在 Cloud Scheduler 中,找到
eset-ti-poll工作。 - 按一下「強制執行」,即可立即執行。
- 依序前往「Cloud Run functions」>「eset-ti-collector」>「Logs」。
檢查是否有下列記錄項目,確認函式是否成功執行:
Fetched 250 objects (total: 250) Wrote 250 STIX objects to gs://eset-ti-logs/eset-ti/eset_ti_0abb06690b0b47e49cd7794396b76b20_20250115_103000.json依序前往「Cloud Storage」>「Buckets」>「eset-ti-logs」。
前往
eset-ti/前置字元。確認系統是否正在建立含有 STIX 物件的 NDJSON 檔案。
擷取 Google SecOps 服務帳戶並設定動態饋給
Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
ESET Threat Intelligence - Botnet)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「ESET Threat Intelligence」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
點選「下一步」。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://eset-ti-logs/eset-ti/來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。
檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)
資產命名空間:資產命名空間
擷取標籤:要套用至這個動態饋給中事件的標籤 (例如
ESET_IOC)
點選「下一步」。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶必須具備 Google Cloud Storage bucket 的「Storage 物件檢視者」角色。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱 (例如
eset-ti-logs)。 - 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址
- 指派角色:選取「Storage 物件檢視者」
按一下 [儲存]。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| 時間 | metadata.event_timestamp | 事件發生的時間戳記 |
| metadata.event_type | 事件類型 (例如 USER_LOGIN、NETWORK_CONNECTION) | |
| messageid | metadata.id | 事件的專屬 ID |
| 通訊協定 | network.ip_protocol | IP 通訊協定 (例如 TCP、UDP) |
| deviceName | principal.hostname | 來源主機名稱 |
| srcAddr | principal.ip | 連線的來源 IP 位址 |
| srcPort | principal.port | 來源通訊埠號碼 |
| 動作 | security_result.action | 安全性產品採取的動作 (例如 ALLOW、BLOCK) |
| dstAddr | target.ip | 目的地 IP 位址 |
| dstPort | target.port | 目的地通訊埠號碼 |
| metadata.product_name | 產品名稱 | |
| metadata.vendor_name | 供應商/公司名稱 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。