收集 Forcepoint ONE CASB 記錄

支援的國家/地區:

本文說明如何使用 Google Cloud Storage V2,將 Forcepoint ONE CASB (舊稱 Forcepoint/Bitglass CASB) 記錄檔擷取至 Google Security Operations。

Forcepoint ONE CASB 是一種雲端存取安全性代理程式,可為雲端應用程式提供可視性、合規性和威脅防護。這項服務可監控及控管 SaaS 應用程式的存取權、強制執行資料遺失防護政策,以及偵測核准和未核准雲端服務中異常的使用者行為。

詳情請參閱 Forcepoint CASB 說明文件

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用 Cloud Storage API 的 GCP 專案
  • 建立及管理 GCS bucket 的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 建立 Cloud Run 服務、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • Forcepoint ONE 入口網站的特殊存取權 (管理員角色)
  • Forcepoint ONE API 憑證 (API 存取權杖或 OAuth 用戶端憑證)

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud Console
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 forcepoint-casb-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或資料保留政策
  6. 點選「建立」

收集 Forcepoint ONE CASB API 憑證

取得 API 存取權杖

  1. 使用管理員帳戶登入 Forcepoint ONE 入口網站 (https://portal.bitglass.com 或您所在區域的入口網站網址)。
  2. 依序前往「設定」>「API 整合」
  3. 按一下「Generate API Token」(產生 API 權杖) (如果是第一次產生,請按一下「Create New Token」)。
  4. 請提供下列設定詳細資料:
    • 權杖名稱:輸入描述性名稱 (例如 SecOps-GCS-Integration)。
    • 權限:為下列範圍選取「讀取」存取權:
      • 存取記錄:使用者活動和存取事件。
      • 管理員稽核記錄:管理變更和設定事件。
      • 資料遺失防護違規事件:資料遺失防護事件。
      • 威脅防護記錄:惡意軟體和威脅偵測事件。
  5. 點按「生成」
  6. 複製並妥善儲存下列值:

    • API 存取權杖:系統產生的權杖值。
    • API 基準網址:Forcepoint ONE API 端點 (例如 https://portal.bitglass.com/api)。

驗證權限

如要確認帳戶是否具備必要權限,請按照下列步驟操作:

  1. 登入 Forcepoint ONE 入口網站。
  2. 依序前往「設定」>「管理員」
  3. 確認您的帳戶是否具備「管理員」或「安全性管理員」角色。
  4. 如果看不到「API Integration」(API 整合) 選項,請與 Forcepoint ONE 管理員聯絡,要求對方授予必要角色。

測試 API 存取權

  • 請先測試憑證,再繼續進行整合:

    API_TOKEN="<your-api-token>"
    API_BASE="https://portal.bitglass.com"
    
    # Test API access - retrieve recent access logs
    curl -v -H "Authorization: Bearer ${API_TOKEN}" \
        -H "Accept: application/json" \
        "${API_BASE}/api/v1/logs/access?limit=1"
    

如果回應成功,系統會傳回 HTTP 200,以及包含記錄項目的 JSON 物件。如果收到 HTTP 401,請驗證 API 權杖。如果收到 HTTP 403 錯誤,請確認帳戶權限。

為 Cloud Run 函式建立服務帳戶

Cloud Run 函式需要具備寫入 GCS bucket 權限的服務帳戶,並由 Pub/Sub 叫用。

建立服務帳戶

  1. GCP 控制台中,依序前往「IAM 與管理」>「服務帳戶」
  2. 按一下「Create Service Account」(建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 forcepoint-casb-collector-sa
    • 服務帳戶說明:輸入 Service account for Cloud Run function to collect Forcepoint ONE CASB logs
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選擇角色」
    2. 搜尋並選取「Storage 物件管理員」
    3. 點選「+ 新增其他角色」
    4. 搜尋並選取「Cloud Run Invoker」
    5. 點選「+ 新增其他角色」
    6. 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)
  6. 按一下「繼續」
  7. 按一下 [完成]

這些角色適用於:

  • Storage 物件管理員:將記錄檔寫入 GCS bucket,並管理狀態檔案。
  • Cloud Run 叫用者:允許 Pub/Sub 叫用函式。
  • Cloud Functions 叫用者:允許叫用函式。

授予 GCS 值區的 IAM 權限

授予服務帳戶 GCS bucket 的寫入權限:

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (例如 forcepoint-casb-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 forcepoint-casb-collector-sa@PROJECT_ID.iam.gserviceaccount.com)
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

建立 Pub/Sub 主題,供 Cloud Scheduler 發布訊息,以及 Cloud Run 函式訂閱訊息。

  1. GCP Console 中,前往「Pub/Sub」>「Topics」(主題)
  2. 按一下「建立主題」
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 forcepoint-casb-trigger
    • 其他設定保留預設值
  4. 點選「建立」

建立 Cloud Run 函式來收集記錄

Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 Forcepoint ONE CASB API 擷取記錄,並寫入 GCS。

  1. 前往 GCP Console 的「Cloud Run」
  2. 按一下「Create service」(建立服務)
  3. 選取「函式」 (使用內嵌編輯器建立函式)。
  4. 在「設定」部分,提供下列設定詳細資料:

    設定
    服務名稱 forcepoint-casb-collector
    區域 選取與 GCS bucket 相符的區域 (例如 us-central1)
    執行階段 選取 Python 3.12 以上版本
  5. 在「Trigger (optional)」(觸發條件 (選用)) 專區:

    1. 按一下「+ 新增觸發條件」
    2. 選取「Cloud Pub/Sub」
    3. 在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 中,選擇 forcepoint-casb-trigger
    4. 按一下 [儲存]
  6. 在「Authentication」(驗證) 部分:

    1. 選取「需要驗證」
    2. 檢查 Identity and Access Management (IAM)
  7. 向下捲動並展開「Containers, Networking, Security」

  8. 前往「安全性」分頁:

    • 服務帳戶:選取 forcepoint-casb-collector-sa
  9. 前往「容器」分頁:

    1. 按一下「變數與密鑰」
    2. 針對每個環境變數,按一下「+ 新增變數」
    變數名稱 範例值 說明
    GCS_BUCKET forcepoint-casb-logs GCS bucket 名稱
    GCS_PREFIX forcepoint-casb 記錄檔的前置字串
    STATE_KEY forcepoint-casb/state.json 狀態檔案路徑
    API_BASE https://portal.bitglass.com Forcepoint ONE API 基本網址
    API_TOKEN your-api-token Forcepoint ONE API 權杖
    MAX_RECORDS 5000 每次執行的記錄數上限
    PAGE_SIZE 500 每頁記錄數
    LOOKBACK_HOURS 24 初始回溯期
  10. 在「變數與密鑰」部分,向下捲動至「要求」

    • 要求逾時:輸入 600 秒 (10 分鐘)
  11. 前往「設定」分頁:

    • 在「資源」部分:
      • 記憶體:選取 512 MiB 以上
      • CPU:選取 1
  12. 在「修訂版本資源調度」部分:

    • 執行個體數量下限:輸入 0
    • 「執行個體數量上限」:輸入 100 (或根據預期負載調整)
  13. 點選「建立」

  14. 等待服務建立完成 (1 到 2 分鐘)。

  15. 服務建立完成後,系統會自動開啟內嵌程式碼編輯器

新增函式程式碼

  1. 在「進入點」欄位中輸入 main
  2. 在內嵌程式碼編輯器中建立兩個檔案:

    • main.py:

      import functions_framework
      from google.cloud import storage
      import json
      import os
      import urllib3
      from datetime import datetime, timezone, timedelta
      import time
      
      # Initialize HTTP client with timeouts
      http = urllib3.PoolManager(
          timeout=urllib3.Timeout(connect=5.0, read=30.0),
          retries=False,
      )
      
      # Initialize Storage client
      storage_client = storage.Client()
      
      # Environment variables
      GCS_BUCKET = os.environ.get('GCS_BUCKET')
      GCS_PREFIX = os.environ.get('GCS_PREFIX', 'forcepoint-casb')
      STATE_KEY = os.environ.get('STATE_KEY', 'forcepoint-casb/state.json')
      API_BASE = os.environ.get('API_BASE')
      API_TOKEN = os.environ.get('API_TOKEN')
      MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000'))
      PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '500'))
      LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
      
      def parse_datetime(value: str) -> datetime:
          """Parse ISO datetime string to datetime object."""
          if value.endswith("Z"):
              value = value[:-1] + "+00:00"
          return datetime.fromisoformat(value)
      
      @functions_framework.cloud_event
      def main(cloud_event):
          """
          Cloud Run function triggered by Pub/Sub to fetch Forcepoint ONE CASB logs
          and write to GCS.
      
          Args:
              cloud_event: CloudEvent object containing Pub/Sub message
          """
      
          if not all([GCS_BUCKET, API_BASE, API_TOKEN]):
              print('Error: Missing required environment variables')
              return
      
          try:
              # Get GCS bucket
              bucket = storage_client.bucket(GCS_BUCKET)
      
              # Load state
              state = load_state(bucket, STATE_KEY)
      
              # Determine time window
              now = datetime.now(timezone.utc)
              last_time = None
      
              if isinstance(state, dict) and state.get("last_event_time"):
                  try:
                      last_time = parse_datetime(state["last_event_time"])
                      # Overlap by 2 minutes to catch any delayed events
                      last_time = last_time - timedelta(minutes=2)
                  except Exception as e:
                      print(f"Warning: Could not parse last_event_time: {e}")
      
              if last_time is None:
                  last_time = now - timedelta(hours=LOOKBACK_HOURS)
      
              print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
      
              # Fetch logs
              records, newest_event_time = fetch_logs(
                  api_base=API_BASE,
                  api_token=API_TOKEN,
                  start_time=last_time,
                  end_time=now,
                  page_size=PAGE_SIZE,
                  max_records=MAX_RECORDS,
              )
      
              if not records:
                  print("No new log records found.")
                  save_state(bucket, STATE_KEY, now.isoformat())
                  return
      
              # Write to GCS as NDJSON
              timestamp = now.strftime('%Y%m%d_%H%M%S')
              object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
              blob = bucket.blob(object_key)
      
              ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
              blob.upload_from_string(ndjson, content_type='application/x-ndjson')
      
              print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
      
              # Update state with newest event time
              if newest_event_time:
                  save_state(bucket, STATE_KEY, newest_event_time)
              else:
                  save_state(bucket, STATE_KEY, now.isoformat())
      
              print(f"Successfully processed {len(records)} records")
      
          except Exception as e:
              print(f'Error processing logs: {str(e)}')
              raise
      
      def load_state(bucket, key):
          """Load state from GCS."""
          try:
              blob = bucket.blob(key)
              if blob.exists():
                  state_data = blob.download_as_text()
                  return json.loads(state_data)
          except Exception as e:
              print(f"Warning: Could not load state: {e}")
      
          return {}
      
      def save_state(bucket, key, last_event_time_iso: str):
          """Save the last event timestamp to GCS state file."""
          try:
              state = {'last_event_time': last_event_time_iso}
              blob = bucket.blob(key)
              blob.upload_from_string(
                  json.dumps(state, indent=2),
                  content_type='application/json'
              )
              print(f"Saved state: last_event_time={last_event_time_iso}")
          except Exception as e:
              print(f"Warning: Could not save state: {e}")
      
      def fetch_logs(api_base: str, api_token: str, start_time: datetime, end_time: datetime, page_size: int, max_records: int):
          """
          Fetch logs from Forcepoint ONE CASB API with pagination and rate limiting.
      
          Args:
              api_base: Forcepoint ONE API base URL
              api_token: API access token
              start_time: Start time for log query
              end_time: End time for log query
              page_size: Number of records per page
              max_records: Maximum total records to fetch
      
          Returns:
              Tuple of (records list, newest_event_time ISO string)
          """
          base_url = api_base.rstrip('/')
          endpoint = f"{base_url}/api/v1/logs/access"
      
          headers = {
              'Authorization': f'Bearer {api_token}',
              'Accept': 'application/json',
              'Content-Type': 'application/json',
              'User-Agent': 'GoogleSecOps-ForcepointCASBCollector/1.0'
          }
      
          records = []
          newest_time = None
          page_num = 0
          backoff = 1.0
          start_index = 0
      
          start_iso = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
          end_iso = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
      
          while True:
              page_num += 1
      
              if len(records) >= max_records:
                  print(f"Reached max_records limit ({max_records})")
                  break
      
              current_limit = min(page_size, max_records - len(records))
              url = f"{endpoint}?startDate={start_iso}&endDate={end_iso}&offset={start_index}&limit={current_limit}"
      
              try:
                  response = http.request('GET', url, headers=headers)
      
                  # Handle rate limiting with exponential backoff
                  if response.status == 429:
                      retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                      print(f"Rate limited (429). Retrying after {retry_after}s...")
                      time.sleep(retry_after)
                      backoff = min(backoff * 2, 30.0)
                      continue
      
                  backoff = 1.0
      
                  if response.status != 200:
                      print(f"HTTP Error: {response.status}")
                      response_text = response.data.decode('utf-8')
                      print(f"Response body: {response_text}")
                      return [], None
      
                  data = json.loads(response.data.decode('utf-8'))
      
                  page_results = data.get('data', [])
      
                  if not page_results:
                      print(f"No more results (empty page)")
                      break
      
                  print(f"Page {page_num}: Retrieved {len(page_results)} events")
                  records.extend(page_results)
      
                  # Track newest event time
                  for event in page_results:
                      try:
                          event_time = event.get('timestamp') or event.get('created_at')
                          if event_time:
                              if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                  newest_time = event_time
                      except Exception as e:
                          print(f"Warning: Could not parse event time: {e}")
      
                  # Check for more results
                  if len(page_results) < current_limit:
                      print(f"Reached last page (size={len(page_results)} < limit={current_limit})")
                      break
      
                  start_index += len(page_results)
      
              except Exception as e:
                  print(f"Error fetching logs: {e}")
                  return [], None
      
          print(f"Retrieved {len(records)} total records from {page_num} pages")
          return records, newest_time
      
    • requirements.txt:

      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      
  3. 點選「部署」即可儲存並部署函式。

  4. 等待部署作業完成 (2 到 3 分鐘)。

