收集 Dataminr Alerts 記錄

支援的國家/地區:

本文說明如何使用 Google Cloud Storage V2、Cloud Run 函式和 Cloud Scheduler,將 Dataminr Alerts 記錄檔擷取至 Google Security Operations。

Dataminr Pulse 運用 AI 技術,從全球超過 50 萬個公開資料來源 (包括深層和暗網) 取得即時情報。這個平台會針對影響貴機構和第三方的網路威脅、安全漏洞、勒索軟體攻擊、資料侵害和數位風險,提供早期預警。Dataminr Pulse API 使用 OAuth 2.0 用戶端憑證驗證和游標式分頁,擷取快訊。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用下列 API 的 Google Cloud 專案:
    • Cloud Storage API
    • Cloud Run 函式 API
    • Cloud Scheduler API
    • Cloud Pub/Sub API
  • 建立及管理 GCS 值區、Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 已啟用 API 存取權的有效 Dataminr Pulse 帳戶
  • Dataminr Pulse API 憑證 (用戶端 ID 和用戶端密鑰)
  • Dataminr 帳戶中至少設定一個 Dataminr Pulse 快訊清單

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud Console
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 dataminr-alert-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 Dataminr 憑證

如要讓 Cloud Run 函式擷取快訊資料,您需要向 Dataminr 帳戶代表索取 API 憑證,並使用 OAuth 2.0 用戶端憑證驗證。

取得 API 憑證

  1. 如要要求 API 存取權,請與 Dataminr 帳戶代表或支援團隊聯絡。
  2. 請提供下列資訊:
    • 貴機構的名稱
    • 用途:與 Google Chronicle SIEM 整合
    • 必要存取權:Dataminr Pulse API for Cyber Risk
  3. Dataminr 會提供 API 憑證,並提供下列資訊:

    • 用戶端 ID:不重複的 OAuth 2.0 用戶端 ID
    • 用戶端密鑰:您的 OAuth 2.0 用戶端密鑰

驗證 API 憑證

  • 如要確認憑證是否有效,請執行下列指令:

    curl -X POST https://gateway.dataminr.com/auth/2/token \
      -H "Content-Type: application/x-www-form-urlencoded" \
      -d "client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=api_key"
    

    如果回應成功,會傳回包含 access_token 欄位的 JSON 物件:

    {
      "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI...",
      "token_type": "Bearer",
      "expire": 3600
    }
    

收集快訊清單 ID

  1. 前往 https://app.dataminr.com 登入 Dataminr Pulse 網頁應用程式。
  2. 前往已設定的快訊清單 (監控清單)。
  3. 記下要匯入 Google SecOps 的快訊清單 ID。

為 Cloud Run 函式建立服務帳戶

  1. Google Cloud 控制台中,依序前往「IAM 與管理」> 服務帳戶
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 dataminr-alert-collector
    • 服務帳戶說明:輸入 Service account for Dataminr Alerts Cloud Run function to write alert data to GCS
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選取角色」,然後搜尋並選取「Storage 物件管理員」
    2. 按一下「新增其他角色」,然後搜尋並選取「Cloud Run Invoker」
  6. 按一下「繼續」
  7. 按一下 [完成]

