Collect Cisco CloudLock CASB logs
This document explains how to ingest Cisco CloudLock CASB logs to Google Security Operations using Amazon S3. The parser extracts fields from the JSON logs, transforms them, and maps them to the Unified Data Model (UDM). It handles date parsing, converts specific fields to strings, maps fields to UDM entities (metadata, target, security result, about), and iterates through matches to extract detection fields, ultimately merging all extracted data into the @output field.
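The export function described later in this guide writes one JSON object per line (NDJSON). The following is a purely illustrative sketch of that shape: the record below is hypothetical and built only from field names that appear in the UDM Mapping Table at the end of this document (real CloudLock responses contain additional fields, and the structure of the match objects is assumed here).

```python
import json

# Hypothetical CloudLock incident record, for illustration only.
# Field names follow the UDM Mapping Table at the end of this document.
sample_incident = {
    "id": "12345",
    "incident_status": "NEW",
    "severity": "ALERT",
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-01T01:00:00Z",
    "match_count": 2,
    "policy": {"id": "p-100", "name": "Example DLP policy"},
    "entity": {
        "id": "ent-1",
        "name": "quarterly-report.xlsx",
        "origin_id": "doc-999",
        "origin_type": "document",
        "mime_type": "application/vnd.ms-excel",
        "owner_email": "user@example.com",
        "owner_name": "Example User",
        "vendor": {"name": "example_vendor"},
    },
    # The keys inside each match object are assumed for illustration.
    "matches": [{"field": "content", "text": "example match"}],
}

# One JSON object per line, matching the .jsonl files the Lambda uploads.
print(json.dumps(sample_incident, separators=(",", ":")))
```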
Before you begin
- A Google SecOps instance
- Privileged access to Cisco CloudLock CASB tenant
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Get Cisco CloudLock prerequisites
- Sign in to the Cisco CloudLock CASB Admin Console.
- Go to Settings.
- Click the Authentication & API tab.
- Under API, click Generate to create your access token.
- Copy and save the following details in a secure location:
- API Access Token
- CloudLock API Server URL (contact CloudLock Support for your organization-specific URL)
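To confirm the token and API server URL work before wiring up AWS, you can make a single authenticated request. This is a minimal sketch, assuming your organization-specific server URL and the Bearer-token scheme used by the export function later in this guide; the placeholder values are not real.

```python
import json
import urllib3

# Placeholders: replace with the values saved above.
API_BASE = "https://<your-cloudlock-api-url>"   # organization-specific server URL
API_TOKEN = "<your-api-token>"

http = urllib3.PoolManager()
resp = http.request(
    "GET",
    f"{API_BASE}/api/v2/incidents?limit=1",
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
    },
)

print("HTTP status:", resp.status)  # 200 indicates the token is accepted
print(resp.data.decode("utf-8")[:500])
```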
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, cisco-cloudlock-logs).
- Create a User following this user guide: Creating an IAM user.
- Select the created User.
- Select Security credentials tab.
- In the Access keys section, click Create access key.
- Select Third-party service as Use case.
- Click Next.
- Optional: Add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for future reference.
- Click Done.
- Select Permissions tab.
- In the Permissions policies section, click Add permissions.
- Select Add permissions.
- Select Attach policies directly.
- Search for AmazonS3FullAccess policy.
- Select the policy.
- Click Next.
- Click Add permissions.
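If you want to sanity-check the new access key before moving on, a short boto3 test can confirm the credentials can write to and read from the bucket. This is an optional sketch, assuming the example bucket name cisco-cloudlock-logs, a sample Region, and the keys downloaded above.

```python
import boto3

# Assumed values for illustration; use the bucket, Region, and keys from the steps above.
s3 = boto3.client(
    "s3",
    region_name="us-east-1",
    aws_access_key_id="<ACCESS_KEY_ID>",
    aws_secret_access_key="<SECRET_ACCESS_KEY>",
)

# Write and read back a small marker object to confirm the permissions work.
s3.put_object(
    Bucket="cisco-cloudlock-logs",
    Key="cloudlock/connectivity-check.txt",
    Body=b"ok",
)
body = s3.get_object(
    Bucket="cisco-cloudlock-logs",
    Key="cloudlock/connectivity-check.txt",
)["Body"].read()
print("Round trip succeeded:", body == b"ok")
```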
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies.
- Click Create policy > JSON tab.
- Enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutObjects",
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
    }
  ]
}
```

- Replace cisco-cloudlock-logs if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role cloudlock-lambda-role and click Create role.
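If you prefer to script this step instead of using the console, the same policy and role can be created with boto3. This is a hedged sketch assuming the example bucket and role names used above; the policy name cloudlock-s3-upload-policy is hypothetical.

```python
import json
import boto3

iam = boto3.client("iam")

# Same permissions as the JSON policy above (bucket name is the example value).
policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {"Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject",
         "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"},
        {"Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject",
         "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"},
    ],
}

# Trust policy that lets the Lambda service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},
         "Action": "sts:AssumeRole"}
    ],
}

policy_arn = iam.create_policy(
    PolicyName="cloudlock-s3-upload-policy",  # hypothetical policy name
    PolicyDocument=json.dumps(policy_doc),
)["Policy"]["Arn"]

iam.create_role(
    RoleName="cloudlock-lambda-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.attach_role_policy(RoleName="cloudlock-lambda-role", PolicyArn=policy_arn)
# Optionally also attach the AWS managed AWSLambdaBasicExecutionRole policy
# so the function can write CloudWatch logs.
print("Created role cloudlock-lambda-role with policy", policy_arn)
```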
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

Setting | Value |
---|---|
Name | cloudlock-data-export |
Runtime | Python 3.12 (latest supported) |
Architecture | x86_64 |
Execution role | cloudlock-lambda-role |
- After the function is created, open the Code tab, delete the stub, and enter the following code (cloudlock-data-export.py):

```python
import json
import boto3
import urllib3
import os
from datetime import datetime, timedelta
import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize S3 client
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    """
    Lambda function to fetch Cisco CloudLock CASB data and store in S3
    """
    # Environment variables
    s3_bucket = os.environ['S3_BUCKET']
    s3_prefix = os.environ['S3_PREFIX']
    state_key = os.environ['STATE_KEY']
    api_token = os.environ['CLOUDLOCK_API_TOKEN']
    api_base = os.environ['CLOUDLOCK_API_BASE']

    # HTTP client
    http = urllib3.PoolManager()

    try:
        # Get last run state for all endpoints
        state = get_last_run_state(s3_bucket, state_key)

        # Fetch incidents data (using updated_after for incremental sync)
        incidents_updated_after = state.get('incidents_updated_after')
        incidents, new_incidents_state = fetch_cloudlock_incidents(
            http, api_base, api_token, incidents_updated_after
        )
        if incidents:
            upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
            logger.info(f"Uploaded {len(incidents)} incidents to S3")
            state['incidents_updated_after'] = new_incidents_state

        # Fetch activities data (using from/to time range)
        activities_from = state.get('activities_from')
        if not activities_from:
            activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
        activities_to = datetime.utcnow().isoformat()
        activities = fetch_cloudlock_activities(
            http, api_base, api_token, activities_from, activities_to
        )
        if activities:
            upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
            logger.info(f"Uploaded {len(activities)} activities to S3")
            state['activities_from'] = activities_to

        # Fetch entities data (using updated_after for incremental sync)
        entities_updated_after = state.get('entities_updated_after')
        entities, new_entities_state = fetch_cloudlock_entities(
            http, api_base, api_token, entities_updated_after
        )
        if entities:
            upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
            logger.info(f"Uploaded {len(entities)} entities to S3")
            state['entities_updated_after'] = new_entities_state

        # Update consolidated state
        state['updated_at'] = datetime.utcnow().isoformat()
        update_last_run_state(s3_bucket, state_key, state)

        return {
            'statusCode': 200,
            'body': json.dumps('CloudLock data export completed successfully')
        }

    except Exception as e:
        logger.error(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }


def make_api_request(http, url, headers, retries=3):
    """
    Make API request with exponential backoff retry logic
    """
    for attempt in range(retries):
        try:
            response = http.request('GET', url, headers=headers)
            if response.status == 200:
                return response
            elif response.status == 429:  # Rate limit
                retry_after = int(response.headers.get('Retry-After', 60))
                logger.warning(f"Rate limited, waiting {retry_after} seconds")
                time.sleep(retry_after)
            else:
                logger.error(f"API request failed with status {response.status}")
        except Exception as e:
            logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
            if attempt < retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
            else:
                raise

    return None


def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
    """
    Fetch incidents data from CloudLock API using updated_after for incremental sync
    API Reference: https://developer.cisco.com/docs/cloud-security/
    """
    url = f"{api_base}/api/v2/incidents"
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    params = {
        'limit': 1000,
        'offset': 0,
        'count_total': 'false'
    }
    if updated_after:
        params['updated_after'] = updated_after

    all_data = []
    latest_updated_at = updated_after

    try:
        while True:
            # Build URL with parameters (avoid logging sensitive data)
            param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            full_url = f"{url}?{param_string}"

            logger.info(f"Fetching incidents with offset: {params['offset']}")
            response = make_api_request(http, full_url, headers)
            if not response:
                break

            data = json.loads(response.data.decode('utf-8'))
            batch_data = data if isinstance(data, list) else data.get('data', [])
            if not batch_data:
                break

            all_data.extend(batch_data)

            # Track latest updated_at for incremental sync
            for item in batch_data:
                if 'updated_at' in item:
                    item_updated_at = item['updated_at']
                    if not latest_updated_at or item_updated_at > latest_updated_at:
                        latest_updated_at = item_updated_at

            # Check pagination
            if len(batch_data) < params['limit']:
                break
            params['offset'] += params['limit']

        logger.info(f"Fetched {len(all_data)} incidents")
        return all_data, latest_updated_at

    except Exception as e:
        logger.error(f"Error fetching incidents: {str(e)}")
        return [], updated_after


def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
    """
    Fetch activities data from CloudLock API using time range
    API Reference: https://developer.cisco.com/docs/cloud-security/
    """
    url = f"{api_base}/api/v2/activities"
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    params = {
        'limit': 1000,
        'offset': 0,
        'from': from_time,
        'to': to_time
    }

    all_data = []

    try:
        while True:
            param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            full_url = f"{url}?{param_string}"

            logger.info(f"Fetching activities with offset: {params['offset']}")
            response = make_api_request(http, full_url, headers)
            if not response:
                break

            data = json.loads(response.data.decode('utf-8'))
            batch_data = data if isinstance(data, list) else data.get('data', [])
            if not batch_data:
                break

            all_data.extend(batch_data)

            if len(batch_data) < params['limit']:
                break
            params['offset'] += params['limit']

        logger.info(f"Fetched {len(all_data)} activities")
        return all_data

    except Exception as e:
        logger.error(f"Error fetching activities: {str(e)}")
        return []


def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
    """
    Fetch entities data from CloudLock API using updated_after for incremental sync
    API Reference: https://developer.cisco.com/docs/cloud-security/
    """
    url = f"{api_base}/api/v2/entities"
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    params = {
        'limit': 1000,
        'offset': 0
    }
    if updated_after:
        params['updated_after'] = updated_after

    all_data = []
    latest_updated_at = updated_after

    try:
        while True:
            param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            full_url = f"{url}?{param_string}"

            logger.info(f"Fetching entities with offset: {params['offset']}")
            response = make_api_request(http, full_url, headers)
            if not response:
                break

            data = json.loads(response.data.decode('utf-8'))
            batch_data = data if isinstance(data, list) else data.get('data', [])
            if not batch_data:
                break

            all_data.extend(batch_data)

            # Track latest updated_at for incremental sync
            for item in batch_data:
                if 'updated_at' in item:
                    item_updated_at = item['updated_at']
                    if not latest_updated_at or item_updated_at > latest_updated_at:
                        latest_updated_at = item_updated_at

            if len(batch_data) < params['limit']:
                break
            params['offset'] += params['limit']

        logger.info(f"Fetched {len(all_data)} entities")
        return all_data, latest_updated_at

    except Exception as e:
        logger.error(f"Error fetching entities: {str(e)}")
        return [], updated_after


def upload_to_s3_ndjson(bucket, prefix, data_type, data):
    """
    Upload data to S3 bucket in NDJSON format (one JSON object per line)
    """
    timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
    filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"

    try:
        # Convert to NDJSON format
        ndjson_content = '\n'.join([json.dumps(item, separators=(',', ':')) for item in data])

        s3_client.put_object(
            Bucket=bucket,
            Key=filename,
            Body=ndjson_content,
            ContentType='application/x-ndjson'
        )
        logger.info(f"Successfully uploaded {filename} to S3")

    except Exception as e:
        logger.error(f"Error uploading to S3: {str(e)}")
        raise


def get_last_run_state(bucket, key):
    """
    Get the last run state from S3 with separate tracking for each endpoint
    """
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        state = json.loads(response['Body'].read().decode('utf-8'))
        return state
    except s3_client.exceptions.NoSuchKey:
        logger.info("No previous state found, starting fresh")
        return {}
    except Exception as e:
        logger.error(f"Error reading state: {str(e)}")
        return {}


def update_last_run_state(bucket, key, state):
    """
    Update the consolidated state in S3
    """
    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(state, indent=2),
            ContentType='application/json'
        )
        logger.info("Updated state successfully")
    except Exception as e:
        logger.error(f"Error updating state: {str(e)}")
        raise
```
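Before deploying, you can exercise the handler locally with the same environment variables the function expects. A minimal sketch, assuming the code above is saved locally as lambda_function.py (Lambda's default module name) and that your local AWS credentials can write to the bucket:

```python
import os

# Environment variables the handler reads; replace with your values.
os.environ.update({
    "S3_BUCKET": "cisco-cloudlock-logs",
    "S3_PREFIX": "cloudlock/",
    "STATE_KEY": "cloudlock/state.json",
    "CLOUDLOCK_API_TOKEN": "<your-api-token>",
    "CLOUDLOCK_API_BASE": "<your-cloudlock-api-url>",
})

# Assumes the function code above was saved locally as lambda_function.py.
from lambda_function import lambda_handler

# Invoke with an empty event and no context; the handler uses neither.
result = lambda_handler({}, None)
print(result)
```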
- Go to Configuration > Environment variables.
- Click Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own.

Key | Example value |
---|---|
S3_BUCKET | cisco-cloudlock-logs |
S3_PREFIX | cloudlock/ |
STATE_KEY | cloudlock/state.json |
CLOUDLOCK_API_TOKEN | <your-api-token> |
CLOUDLOCK_API_BASE | <your-cloudlock-api-url> |
- After the function is created, stay on its page (or open Lambda > Functions > your-function).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
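The environment variables and timeout can also be applied from a script rather than the console. A hedged sketch using boto3, assuming the function name used above and your own values for the variables:

```python
import boto3

lambda_client = boto3.client("lambda")

# Apply the environment variables and the 5-minute timeout in one call.
lambda_client.update_function_configuration(
    FunctionName="cloudlock-data-export",
    Timeout=300,  # seconds
    Environment={
        "Variables": {
            "S3_BUCKET": "cisco-cloudlock-logs",
            "S3_PREFIX": "cloudlock/",
            "STATE_KEY": "cloudlock/state.json",
            "CLOUDLOCK_API_TOKEN": "<your-api-token>",
            "CLOUDLOCK_API_BASE": "<your-cloudlock-api-url>",
        }
    },
)
print("Updated configuration for cloudlock-data-export")
```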
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
- Recurring schedule: Rate (1 hour).
- Target: your Lambda function cloudlock-data-export.
- Name: cloudlock-data-export-1h.
- Click Create schedule.
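The same schedule can be created programmatically with the EventBridge Scheduler API. This is a sketch under assumptions: the ARNs are placeholders, and it presumes a pre-existing scheduler execution role that scheduler.amazonaws.com can assume and that allows lambda:InvokeFunction (the console creates such a role for you, so this is only needed when scripting).

```python
import boto3

scheduler = boto3.client("scheduler")

# Placeholders: replace with your Region, account ID, and execution role.
function_arn = "arn:aws:lambda:<region>:<account-id>:function:cloudlock-data-export"
scheduler_role_arn = "arn:aws:iam::<account-id>:role/<scheduler-invoke-role>"

scheduler.create_schedule(
    Name="cloudlock-data-export-1h",
    ScheduleExpression="rate(1 hour)",
    FlexibleTimeWindow={"Mode": "OFF"},
    Target={
        "Arn": function_arn,
        "RoleArn": scheduler_role_arn,
    },
)
print("Created schedule cloudlock-data-export-1h")
```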
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS console, go to IAM > Users and click Add users.
- Provide the following configuration details:
- User: Enter secops-reader.
- Access type: Select Access key – Programmatic access.
- Click Create user.
- Attach a minimal read policy (custom): go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
    }
  ]
}
```
- Name the policy secops-reader-policy and click Create policy.
- Back on the Add permissions screen, search for and select secops-reader-policy, then click Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV file; these values are entered into the feed.
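To confirm the reader credentials behave as intended before configuring the feed, a short boto3 check can list the prefix and confirm read access. This is an optional sketch assuming the example bucket name and the keys from the downloaded CSV.

```python
import boto3

# Read-only credentials from the downloaded CSV (placeholders).
s3 = boto3.client(
    "s3",
    aws_access_key_id="<SECOPS_READER_ACCESS_KEY_ID>",
    aws_secret_access_key="<SECOPS_READER_SECRET_ACCESS_KEY>",
)

# ListBucket on the bucket and GetObject on its keys should both succeed.
listing = s3.list_objects_v2(
    Bucket="cisco-cloudlock-logs", Prefix="cloudlock/", MaxKeys=5
)
for obj in listing.get("Contents", []):
    print(obj["Key"])

# Any write attempt should fail with AccessDenied, since the policy is read-only.
```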
Configure a feed in Google SecOps to ingest Cisco CloudLock logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, Cisco CloudLock logs).
- Select Amazon S3 V2 as the Source type.
- Select Cisco CloudLock as the Log type.
- Click Next.
- Specify values for the following input parameters:
- S3 URI: s3://cisco-cloudlock-logs/cloudlock/
- Source deletion options: Select the deletion option according to your preference.
- Maximum File Age: Include files modified in the last number of days. Default is 180 days.
- Access Key ID: User access key with access to the S3 bucket.
- Secret Access Key: User secret key with access to the S3 bucket.
- Asset namespace: The asset namespace.
- Ingestion labels: The label applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
UDM Mapping Table
Log Field | UDM Mapping | Logic |
---|---|---|
created_at | about.resource.attribute.labels.key | The created_at field's value is assigned to the labels key. |
created_at | about.resource.attribute.labels.value | The created_at field's value is assigned to the labels value. |
created_at | about.resource.attribute.creation_time | The created_at field is parsed as a timestamp and mapped. |
entity.id | target.asset.product_object_id | The entity.id field is renamed. |
entity.ip | target.ip | The entity.ip field is merged into the target IP field. |
entity.mime_type | target.file.mime_type | The entity.mime_type field is renamed when entity.origin_type is "document". |
entity.name | target.application | The entity.name field is renamed when entity.origin_type is "app". |
entity.name | target.file.full_path | The entity.name field is renamed when entity.origin_type is "document". |
entity.origin_id | target.resource.product_object_id | The entity.origin_id field is renamed. |
entity.origin_type | target.resource.resource_subtype | The entity.origin_type field is renamed. |
entity.owner_email | target.user.email_addresses | The entity.owner_email field is merged into the target user email field if it matches an email regex. |
entity.owner_email | target.user.user_display_name | The entity.owner_email field is renamed if it does not match an email regex. |
entity.owner_name | target.user.user_display_name | The entity.owner_name field is renamed when entity.owner_email matches an email regex. |
entity.vendor.name | target.platform_version | The entity.vendor.name field is renamed. |
id | metadata.product_log_id | The id field is renamed. |
incident_status | metadata.product_event_type | The incident_status field is renamed. |
 | | Value is hardcoded to "updated_at". |
 | | Value is derived from the updated_at field. |
 | | The updated_at field is parsed as a timestamp and mapped. |
 | | Set to "true" if severity is "ALERT" and incident_status is "NEW". Converted to boolean. |
 | | Set to "true" if severity is "ALERT" and incident_status is "NEW". Converted to boolean. |
 | | Value is hardcoded to "GENERIC_EVENT". |
 | | Value is hardcoded to "CISCO_CLOUDLOCK_CASB". |
 | | Value is hardcoded to "CloudLock". |
 | | Value is hardcoded to "Cisco". |
 | | Set to "ALERTING" if severity is "ALERT" and incident_status is not "RESOLVED" or "DISMISSED". Set to "NOT_ALERTING" if severity is "ALERT" and incident_status is "RESOLVED" or "DISMISSED". |
 | | Derived from the matches array, specifically the key of each match object. |
 | | Derived from the matches array, specifically the value of each match object. |
 | | Derived from policy.id. |
 | | Derived from policy.name. |
 | | Set to "INFORMATIONAL" if severity is "INFO". Set to "CRITICAL" if severity is "CRITICAL". Derived from severity. |
 | | The value is set to "match count: " concatenated with the value of match_count. |
 | | Set to "STORAGE_OBJECT" when entity.origin_type is "document". |
 | | Derived from entity.direct_url when entity.origin_type is "document". |
policy.id | security_result.rule_id | The policy.id field is renamed. |
policy.name | security_result.rule_name | The policy.name field is renamed. |
severity | security_result.severity_details | The severity field is renamed. |
updated_at | about.resource.attribute.labels.key | The updated_at field's value is assigned to the labels key. |
updated_at | about.resource.attribute.labels.value | The updated_at field's value is assigned to the labels value. |
updated_at | about.resource.attribute.last_update_time | The updated_at field is parsed as a timestamp and mapped. |
Need more help? Get answers from Community members and Google SecOps professionals.