建立 Cloud Scheduler 工作

Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式。

  1. 前往 GCP 主控台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 forcepoint-casb-collector-hourly
    區域 選取與 Cloud Run 函式相同的區域
    頻率 0 * * * * (每小時整點)
    時區 選取時區 (建議使用世界標準時間)
    目標類型 Pub/Sub
    主題 選取「forcepoint-casb-trigger
    郵件內文 {} (空白 JSON 物件)
  4. 點選「建立」

排程頻率選項

根據記錄檔量和延遲時間要求選擇頻率:

頻率 Cron 運算式 用途
每 5 分鐘 */5 * * * * 高容量、低延遲
每 15 分鐘 */15 * * * * 中等音量
每小時 0 * * * * 標準 (建議)
每 6 小時 0 */6 * * * 少量、批次處理
每日 0 0 * * * 歷來資料集合

測試整合項目

  1. Cloud Scheduler 控制台中,找出您的工作 (forcepoint-casb-collector-hourly)。
  2. 按一下「強制執行」即可手動觸發工作。
  3. 稍等幾秒鐘。
  4. 前往「Cloud Run」>「Services」
  5. 按一下「forcepoint-casb-collector」
  6. 按一下 [Logs] (記錄) 分頁標籤。
  7. 確認函式是否順利執行。尋找:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://forcepoint-casb-logs/forcepoint-casb/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 依序前往「Cloud Storage」>「Buckets」

  9. 按一下 bucket 名稱 (forcepoint-casb-logs)。

  10. 前往 forcepoint-casb/ 資料夾。

  11. 確認是否已建立含有目前時間戳記的新 .ndjson 檔案。

