收集 Cisco CloudLock CASB 記錄
本文說明如何使用 Amazon S3,將 Cisco CloudLock CASB 記錄擷取至 Google Security Operations。剖析器會從 JSON 記錄檔中擷取欄位,然後轉換並對應至統合資料模型 (UDM)。這項函式會處理日期剖析作業、將特定欄位轉換為字串、將欄位對應至 UDM 實體 (中繼資料、目標、安全性結果、關於),並逐一檢查 matches
以擷取偵測欄位,最終將所有擷取的資料合併至 @output
欄位。
事前準備
- Google SecOps 執行個體
- Cisco CloudLock CASB 租戶的特殊存取權
- AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)
取得 Cisco CloudLock 必要條件
- 登入 Cisco CloudLock CASB 管理控制台。
- 前往 [設定]。
- 按一下「Authentication & API」分頁標籤。
- 在「API」下方,按一下「產生」即可建立存取權杖。
- 複製下列詳細資料並存放在安全位置:
- API 存取權杖
- CloudLock API 伺服器網址 (請與 Cloudlock 支援團隊聯絡,取得貴機構專屬的網址)
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
cisco-cloudlock-logs
)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」。
- 按一下「建立政策」>「JSON」分頁。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }
- 如果您輸入其他 bucket 名稱,請替換
cisco-cloudlock-logs
。
- 如果您輸入其他 bucket 名稱,請替換
依序點選「下一步」>「建立政策」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策。
為角色命名
cloudlock-lambda-role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 cloudlock-data-export
執行階段 Python 3.12 (最新支援版本) 架構 x86_64 執行角色 cloudlock-lambda-role
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
cloudlock-data-export.py
):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raise
依序前往「設定」>「環境變數」。
依序點選「編輯」> 新增環境變數。
輸入下列環境變數,並將 換成您的值。
鍵 範例值 S3_BUCKET
cisco-cloudlock-logs
S3_PREFIX
cloudlock/
STATE_KEY
cloudlock/state.json
CLOUDLOCK_API_TOKEN
<your-api-token>
CLOUDLOCK_API_BASE
<your-cloudlock-api-url>
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
cloudlock-data-export
。 - 名稱:
cloudlock-data-export-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:輸入
secops-reader
。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Cisco CloudLock 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Cisco CloudLock logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Cisco CloudLock」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://cisco-cloudlock-logs/cloudlock/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
created_at |
about.resource.attribute.labels.key |
created_at 欄位的值會指派給標籤鍵。 |
created_at |
about.resource.attribute.labels.value |
created_at 欄位的值會指派給標籤值。 |
created_at |
about.resource.attribute.creation_time |
created_at 欄位會剖析為時間戳記並對應。 |
entity.id |
target.asset.product_object_id |
entity.id 欄位已重新命名。 |
entity.ip |
target.ip |
entity.ip 欄位會合併至目標 IP 欄位。 |
entity.mime_type |
target.file.mime_type |
當 entity.origin_type 為「document」時,entity.mime_type 欄位會重新命名。 |
entity.name |
target.application |
當 entity.origin_type 為「app」時,entity.name 欄位會重新命名。 |
entity.name |
target.file.full_path |
當 entity.origin_type 為「document」時,entity.name 欄位會重新命名。 |
entity.origin_id |
target.resource.product_object_id |
entity.origin_id 欄位已重新命名。 |
entity.origin_type |
target.resource.resource_subtype |
entity.origin_type 欄位已重新命名。 |
entity.owner_email |
target.user.email_addresses |
如果 entity.owner_email 欄位符合電子郵件的 regex,系統會將該欄位合併至目標使用者電子郵件欄位。 |
entity.owner_email |
target.user.user_display_name |
如果 entity.owner_email 欄位不符合電子郵件規則運算式,系統會重新命名該欄位。 |
entity.owner_name |
target.user.user_display_name |
如果 entity.owner_email 符合電子郵件規則運算式,系統會重新命名 entity.owner_name 欄位。 |
entity.vendor.name |
target.platform_version |
entity.vendor.name 欄位已重新命名。 |
id |
metadata.product_log_id |
id 欄位已重新命名。 |
incident_status |
metadata.product_event_type |
incident_status 欄位已重新命名。值會以硬式編碼設為「updated_at」。這個值衍生自 updated_at 欄位。updated_at 欄位會剖析為時間戳記並對應。如果 severity 為「ALERT」,且 incident_status 為「NEW」,則設為「true」。已轉換為布林值。如果 severity 為「ALERT」,且 incident_status 為「NEW」,則設為「true」。已轉換為布林值。值會硬式編碼為「GENERIC_EVENT」。值會硬式編碼為「CISCO_CLOUDLOCK_CASB」。值會硬式編碼為「CloudLock」。值會硬式編碼為「Cisco」。如果 severity 為「ALERT」,且 incident_status 不是「RESOLVED」或「DISMISSED」,則設為「ALERTING」。如果 severity 為「ALERT」,且 incident_status 為「RESOLVED」或「DISMISSED」,請設為「NOT_ALERTING」。衍生自 matches 陣列,特別是每個比對物件的鍵。衍生自 matches 陣列,特別是每個相符物件的值。衍生自 policy.id 。衍生自 policy.name 。如果 severity 為「INFO」,請設為「INFORMATIONAL」。如果 severity 為「CRITICAL」,請設為「CRITICAL」。衍生自 severity 。這個值會設為「相符數量:」,並與 match_count 的值串連。如果 entity.origin_type 為「document」,請設為「STORAGE_OBJECT」。當 entity.origin_type 為「文件」時,衍生自 entity.direct_url 。 |
policy.id |
security_result.rule_id |
policy.id 欄位已重新命名。 |
policy.name |
security_result.rule_name |
policy.name 欄位已重新命名。 |
severity |
security_result.severity_details |
severity 欄位已重新命名。 |
updated_at |
about.resource.attribute.labels.key |
updated_at 欄位的值會指派給標籤鍵。 |
updated_at |
about.resource.attribute.labels.value |
updated_at 欄位的值會指派給標籤值。 |
updated_at |
about.resource.attribute.last_update_time |
updated_at 欄位會剖析為時間戳記並對應。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。