授予 GCS 值區的 IAM 權限

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 Bucket 名稱 (例如 dataminr-alert-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 dataminr-alert-collector@PROJECT_ID.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

當 Cloud Scheduler 發布訊息時,Pub/Sub 主題會觸發 Cloud Run 函式。

  1. Google Cloud 控制台中,依序前往「Pub/Sub」>「Topics」
  2. 按一下 [Create Topic] (建立主題)
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 dataminr-alert-trigger
    • 新增預設訂閱項目:保持勾選
  4. 點選「建立」

建立 Cloud Run 函式

  1. 前往 Google Cloud 控制台的「Cloud Run functions」(Cloud Run 函式)
  2. 按一下「Create function」(建立函式)
  3. 請提供下列設定詳細資料:

    設定
    環境 第 2 代
    函式名稱 dataminr-alert-collector
    區域 選取與 GCS 值區相同的區域
    觸發條件類型 Cloud Pub/Sub
    Pub/Sub 主題 dataminr-alert-trigger
    分配的記憶體 512 MiB
    逾時 540 秒
    執行階段服務帳戶 dataminr-alert-collector
  4. 點選「下一步」

  5. 將「執行階段」設為「Python 3.12」

  6. 將「Entry point」(進入點) 設為 main

  7. requirements.txt 檔案中新增下列依附元件:

    functions-framework==3.*
    google-cloud-storage==2.*
    requests==2.*
    
  8. main.py 檔案中貼上下列程式碼:

    import functions_framework
    import json
    import os
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from google.cloud import storage
    import requests
    
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    storage_client = storage.Client()
    
    TOKEN_URL = "https://gateway.dataminr.com/auth/2/token"
    ALERTS_URL = "https://gateway.dataminr.com/api/3/alerts"
    
    def _get_access_token(client_id: str, client_secret: str) -> str:
        """Obtain an OAuth 2.0 access token from Dataminr."""
        payload = {
            "client_id": client_id,
            "client_secret": client_secret,
            "grant_type": "api_key",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        resp = requests.post(TOKEN_URL, data=payload, headers=headers, timeout=30)
        resp.raise_for_status()
        token_data = resp.json()
        access_token = token_data.get("access_token")
        if not access_token:
            raise ValueError("No access_token in token response")
        logger.info("Successfully obtained Dataminr access token.")
        return access_token
    
    def _load_state(bucket_name: str, state_key: str) -> dict:
        """Load the last cursor (alertId) from GCS."""
        try:
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(state_key)
            if blob.exists():
                data = json.loads(blob.download_as_text())
                logger.info(f"Loaded state: {data}")
                return data
        except Exception as e:
            logger.warning(f"State read error: {e}")
        logger.info("No previous state found.")
        return {}
    
    def _save_state(bucket_name: str, state_key: str, state: dict) -> None:
        """Save the cursor state to GCS."""
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(state_key)
        blob.upload_from_string(
            json.dumps(state), content_type="application/json"
        )
        logger.info(f"Saved state: {state}")
    
    def _fetch_alerts(
        access_token: str,
        alert_lists: str,
        page_size: int,
        cursor: str = None,
    ) -> tuple:
        """Fetch a page of alerts from the Dataminr Pulse API."""
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        }
        params = {
            "lists": alert_lists,
            "num": page_size,
        }
        if cursor:
            params["from"] = cursor
    
        resp = requests.get(
            ALERTS_URL, headers=headers, params=params, timeout=60
        )
    
        # Handle rate limiting via response headers
        rate_remaining = resp.headers.get("x-ratelimit-remaining")
        rate_reset = resp.headers.get("x-ratelimit-reset")
    
        if resp.status_code == 429:
            reset_time = int(rate_reset) if rate_reset else 60
            wait_seconds = max(reset_time - int(time.time()), 1)
            logger.warning(
                f"Rate limited. Waiting {wait_seconds}s before retry."
            )
            time.sleep(wait_seconds)
            resp = requests.get(
                ALERTS_URL, headers=headers, params=params, timeout=60
            )
    
        resp.raise_for_status()
    
        if rate_remaining is not None:
            logger.info(
                f"Rate limit remaining: {rate_remaining}, reset: {rate_reset}"
            )
    
        data = resp.json()
        alerts = data if isinstance(data, list) else data.get("data", [])
        return alerts
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """Cloud Run function entry point triggered by Pub/Sub."""
        bucket_name = os.environ["GCS_BUCKET"]
        prefix = os.environ.get("GCS_PREFIX", "dataminr_alerts")
        state_key = os.environ.get("STATE_KEY", "dataminr_state/cursor.json")
        client_id = os.environ["CLIENT_ID"]
        client_secret = os.environ["CLIENT_SECRET"]
        alert_lists = os.environ["ALERT_LISTS"]
        max_records = int(os.environ.get("MAX_RECORDS", "1000"))
        page_size = min(int(os.environ.get("PAGE_SIZE", "40")), 40)
        lookback_hours = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
        try:
            access_token = _get_access_token(client_id, client_secret)
            state = _load_state(bucket_name, state_key)
            cursor = state.get("last_cursor")
            is_first_run = cursor is None
    
            all_alerts = []
            total_fetched = 0
            pages_fetched = 0
    
            while total_fetched < max_records:
                logger.info(
                    f"Fetching page {pages_fetched + 1} (cursor: {cursor})..."
                )
                alerts = _fetch_alerts(
                    access_token, alert_lists, page_size, cursor=cursor
                )
    
                if not alerts:
                    logger.info("No more alerts returned. Stopping pagination.")
                    break
    
                # Filter by lookback window on first run (no prior cursor)
                if is_first_run:
                    cutoff_ms = int(
                        (
                            datetime.now(timezone.utc)
                            - timedelta(hours=lookback_hours)
                        ).timestamp()
                        * 1000
                    )
                    alerts = [
                        a for a in alerts if a.get("eventTime", 0) >= cutoff_ms
                    ]
    
                all_alerts.extend(alerts)
                total_fetched += len(alerts)
                pages_fetched += 1
    
                # Update cursor to the last alertId in this page
                last_alert = alerts[-1] if alerts else None
                if last_alert and "alertId" in last_alert:
                    cursor = last_alert["alertId"]
                else:
                    break
    
                # Stop if we received fewer alerts than requested
                if len(alerts) < page_size:
                    logger.info("Received partial page. Stopping pagination.")
                    break
    
            logger.info(
                f"Collected {len(all_alerts)} alerts across {pages_fetched} pages."
            )
    
            if not all_alerts:
                logger.info("No new alerts to write.")
                return "No new alerts", 200
    
            # Write alerts as NDJSON to GCS
            now_str = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
            blob_path = f"{prefix}/{now_str}.ndjson"
            ndjson_body = "\n".join(
                json.dumps(alert, separators=(",", ":")) for alert in all_alerts
            )
    
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(blob_path)
            blob.upload_from_string(
                ndjson_body, content_type="application/x-ndjson"
            )
    
            _save_state(
                bucket_name,
                state_key,
                {
                    "last_cursor": cursor,
                    "last_run": datetime.now(timezone.utc).isoformat(),
                },
            )
    
            msg = (
                f"Wrote {len(all_alerts)} alerts to "
                f"gs://{bucket_name}/{blob_path}"
            )
            logger.info(msg)
            return msg, 200
    
        except Exception as e:
            logger.error(f"Error collecting Dataminr alerts: {e}")
            raise
    
  9. 點選「Deploy」(部署)

  10. 等待函式部署完成。部署完成後,狀態會變更為綠色勾號。

設定環境變數

  1. 函式部署完畢後,請前往「Cloud Run Functions」> dataminr-alert-collector
  2. 按一下「編輯及部署新的修訂版本」
  3. 按一下「變數和密鑰」分頁標籤 (或展開「執行階段、建構作業、連線和安全性設定」,適用於第 1 代)。
  4. 新增下列環境變數:

    範例值
    GCS_BUCKET dataminr-alert-logs
    GCS_PREFIX dataminr_alerts
    STATE_KEY dataminr_state/cursor.json
    CLIENT_ID 您的 Dataminr OAuth 2.0 用戶端 ID
    CLIENT_SECRET 您的 Dataminr OAuth 2.0 用戶端密鑰
    ALERT_LISTS 以半形逗號分隔的 Dataminr 快訊清單 ID
    MAX_RECORDS 1000
    PAGE_SIZE 40
    LOOKBACK_HOURS 24
  5. 點選「Deploy」(部署)

建立 Cloud Scheduler 工作

Cloud Scheduler 會依排程將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式輪詢 Dataminr Pulse 的新快訊。

  1. 前往 Google Cloud 控制台中的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 dataminr-alert-poll
    區域 選取與函式相同的區域
    頻率 */5 * * * * (每 5 分鐘)
    時區 選取時區 (例如 UTC)
  4. 按一下「繼續」

  5. 在「設定執行作業」部分:

    • 目標類型:選取「Pub/Sub」
    • 主題:選取 dataminr-alert-trigger
    • 訊息內文:輸入 {"poll": true}
  6. 點選「建立」

驗證 Cloud Run 函式

  1. Cloud Scheduler 中找到 dataminr-alert-poll 工作。
  2. 按一下「強制執行」,即可立即執行。
  3. 依序前往「Cloud Run Functions」>「dataminr-alert-collector」>「Logs」
  4. 檢查是否有下列記錄項目,確認函式是否成功執行:

    Successfully obtained Dataminr access token.
    Fetching page 1 (cursor: None)...
    Collected 35 alerts across 1 pages.
    Wrote 35 alerts to gs://dataminr-alert-logs/dataminr_alerts/20250115T103000Z.ndjson
    
  5. 依序前往「Cloud Storage」>「Buckets」>「dataminr-alert-logs」

  6. 前往 dataminr_alerts/ 前置字元。

  7. 確認系統是否正在使用 Dataminr 快訊資料建立 NDJSON 檔案。

擷取 Google SecOps 服務帳戶並設定動態饋給

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Dataminr Alerts)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Dataminr 快訊」做為「記錄類型」
  7. 按一下「取得服務帳戶」
  8. 系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  9. 複製這個電子郵件地址,以便在下一節中使用。

  10. 點選「下一步」

  11. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://dataminr-alert-logs/dataminr_alerts/
      
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移檔案後,刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤 (例如 DATAMINR_ALERT)。

  12. 點選「下一步」

  13. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 Bucket 名稱 (例如 dataminr-alert-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
alertId metadata.product_log_id 直接複製值
alertType.color about.labels.alertType_color 直接複製值
alertType.id about.labels.alertType_id 直接複製值
alertType.name about.labels.alertType_name 直接複製值
availableRelatedAlerts about.labels.availableRelatedAlerts 已轉換為字串
說明文字 metadata.description 直接複製值
cat.name security_result.category_details 直接複製值
cat.id security_result.detection_fields.categories_id 直接複製值
cat.idStr security_result.detection_fields.categories_idStr 直接複製值
cat.path security_result.detection_fields.categories_path 直接複製值
cat.requested security_result.detection_fields.categories_requested 直接複製值
cat.retired security_result.detection_fields.categories_retired 已轉換為字串
cat.topicType about.labels.categories_topicType 直接複製值
cat.name security_result.category 如果 cat.name ==「Cybersecurity - Policy」,請設為 POLICY_VIOLATION;如果 cat.name 位於 ["Cybersecurity - Threats & Vulnerabilities", "Cybersecurity - Crime & Malicious Activity", "Threats & Precautions", "Threats"] 中,請設為 NETWORK_MALICIOUS;如果 cat.name =~「Cybersecurity」,請設為 NETWORK_SUSPICIOUS;如果 cat.name =~「Email and Web Servers」,請設為 MAIL_PHISHING;如果 cat.name =~「Data Exposure and Breaches」,請設為 DATA_EXFILTRATION;如果 cat.name =~「Government, Policy, & Political Affairs」,請設為 POLICY_VIOLATION;如果 cat.name =~「(Malware
comp.dm_bucket.name security_result.about.resource.attribute.labels.dmbucket%{bucket.id} 直接複製值
comp.dm_sector.name security_result.about.resource.attribute.labels.dmsector%{sector.id} 直接複製值
comp.id security_result.about.resource.attribute.labels.companies_id 直接複製值
comp.idStr security_result.about.resource.attribute.labels.companies_idStr 直接複製值
comp.locations.city security_result.about.location.city 如果 loc_index == 0,則為 loc.city 的值
comp.locations.country、comp.locations.state.symbol security_result.about.location.country_or_region 如果 loc_index == 0 且兩者皆不為空,則串連為 %{loc.country} - %{loc.state.symbol}
comp.locations.postalCode security_result.about.resource.attribute.labels.locations_postalCode 如果 loc_index == 0 且不為空白,則直接複製值
comp.locations.state.name security_result.about.location.state 如果 loc_index == 0,則直接複製值
comp.locations.city about.labels.loc_%{loc_index}_city 如果 loc_index != 0 且不為空白,則直接複製值
comp.locations.country、comp.locations.state.symbol about.labels.loc_%{loc_index}_country_or_region 如果 loc_index != 0 且兩者皆不為空,則會串連為「%{loc.country} - %{loc.state.symbol}」
comp.locations.postalCode securityresult.about.resource.attribute.labels.locations%{loc_index}_postalCode 如果 loc_index != 0 且不為空白,則直接複製值
comp.locations.state.name about.labels.loc_%{loc_index}_state_name 如果 loc_index != 0 且不為空白,則直接複製值
comp.name security_result.about.resource.name 直接複製值
comp.requested security_result.about.resource.attribute.labels.companies_requested 直接複製值
comp.retired security_result.about.resource.attribute.labels.companies_retired 已轉換為字串
comp.ticker security_result.about.resource.attribute.labels.companies_ticker 直接複製值
comp.topicType security_result.about.resource.attribute.labels.companies_topicType 直接複製值
eventLocation.coordinates.0 principal.location.region_coordinates.latitude 直接複製值
eventLocation.coordinates.1 principal.location.region_coordinates.longitude 直接複製值
eventLocation.name principal.location.name 直接複製值
eventLocation.places principal.labels.location_places 以半形逗號分隔的陣列
eventLocation.probability principal.labels.eventLocation_probability 已轉換為字串
eventLocation.radius principal.labels.eventLocation_radius 已轉換為字串
eventMapLargeURL principal.labels.eventMapLargeURL 直接複製值
eventMapSmallURL principal.labels.eventMapSmallURL 直接複製值
eventTime @timestamp 從 Epoch 毫秒轉換為時間戳記
eventVolume about.labels.eventVolume 已轉換為字串
expandAlertURL metadata.url_back_to_product 直接複製值
expandMapURL principal.labels.expandMapURL 直接複製值
headerColor about.labels.headerColor 直接複製值
headerLabel about.labels.headerLabel 直接複製值
metadata.cyber.addresses.ip principal.ip 如果 index == 0,則使用 grok 模式擷取
metadata.cyber.addresses.port principal.port 如果 index == 0,則直接複製值,否則轉換為整數
metadata.cyber.addresses.port principal.labels.addresses_%{index}_port 如果索引 != 0,則直接複製值
metadata.cyber.addresses.version principal.labels.metadata_cyberaddresses%{index}_version 直接複製值
metadata.cyber.asns network.asn 如果索引 == 0,則直接複製值
metadata.cyber.asns about.labels.metadatacyber%{index}_asn 如果索引 != 0,則直接複製值
metadata.cyber.hashValues.value security_result.about.file.sha1 如果 type == SHA1,則直接複製值 (小寫)
metadata.cyber.hashValues.value security_result.about.file.sha256 如果 type == SHA256,則直接複製值 (小寫)
metadata.cyber.malwares security_result.associations.name 直接複製值
metadata.cyber.malwares security_result.associations.type 設為 MALWARE
metadata.cyber.orgs network.organization_name 如果索引 == 0,則直接複製值
metadata.cyber.orgs about.labels.metadatacyber%{index}_orgs 如果索引 != 0,則直接複製值
metadata.cyber.products principal.application 如果索引 == 0,則直接複製值
metadata.cyber.products principal.labels.metadata_cyberproducts%{index} 如果索引 != 0,則直接複製值
metadata.cyber.threats security_result.threat_name 如果索引 == 0,則直接複製值
metadata.cyber.threats security_result.about.labels.metadata_cyberthreats%{index} 如果索引 != 0,則直接複製值
metadata.cyber.URLs security_result.about.url 如果索引 == 0,則直接複製值
metadata.cyber.URLs securityresult.about.labels.url%{index} 如果索引 != 0,則直接複製值
metadata.cyber.malwares.0 security_result.category 如果存在,則設為 SOFTWARE_MALICIOUS
metadata.cyber.vulnerabilities.cvss extensions.vulns.vulnerabilities.cvss_base_score 直接複製值
metadata.cyber.vulnerabilities.exploitPocLinks extensions.vulns.vulnerabilities.cve_description 從陣列中加入,並以「 n」分隔符號分隔
metadata.cyber.vulnerabilities.id extensions.vulns.vulnerabilities.cve_id 直接複製值
metadata.cyber.vulnerabilities.products.productName extensions.vulns.vulnerabilities.about.application 如果索引 == 0,則直接複製值
metadata.cyber.vulnerabilities.products.productVendor extensions.vulns.vulnerabilities.vendor 如果索引 == 0,則直接複製值
metadata.cyber.vulnerabilities.products.productVersion extensions.vulns.vulnerabilities.about.platform_version 如果索引 == 0,則直接複製值,並移除空格
metadata.cyber.vulnerabilities.products.productName extensions.vulns.vulnerabilities.about.labels.productName_%{index} 如果索引 != 0,則直接複製值
metadata.cyber.vulnerabilities.products.productVendor extensions.vulns.vulnerabilities.about.labels.productVendor_%{index} 如果索引 != 0,則直接複製值
metadata.cyber.vulnerabilities.products.productVersion extensions.vulns.vulnerabilities.about.labels.productVersion_%{index} 如果索引 != 0,則直接複製值,並移除空格
parentAlertId about.labels.parentAlertId 直接複製值
post.languages.lang target.labels.post_languageslang%{index} 直接複製值
post.languages.position target.labels.post_languagesposition%{index} 已轉換為字串
post.link target.labels.post_link 直接複製值
post.media.link principal.resource.name 如果索引 == 0,則直接複製值
post.media.description target.resource.attribute.labels.post_media_description 如果索引 == 0,則直接複製值
post.media.display_url target.resource.attribute.labels.post_media_display_url 如果索引 == 0,則直接複製值
post.media.isSafe target.resource.attribute.labels.post_media_isSafe 如果索引 == 0,則轉換為字串
post.media.media_url target.resource.attribute.labels.post_media_media_url 如果索引 == 0,則直接複製值
post.media.sizes.large.h target.resource.attribute.labels.post_media_sizes_large_h 如果索引 == 0,則轉換為字串
post.media.sizes.large.resize target.resource.attribute.labels.post_media_sizes_large_resize 如果索引 == 0,則直接複製值
post.media.sizes.large.w target.resource.attribute.labels.post_media_sizes_large_w 如果索引 == 0,則轉換為字串
post.media.sizes.medium.h target.resource.attribute.labels.post_media_sizes_medium_h 如果索引 == 0,則轉換為字串
post.media.sizes.medium.resize target.resource.attribute.labels.post_media_sizes_medium_resize 如果索引 == 0,則直接複製值
post.media.sizes.medium.w target.resource.attribute.labels.post_media_sizes_medium_w 如果索引 == 0,則轉換為字串
post.media.sizes.small.h target.resource.attribute.labels.post_media_sizes_small_h 如果索引 == 0,則轉換為字串
post.media.sizes.small.resize target.resource.attribute.labels.post_media_sizes_small_resize 如果索引 == 0,則直接複製值
post.media.sizes.small.w target.resource.attribute.labels.post_media_sizes_small_w 如果索引 == 0,則轉換為字串
post.media.sizes.thumb.h target.resource.attribute.labels.post_media_sizes_thumb_h 如果索引 == 0,則轉換為字串
post.media.sizes.thumb.resize target.resource.attribute.labels.post_media_sizes_thumb_resize 如果索引 == 0,則直接複製值
post.media.sizes.thumb.w target.resource.attribute.labels.post_media_sizes_thumb_w 如果索引 == 0,則轉換為字串
post.media.source target.resource.attribute.labels.post_media_source 如果索引 == 0,則直接複製值
post.media.thumbnail target.resource.attribute.labels.post_media_thumbnail 如果索引 == 0,則直接複製值
post.media.title target.resource.attribute.labels.post_media_title 如果索引 == 0,則直接複製值
post.media.url target.resource.attribute.labels.post_media_url 如果索引 == 0,則直接複製值
post.media.video_info.duration_millis target.resource.attribute.labels.post_media_video_info_duration_millis 如果索引 == 0,則轉換為字串
post.media.video_info.aspect_ratio target.resource.attribute.labels.post_media_video_info_aspect_ratio 如果索引為 0,則串連為 %{med.video_info.aspect_ratio.0}、%{med.video_info.aspect_ratio.1}
post.media.video_info.variants.bitrate target.resource.attribute.labels.post_media_video_info_variantsbitrate%{var_index} 已轉換為字串
post.media.video_info.variants.content_type target.resource.attribute.labels.post_media_video_info_variants_contenttype%{var_index} 直接複製值
post.media.video_info.variants.url target.resource.attribute.labels.post_media_video_info_variantsurl%{var_index} 直接複製值
post.media.type principal.resource.resource_subtype 如果索引 == 0,則直接複製值
post.media.link about.resource.name 如果索引 != 0,則直接複製值
post.media.description about.resource.attribute.labels.post_media_description 如果索引 != 0,則直接複製值
post.media.display_url about.resource.attribute.labels.post_media_display_url 如果索引 != 0,則直接複製值
post.media.isSafe about.resource.attribute.labels.post_media_isSafe 如果索引 != 0,則轉換為字串
post.media.media_url about.resource.attribute.labels.post_media_media_url 如果索引 != 0,則直接複製值
post.media.sizes.large.h about.resource.attribute.labels.post_media_sizes_large_h 如果索引 != 0,則轉換為字串
post.media.sizes.large.resize about.resource.attribute.labels.post_media_sizes_large_resize 如果索引 != 0,則直接複製值
post.media.sizes.large.w about.resource.attribute.labels.post_media_sizes_large_w 如果索引 != 0,則轉換為字串
post.media.sizes.medium.h about.resource.attribute.labels.post_media_sizes_medium_h 如果索引 != 0,則轉換為字串
post.media.sizes.medium.resize about.resource.attribute.labels.post_media_sizes_medium_resize 如果索引 != 0,則直接複製值
post.media.sizes.medium.w about.resource.attribute.labels.post_media_sizes_medium_w 如果索引 != 0,則轉換為字串
post.media.sizes.small.h about.resource.attribute.labels.post_media_sizes_small_h 如果索引 != 0,則轉換為字串
post.media.sizes.small.resize about.resource.attribute.labels.post_media_sizes_small_resize 如果索引 != 0,則直接複製值
post.media.sizes.small.w about.resource.attribute.labels.post_media_sizes_small_w 如果索引 != 0,則轉換為字串
post.media.sizes.thumb.h about.resource.attribute.labels.post_media_sizes_thumb_h 如果索引 != 0,則轉換為字串
post.media.sizes.thumb.resize about.resource.attribute.labels.post_media_sizes_thumb_resize 如果索引 != 0,則直接複製值
post.media.sizes.thumb.w about.resource.attribute.labels.post_media_sizes_thumb_w 如果索引 != 0,則轉換為字串
post.media.source about.resource.attribute.labels.post_media_source 如果索引 != 0,則直接複製值
post.media.thumbnail about.resource.attribute.labels.post_media_thumbnail 如果索引 != 0,則直接複製值
post.media.title about.resource.attribute.labels.post_media_title 如果索引 != 0,則直接複製值
post.media.url about.resource.attribute.labels.post_media_url 如果索引 != 0,則直接複製值
post.media.video_info.duration_millis about.resource.attribute.labels.post_media_video_info_duration_millis 如果索引 != 0,則轉換為字串
post.media.video_info.aspect_ratio about.resource.attribute.labels.post_media_video_info_aspect_ratio 如果索引不等於 0,則串連為 %{med.video_info.aspect_ratio.0}、%{med.video_info.aspect_ratio.1}
post.media.video_info.variants.bitrate about.resource.attribute.labels.post_media_video_info_variantsbitrate%{var_index} 已轉換為字串
post.media.video_info.variants.content_type about.resource.attribute.labels.post_media_video_info_variants_contenttype%{var_index} 直接複製值
post.media.video_info.variants.url about.resource.attribute.labels.post_media_video_info_variantsurl%{var_index} 直接複製值
post.media.type about.resource.resource_subtype 如果索引 != 0,則直接複製值
post.translatedText target.labels.post_translatedText 直接複製值
post.text target.labels.post_text 直接複製值
post.timestamp target.resource.attribute.creation_time 從 Epoch 毫秒轉換為時間戳記
publisherCategory.color target.labels.publisherCategory_color 直接複製值
publisherCategory.name target.labels.publisherCategory_name 直接複製值
publisherCategory.shortName target.labels.publisherCategory_shortName 直接複製值
relatedTerms.url principal.labels.relatedTerms_%{terms.text} 直接複製值
relatedTermsQueryURL principal.labels.relatedTermsQueryURL 直接複製值
sect.id about.labels.sectors_id 直接複製值
sect.idStr about.labels.sectors_idStr 直接複製值
sect.name about.labels.sectors_name 直接複製值
sect.retired about.labels.sectors_retired 已轉換為字串
sect.topicType about.labels.sectors_topicType 直接複製值
source.channels.0 principal.application 直接複製值
source.displayName principal.user.user_display_name 直接複製值
source.link principal.url 直接複製值
source.verified principal.labels.source_verified 已轉換為字串
subCaption.bullets.content about.labels.subCaption_bullets_content 直接複製值
subCaption.bullets.media about.labels.subCaption_bullets_media 直接複製值
subCaption.bullets.source about.labels.subCaption_bullets_source 直接複製值
watchlist.id about.labels.watchlistsMatchedByType_id 直接複製值
watchlist.externalTopicIds about.labels.watchlistsMatchedByType_externalTopicIds 以半形逗號分隔的陣列
watchlist.name about.labels.watchlistsMatchedByType_name 直接複製值
watchlist.type about.labels.watchlistsMatchedByType_type 直接複製值
watchlist.userProperties.omnilist about.labels.watchlistsMatchedByType_userProperties_omnilist 直接複製值
watchlist.userProperties.uiListType about.labels.watchlistsMatchedByType_userProperties_uiListType 直接複製值
watchlist.userProperties.watchlistColor about.labels.watchlistsMatchedByType_userProperties_watchlistColor 直接複製值
watchlist.locationGroups.locations.id about.labels.watchlistsMatchedByTypelocationGroups%{lg_i}_locationsid%{loc_i} 直接複製值
watchlist.locationGroups.locations.lng about.labels.watchlistsMatchedByTypelocationGroups%{lg_i}_locationslng%{loc_i} 如果 lg_i != 0 或 loc_i != 0,則會轉換為字串
watchlist.locationGroups.locations.lat about.labels.watchlistsMatchedByTypelocationGroups%{lg_i}_locationslat%{loc_i} 如果 lg_i != 0 或 loc_i != 0,則會轉換為字串
watchlist.locationGroups.locations.name about.labels.watchlistsMatchedByTypelocationGroups%{lg_i}_locationsname%{loc_i} 如果 lg_i != 0 或 loc_i != 0,則直接複製值
watchlist.locationGroups.id about.labels.watchlistsMatchedByType_locationGroupsid%{lg_i} 直接複製值
watchlist.locationGroups.name about.labels.watchlistsMatchedByType_locationGroupsname%{lg_i} 直接複製值
watchlist.locationGroups.locations.lng about.location.region_coordinates.longitude 如果 lg_i == 0 且 loc_i == 0,則直接複製值
watchlist.locationGroups.locations.lat about.location.region_coordinates.latitude 如果 lg_i == 0 且 loc_i == 0,則直接複製值
watchlist.locationGroups.locations.name about.location.name 如果 lg_i == 0 且 loc_i == 0,則直接複製值
source.entityName principal.hostname 直接複製值
metadata.event_type 設為「GENERIC_EVENT」;如果 principal_ip 或 principal.hostname 不為空白,則變更為「SCAN_HOST」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。