如果記錄中顯示錯誤:

  • HTTP 401:檢查環境變數中的 API 權杖。
  • HTTP 403:確認帳戶在 Forcepoint ONE 入口網站中具備必要權限。
  • HTTP 429:頻率限制 - 函式會自動重試並輪詢。
  • 缺少環境變數:確認已設定所有必要變數。

擷取 Google SecOps 服務帳戶

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Forcepoint CASB Logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Forcepoint CASB」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示專屬服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

  9. 點選「下一步」

  10. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://forcepoint-casb-logs/forcepoint-casb/
      
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移檔案後,刪除檔案和空白目錄。
    • 檔案存在時間上限:包含在過去天數內修改的檔案 (預設為 180 天)。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  11. 點選「下一步」

  12. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱 (forcepoint-casb-logs)。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

UDM 對應表

記錄欄位 UDM 對應 邏輯
extensions extensions 已重新命名/對應
request extensions.auth.auth_details 直接對應
intermediary intermediary 已合併
smb_host intermediary.hostname 直接對應
smb_uid intermediary.user.userid 直接對應
metadata metadata 已重新命名/對應
msg metadata.description 直接對應
event_name metadata.product_event_type 直接對應
event_type metadata.product_event_type 直接對應
value metadata.product_name 直接對應
product metadata.vendor_name 直接對應
network network 已重新命名/對應
requestClientApplication network.http.user_agent 直接對應
principal principal 已重新命名/對應
ahost principal.hostname 直接對應
agt principal.ip 已合併
atz principal.location.country_or_region 直接對應
amac principal.mac 已合併
sourceServiceName principal.platform_version 直接對應
av_label principal.resource.attribute.labels 已合併
aid principal.resource.id 直接對應
at principal.resource.name 直接對應
agentZoneURI principal.url 直接對應
cs1 principal.user.email_addresses 已合併
suid principal.user.userid 直接對應
security_result security_result 已合併
action security_result.action 已合併
msg security_result.summary 直接對應
src_label src 已重新命名/對應
target target 已重新命名/對應
dvchost target.hostname 直接對應
dvc target.ip 已合併
deviceProcessName target.resource.name 直接對應
deviceZoneURI target.url 直接對應
不適用 extensions.auth.type 常數:SSO
不適用 intermediary 常數:intermediary
不適用 metadata.event_type 常數:USER_RESOURCE_ACCESS
不適用 principal.ip 常數:agt
不適用 principal.mac 常數:amac
不適用 principal.platform 常數:WINDOWS
不適用 principal.resource.attribute.labels 常數:av_label
不適用 principal.user.email_addresses 常數:cs1
不適用 security_result 常數:security_result
不適用 security_result.action 常數:action
不適用 target.ip 常數:dvc

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。