收集 Cisco CloudLock CASB 記錄
本文說明如何使用 Google Cloud Storage 將 Cisco CloudLock CASB 記錄擷取至 Google Security Operations。剖析器會從 JSON 記錄中擷取欄位,並轉換及對應至統一資料模型 (UDM)。這個函式會處理日期剖析作業、將特定欄位轉換為字串、將欄位對應至 UDM 實體 (中繼資料、目標、安全性結果、關於),並逐一比對以擷取偵測欄位,最終將所有擷取的資料合併至 @output 欄位。
Cisco CloudLock 是雲端原生雲端存取安全性代理程式 (CASB),可提供雲端應用程式的瀏覽權限和控制權。這項服務可協助機構發掘影子 IT、強制執行資料遺失防護政策、偵測威脅,並確保 SaaS 應用程式符合法規。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用 Cloud Storage API 的 GCP 專案
- 建立及管理 GCS 值區的權限
- 管理 Google Cloud Storage 值區 IAM 政策的權限
- 建立 Cloud Run 服務、Pub/Sub 主題和 Cloud Scheduler 工作的權限
- Cisco CloudLock 管理控制台的特殊權限
取得 Cisco CloudLock API 先決條件
如要開始使用,請與 Cloudlock 支援團隊聯絡,取得 Cloudlock API 網址。在 Cloudlock 應用程式中產生存取權杖,方法是選取「設定」頁面中的「驗證和 API」分頁,然後按一下「產生」。
- 登入 Cisco CloudLock 管理控制台。
- 依序前往「設定」>「驗證和 API」。
- 在「API」下方,按一下「產生」即可建立存取權杖。
- 複製下列詳細資料並儲存在安全的位置:
- API 存取權杖
- API 基準網址 (由 Cisco CloudLock 支援團隊提供,電子郵件地址為 [email protected])
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 cisco-cloudlock-logs)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
為 Cloud Run 函式建立服務帳戶
Cloud Run 函式需要具備 GCS bucket 寫入權限的服務帳戶。
建立服務帳戶
- 在 GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)。
- 按一下 [Create Service Account] (建立服務帳戶)。
- 請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
cloudlock-data-export-sa。 - 服務帳戶說明:輸入
Service account for Cloud Run function to collect Cisco CloudLock logs。
- 服務帳戶名稱:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分:
- 按一下「選擇角色」。
- 搜尋並選取「Storage 物件管理員」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Run Invoker」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)。
- 按一下「繼續」。
- 按一下 [完成]。
授予 GCS 值區的 IAM 權限
授予服務帳戶 GCS bucket 的寫入權限:
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱。
- 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:輸入服務帳戶電子郵件地址 (例如
cloudlock-data-export-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 指派角色:選取「Storage 物件管理員」。
- 新增主體:輸入服務帳戶電子郵件地址 (例如
- 按一下 [儲存]。
建立 Pub/Sub 主題
建立 Pub/Sub 主題,Cloud Scheduler 會將訊息發布至該主題,而 Cloud Run 函式會訂閱該主題。
- 在 GCP Console 中,前往「Pub/Sub」>「Topics」(主題)。
- 按一下「建立主題」。
- 請提供下列設定詳細資料:
- 主題 ID:輸入
cloudlock-data-export-trigger。 - 其他設定保留預設值。
- 主題 ID:輸入
- 點選「建立」。
建立 Cloud Run 函式來收集記錄
Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 Cisco CloudLock API 擷取記錄,並寫入 GCS。
- 前往 GCP Console 的「Cloud Run」。
- 按一下「Create service」(建立服務)。
- 選取「函式」 (使用內嵌編輯器建立函式)。
在「設定」部分,提供下列設定詳細資料:
設定 值 服務名稱 cloudlock-data-export區域 選取與 GCS bucket 相符的區域 (例如 us-central1)執行階段 選取「Python 3.12」以上版本 在「Trigger (optional)」(觸發條件 (選用)) 專區:
- 按一下「+ 新增觸發條件」。
- 選取「Cloud Pub/Sub」。
- 在「選取 Cloud Pub/Sub 主題」中,選擇主題 (
cloudlock-data-export-trigger)。 - 按一下 [儲存]。
在「Authentication」(驗證) 部分:
- 選取「需要驗證」。
- 檢查 Identity and Access Management (IAM)。
捲動至「Containers, Networking, Security」(容器、網路、安全性) 並展開。
前往「安全性」分頁:
- 服務帳戶:選取服務帳戶 (
cloudlock-data-export-sa)。
- 服務帳戶:選取服務帳戶 (
前往「容器」分頁:
- 按一下「變數與密鑰」。
針對每個環境變數,按一下「+ 新增變數」:
變數名稱 範例值 GCS_BUCKETcisco-cloudlock-logsGCS_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKENyour-api-tokenCLOUDLOCK_API_BASEhttps://api.cloudlock.com
在「變數與密鑰」分頁中向下捲動至「要求」:
- 要求逾時:輸入
600秒 (10 分鐘)。
- 要求逾時:輸入
前往「容器」中的「設定」分頁:
- 在「資源」部分:
- 記憶體:選取 512 MiB 以上。
- CPU:選取 1。
- 按一下 [完成]。
- 在「資源」部分:
捲動至「執行環境」:
- 選取「預設」 (建議選項)。
在「修訂版本資源調度」部分:
- 執行個體數量下限:輸入
0。 - 「Maximum number of instances」(執行個體數量上限):輸入
100(或根據預期負載調整)。
- 執行個體數量下限:輸入
點選「建立」。
等待服務建立完成 (1 到 2 分鐘)。
服務建立完成後,系統會自動開啟內嵌程式碼編輯器。
新增函式程式碼
- 在「Function entry point」(函式進入點) 中輸入 main
在內嵌程式碼編輯器中建立兩個檔案:
第一個檔案:main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Cisco CloudLock API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'cloudlock/') state_key = os.environ.get('STATE_KEY', 'cloudlock/state.json') api_token = os.environ.get('CLOUDLOCK_API_TOKEN') api_base = os.environ.get('CLOUDLOCK_API_BASE') if not all([bucket_name, api_token, api_base]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state (last processed offset for each endpoint) state = load_state(bucket, state_key) print(f'Processing logs with state: {state}') # Create Authorization header headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } # Fetch incidents data (using offset-based pagination) incidents_offset = state.get('incidents_offset', 0) incidents, new_incidents_offset = fetch_cloudlock_incidents( http, api_base, headers, incidents_offset ) if incidents: upload_to_gcs_ndjson(bucket, prefix, 'incidents', incidents) print(f'Uploaded {len(incidents)} incidents to GCS') state['incidents_offset'] = new_incidents_offset # Fetch activities data (using time-based filtering with offset pagination) activities_last_time = state.get('activities_last_time') if not activities_last_time: activities_last_time = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat() activities_offset = state.get('activities_offset', 0) activities, new_activities_offset, newest_activity_time = fetch_cloudlock_activities( http, api_base, headers, activities_last_time, activities_offset ) if activities: upload_to_gcs_ndjson(bucket, prefix, 'activities', activities) print(f'Uploaded {len(activities)} activities to GCS') state['activities_offset'] = new_activities_offset if newest_activity_time: state['activities_last_time'] = newest_activity_time # Fetch entities data (using offset-based pagination) entities_offset = state.get('entities_offset', 0) entities, new_entities_offset = fetch_cloudlock_entities( http, api_base, headers, entities_offset ) if entities: upload_to_gcs_ndjson(bucket, prefix, 'entities', entities) print(f'Uploaded {len(entities)} entities to GCS') state['entities_offset'] = new_entities_offset # Update consolidated state state['updated_at'] = datetime.now(timezone.utc).isoformat() save_state(bucket, state_key, state) print('CloudLock data export completed successfully') except Exception as e: print(f'Error processing logs: {str(e)}') raise def make_api_request(http, url, headers, retries=3): """Make API request with exponential backoff retry logic.""" for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) print(f'Rate limited, waiting {retry_after} seconds') time.sleep(retry_after) else: print(f'API request failed with status {response.status}: {response.data.decode("utf-8")}') except Exception as e: print(f'Request attempt {attempt + 1} failed: {str(e)}') if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, headers, start_offset=0): """ Fetch incidents data from Cisco CloudLock API using offset-based pagination. Note: The CloudLock API does not support updated_after parameter. This function uses offset-based pagination. For production use, consider implementing time-based filtering using created_at or updated_at fields in the response data. """ url = f"{api_base}/api/v2/incidents" limit = 1000 offset = start_offset all_data = [] try: while True: # Build URL with parameters full_url = f"{url}?limit={limit}&offset={offset}" print(f"Fetching incidents with offset: {offset}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) # CloudLock API returns items in 'items' array batch_data = data.get('items', []) if not batch_data: print("No more incidents to fetch") break all_data.extend(batch_data) # Check if we've reached the end total = data.get('total', 0) results = data.get('results', len(batch_data)) print(f"Fetched {results} incidents (total available: {total})") if results < limit or offset + results >= total: print("Reached end of incidents") break offset += limit print(f"Fetched {len(all_data)} total incidents") return all_data, offset except Exception as e: print(f"Error fetching incidents: {str(e)}") return [], start_offset def fetch_cloudlock_activities(http, api_base, headers, from_time, start_offset=0): """ Fetch activities data from Cisco CloudLock API using time-based filtering and offset pagination. """ url = f"{api_base}/api/v2/activities" limit = 1000 offset = start_offset all_data = [] newest_time = None try: while True: # Build URL with time filter and pagination full_url = f"{url}?limit={limit}&offset={offset}" print(f"Fetching activities with offset: {offset}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data.get('items', []) if not batch_data: print("No more activities to fetch") break # Filter activities by time (client-side filtering since API may not support time parameters) filtered_batch = [] for item in batch_data: item_time = item.get('timestamp') or item.get('created_at') if item_time and item_time >= from_time: filtered_batch.append(item) if not newest_time or item_time > newest_time: newest_time = item_time all_data.extend(filtered_batch) results = data.get('results', len(batch_data)) total = data.get('total', 0) print(f"Fetched {results} activities, {len(filtered_batch)} after time filter (total available: {total})") if results < limit or offset + results >= total: print("Reached end of activities") break offset += limit print(f"Fetched {len(all_data)} total activities") return all_data, offset, newest_time except Exception as e: print(f"Error fetching activities: {str(e)}") return [], start_offset, None def fetch_cloudlock_entities(http, api_base, headers, start_offset=0): """ Fetch entities data from Cisco CloudLock API using offset-based pagination. Note: This endpoint requires the Entity Cache feature. If not enabled, use the incident entities endpoint as an alternative. """ url = f"{api_base}/api/v2/entities" limit = 1000 offset = start_offset all_data = [] try: while True: full_url = f"{url}?limit={limit}&offset={offset}" print(f"Fetching entities with offset: {offset}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data.get('items', []) if not batch_data: print("No more entities to fetch") break all_data.extend(batch_data) results = data.get('results', len(batch_data)) total = data.get('total', 0) print(f"Fetched {results} entities (total available: {total})") if results < limit or offset + results >= total: print("Reached end of entities") break offset += limit print(f"Fetched {len(all_data)} total entities") return all_data, offset except Exception as e: print(f"Error fetching entities: {str(e)}") return [], start_offset def upload_to_gcs_ndjson(bucket, prefix, data_type, data): """Upload data to GCS bucket in NDJSON format (one JSON object per line).""" timestamp = datetime.now(timezone.utc).strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.now(timezone.utc).timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = '\n'.join([json.dumps(item, separators=(',', ':')) for item in data]) blob = bucket.blob(filename) blob.upload_from_string( ndjson_content, content_type='application/x-ndjson' ) print(f"Successfully uploaded {filename} to GCS") except Exception as e: print(f"Error uploading to GCS: {str(e)}") raise def load_state(bucket, key): """Load state from GCS with separate tracking for each endpoint.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') print("No previous state found, starting fresh") return {} def save_state(bucket, key, state): """Save consolidated state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print("Updated state successfully") except Exception as e: print(f"Error updating state: {str(e)}") raise- 第二個檔案:requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
點選「部署」來儲存並部署函式。
等待部署作業完成 (2 到 3 分鐘)。
建立 Cloud Scheduler 工作
Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式。
- 前往 GCP 主控台的「Cloud Scheduler」。
- 點選「建立工作」。
請提供下列設定詳細資料:
設定 值 名稱 cloudlock-data-export-hourly區域 選取與 Cloud Run 函式相同的區域 頻率 0 * * * *(每小時整點)時區 選取時區 (建議使用世界標準時間) 目標類型 Pub/Sub 主題 選取主題 ( cloudlock-data-export-trigger)郵件內文 {}(空白 JSON 物件)點選「建立」。
排程頻率選項
根據記錄檔量和延遲時間要求選擇頻率:
頻率 Cron 運算式 用途 每 5 分鐘 */5 * * * *高容量、低延遲 每 15 分鐘檢查一次 */15 * * * *普通量 每小時 0 * * * *標準 (建議採用) 每 6 小時 0 */6 * * *少量、批次處理 每日 0 0 * * *歷來資料集合
測試排程器工作
- 在 Cloud Scheduler 控制台中找出您的工作。
- 按一下「強制執行」即可手動觸發。
- 等待幾秒鐘,然後依序前往「Cloud Run」>「Services」(服務) >「cloudlock-data-export」>「Logs」(記錄)。
- 確認函式是否已順利執行。
- 檢查 GCS 值區,確認是否已寫入記錄。
擷取 Google SecOps 服務帳戶
Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Cisco CloudLock logs)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Cisco CloudLock」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示專屬的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱。
- 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
- 指派角色:選取「Storage 物件檢視者」。
- 按一下 [儲存]。
在 Google SecOps 中設定資訊提供,擷取 Cisco CloudLock 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Cisco CloudLock logs)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Cisco CloudLock」做為「記錄類型」。
- 點選 [下一步]。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://cisco-cloudlock-logs/cloudlock/取代:
cisco-cloudlock-logs:您的 GCS bucket 名稱。cloudlock/:儲存記錄的選用前置字元/資料夾路徑 (如為根目錄,請留空)。
範例:
- 根層級 bucket:
gs://cisco-cloudlock-logs/ - 前置字串:
gs://cisco-cloudlock-logs/cloudlock/ - 有子資料夾:
gs://cisco-cloudlock-logs/cloudlock/incidents/
- 根層級 bucket:
來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。
檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
資產命名空間:資產命名空間。
擷取標籤:要套用至這個動態饋給事件的標籤。
點選 [下一步]。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| created_at | about.resource.attribute.labels.key | created_at 欄位的值會指派給標籤鍵。 |
| created_at | about.resource.attribute.labels.value | created_at 欄位的值會指派給標籤值。 |
| created_at | about.resource.attribute.creation_time | 系統會將 created_at 欄位剖析為時間戳記並對應。 |
| entity.id | target.asset.product_object_id | entity.id 欄位已重新命名。 |
| entity.ip | target.ip | entity.ip 欄位會合併至目標 IP 欄位。 |
| entity.mime_type | target.file.mime_type | 當 entity.origin_type 為「document」時,entity.mime_type 欄位會重新命名。 |
| entity.name | target.application | 當 entity.origin_type 為「app」時,entity.name 欄位會重新命名。 |
| entity.name | target.file.full_path | 當 entity.origin_type 為「document」時,entity.name 欄位會重新命名。 |
| entity.origin_id | target.resource.product_object_id | entity.origin_id 欄位已重新命名。 |
| entity.origin_type | target.resource.resource_subtype | entity.origin_type 欄位已重新命名。 |
| entity.owner_email | target.user.email_addresses | 如果實體.owner_email 欄位符合電子郵件 regex,系統會將該欄位合併至目標使用者電子郵件欄位。 |
| entity.owner_email | target.user.user_display_name | 如果 entity.owner_email 欄位與電子郵件規則運算式不符,系統會重新命名該欄位。 |
| entity.owner_name | target.user.user_display_name | 如果 entity.owner_email 符合電子郵件的 regex,系統會重新命名 entity.owner_name 欄位。 |
| entity.vendor.name | target.platform_version | 「entity.vendor.name」欄位已重新命名。 |
| id | metadata.product_log_id | 「id」欄位已重新命名。 |
| incident_status | metadata.product_event_type | 「incident_status」欄位已重新命名。 |
| metadata.event_timestamp | 值會以硬式編碼設為「updated_at」。值是從 updated_at 欄位衍生而來。系統會將 updated_at 欄位剖析為時間戳記並對應。 | |
| security_result.detection_fields.key | 如果嚴重程度為「ALERT」,且 incident_status 為「NEW」,請設為「true」。轉換成布林值。 | |
| security_result.detection_fields.value | 如果嚴重程度為「ALERT」,且 incident_status 為「NEW」,請設為「true」。轉換成布林值。 | |
| metadata.event_type | 值會硬式編碼為「GENERIC_EVENT」。 | |
| metadata.product_name | 值會硬式編碼為「CISCO_CLOUDLOCK_CASB」。 | |
| metadata.vendor_name | 值會硬式編碼為「CloudLock」。 | |
| metadata.product_version | 值會硬式編碼為「Cisco」。 | |
| security_result.alert_state | 如果嚴重程度為「ALERT」,且 incident_status 不是「RESOLVED」或「DISMISSED」,請設為「ALERTING」。如果嚴重程度為「ALERT」,且 incident_status 為「RESOLVED」或「DISMISSED」,請設為「NOT_ALERTING」。 | |
| security_result.detection_fields.key | 衍生自 matches 陣列,具體來說是每個相符物件的鍵。 | |
| security_result.detection_fields.value | 衍生自 matches 陣列,具體來說是每個相符物件的值。 | |
| security_result.rule_id | 衍生自 policy.id。 | |
| security_result.rule_name | 衍生自 policy.name。 | |
| security_result.severity | 如果嚴重程度為「INFO」,請設為「INFORMATIONAL」。如果嚴重程度為「重大」,則設為「重大」。衍生自嚴重程度。 | |
| security_result.summary | 此值會設為「相符次數:」串連 match_count 的值。 | |
| target.resource.resource_type | 當 entity.origin_type 為「document」時,請設為「STORAGE_OBJECT」。 | |
| target.url | 當 entity.origin_type 為「document」時,衍生自 entity.direct_url。 | |
| policy.id | security_result.rule_id | 「policy.id」欄位已重新命名。 |
| policy.name | security_result.rule_name | 「policy.name」欄位已重新命名。 |
| 嚴重性 | security_result.severity_details | 「嚴重程度」欄位已重新命名。 |
| updated_at | about.resource.attribute.labels.key | updated_at 欄位的值會指派給 labels 鍵。 |
| updated_at | about.resource.attribute.labels.value | updated_at 欄位的值會指派給標籤值。 |
| updated_at | about.resource.attribute.last_update_time | 系統會將 updated_at 欄位剖析為時間戳記並對應。 |
需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。