收集 Cisco CloudLock CASB 日志
本文档介绍了如何使用 Amazon S3 将 Cisco CloudLock CASB 日志注入到 Google Security Operations。解析器从 JSON 日志中提取字段,并将其转换和映射到统一数据模型 (UDM)。它负责处理日期解析、将特定字段转换为字符串、将字段映射到 UDM 实体(元数据、目标、安全结果、关于),并遍历 matches
以提取检测字段,最终将所有提取的数据合并到 @output
字段中。
准备工作
- Google SecOps 实例
- 对 Cisco CloudLock CASB 租户的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
获取 Cisco CloudLock 前提条件
- 登录 Cisco CloudLock CASB 管理控制台。
- 转到设置。
- 点击身份验证和 API 标签页。
- 在 API 下,点击 Generate(生成)以创建访问令牌。
- 复制以下详细信息并将其保存在安全的位置:
- API 访问令牌
- CloudLock API 服务器网址(请与 Cloudlock 支持团队联系,获取您组织专用的网址)
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
cisco-cloudlock-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
cisco-cloudlock-logs
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
cloudlock-lambda-role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 cloudlock-data-export
运行时 Python 3.12(最新支持的版本) 架构 x86_64 执行角色 cloudlock-lambda-role
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
cloudlock-data-export.py
):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raise
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入以下提供的环境变量,并将其替换为您的值。
键 示例值 S3_BUCKET
cisco-cloudlock-logs
S3_PREFIX
cloudlock/
STATE_KEY
cloudlock/state.json
CLOUDLOCK_API_TOKEN
<your-api-token>
CLOUDLOCK_API_BASE
<your-cloudlock-api-url>
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
cloudlock-data-export
。 - 名称:
cloudlock-data-export-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }
将名称设置为
secops-reader-policy
。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Cisco CloudLock 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Cisco CloudLock logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Cisco CloudLock 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://cisco-cloudlock-logs/cloudlock/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
created_at |
about.resource.attribute.labels.key |
created_at 字段的值会分配给标签键。 |
created_at |
about.resource.attribute.labels.value |
created_at 字段的值会分配给标签值。 |
created_at |
about.resource.attribute.creation_time |
created_at 字段被解析为时间戳并进行映射。 |
entity.id |
target.asset.product_object_id |
entity.id 字段已重命名。 |
entity.ip |
target.ip |
entity.ip 字段已合并到目标 IP 字段中。 |
entity.mime_type |
target.file.mime_type |
当 entity.origin_type 为“document”时,entity.mime_type 字段会被重命名。 |
entity.name |
target.application |
当 entity.origin_type 为“应用”时,entity.name 字段会更名。 |
entity.name |
target.file.full_path |
当 entity.origin_type 为“document”时,entity.name 字段会被重命名。 |
entity.origin_id |
target.resource.product_object_id |
entity.origin_id 字段已重命名。 |
entity.origin_type |
target.resource.resource_subtype |
entity.origin_type 字段已重命名。 |
entity.owner_email |
target.user.email_addresses |
如果 entity.owner_email 字段与电子邮件正则表达式匹配,则会将其合并到目标用户电子邮件字段中。 |
entity.owner_email |
target.user.user_display_name |
如果 entity.owner_email 字段与电子邮件正则表达式不匹配,则会重命名该字段。 |
entity.owner_name |
target.user.user_display_name |
当 entity.owner_email 与电子邮件正则表达式匹配时,系统会重命名 entity.owner_name 字段。 |
entity.vendor.name |
target.platform_version |
entity.vendor.name 字段已重命名。 |
id |
metadata.product_log_id |
id 字段已重命名。 |
incident_status |
metadata.product_event_type |
incident_status 字段已重命名。值已硬编码为“updated_at”。值派生自 updated_at 字段。updated_at 字段被解析为时间戳并进行映射。如果 severity 为“ALERT”且 incident_status 为“NEW”,则设置为“true”。转换为布尔值。如果 severity 为“ALERT”且 incident_status 为“NEW”,则设置为“true”。转换为布尔值。值硬编码为“GENERIC_EVENT”。该值硬编码为“CISCO_CLOUDLOCK_CASB”。值硬编码为“CloudLock”。值硬编码为“Cisco”。如果 severity 为“ALERT”,且 incident_status 不是“RESOLVED”或“DISMISSED”,则设置为“ALERTING”。如果 severity 为“ALERT”,且 incident_status 为“RESOLVED”或“DISMISSED”,则设置为“NOT_ALERTING”。派生自 matches 数组,具体来说是每个匹配对象的键。派生自 matches 数组,具体来说是每个匹配对象的值。派生自 policy.id 。派生自 policy.name 。如果 severity 为“INFO”,则设置为“INFORMATIONAL”。如果 severity 为“CRITICAL”,则设置为“CRITICAL”。派生自 severity 。该值设置为“匹配次数:”与 match_count 的值串联而成。当 entity.origin_type 为“document”时,设置为“STORAGE_OBJECT”。当 entity.origin_type 为“document”时,派生自 entity.direct_url 。 |
policy.id |
security_result.rule_id |
policy.id 字段已重命名。 |
policy.name |
security_result.rule_name |
policy.name 字段已重命名。 |
severity |
security_result.severity_details |
severity 字段已重命名。 |
updated_at |
about.resource.attribute.labels.key |
updated_at 字段的值会分配给标签键。 |
updated_at |
about.resource.attribute.labels.value |
updated_at 字段的值会分配给标签值。 |
updated_at |
about.resource.attribute.last_update_time |
updated_at 字段被解析为时间戳并进行映射。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。