Collect Cisco CloudLock CASB logs

This document explains how to ingest Cisco CloudLock CASB logs into Google Security Operations using Amazon S3. The parser extracts fields from the JSON logs, transforms them, and maps them to the Unified Data Model (UDM). It parses dates, converts specific fields to strings, maps fields to UDM entities (metadata, target, security result, about), and iterates through the matches array to extract detection fields, merging all extracted data into the @output field.

Before you begin

  • A Google SecOps instance
  • Privileged access to Cisco CloudLock CASB tenant
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Get Cisco CloudLock prerequisites

  1. Sign in to the Cisco CloudLock CASB Admin Console.
  2. Go to Settings.
  3. Click the Authentication & API tab.
  4. Under API, click Generate to create your access token.
  5. Copy the following details and save them in a secure location:
    • API Access Token
    • CloudLock API Server URL (contact CloudLock Support for your organization-specific URL)
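
As a rough illustration of how these two values are used, the sketch below builds an authenticated request URL and headers with the Python standard library. The /api/v2/incidents path and the Bearer scheme mirror the Lambda code later in this document; the base URL and token are placeholders, and build_request is a hypothetical helper name, not part of any CloudLock SDK.

```python
from urllib.parse import urlencode


def build_request(api_base, api_token, path, params=None):
    """Return (url, headers) for a CloudLock API GET request."""
    url = f"{api_base.rstrip('/')}{path}"
    if params:
        # urlencode percent-escapes characters such as ':' in timestamps
        url = f"{url}?{urlencode(params)}"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    return url, headers


url, headers = build_request(
    "https://api.example.invalid",  # placeholder, not a real CloudLock URL
    "EXAMPLE_TOKEN",                # placeholder token
    "/api/v2/incidents",
    {"limit": 1000, "offset": 0, "updated_after": "2024-01-01T00:00:00"},
)
print(url)
```

Encoding the query string this way is a design choice of the sketch; it keeps timestamp values safe to place in a URL.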

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket by following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, cisco-cloudlock-logs as the bucket name).
  3. Create a user by following this user guide: Creating an IAM user.
  4. Select the created user.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
        }
      ]
    }
    
    • Replace cisco-cloudlock-logs if you entered a different bucket name.
  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy.

  7. Name the role cloudlock-lambda-role and click Create role.
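
If your bucket name differs from the example, it has to change in both Resource entries of the policy in step 3. The following sketch generates that policy document for an arbitrary bucket; build_lambda_policy and the bucket name my-cloudlock-bucket are hypothetical, and the default state key matches the STATE_KEY example value used later in this document.

```python
import json


def build_lambda_policy(bucket, state_key="cloudlock/state.json"):
    """Return the upload policy from step 3 for the given bucket name."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }


policy = build_lambda_policy("my-cloudlock-bucket")  # hypothetical bucket name
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the JSON tab of the policy editor.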

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting | Value
    Name | cloudlock-data-export
    Runtime | Python 3.12 (latest supported)
    Architecture | x86_64
    Execution role | cloudlock-lambda-role
  4. After the function is created, open the Code tab, delete the stub and enter the following code (cloudlock-data-export.py):

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize S3 client
    s3_client = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Cisco CloudLock CASB data and store in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_token = os.environ['CLOUDLOCK_API_TOKEN']
        api_base = os.environ['CLOUDLOCK_API_BASE']
    
        # HTTP client
        http = urllib3.PoolManager()
    
        try:
            # Get last run state for all endpoints
            state = get_last_run_state(s3_bucket, state_key)
    
            # Fetch incidents data (using updated_after for incremental sync)
            incidents_updated_after = state.get('incidents_updated_after')
            incidents, new_incidents_state = fetch_cloudlock_incidents(
                http, api_base, api_token, incidents_updated_after
            )
            if incidents:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
                logger.info(f"Uploaded {len(incidents)} incidents to S3")
                state['incidents_updated_after'] = new_incidents_state
    
            # Fetch activities data (using from/to time range)
            activities_from = state.get('activities_from')
            if not activities_from:
                activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    
            activities_to = datetime.utcnow().isoformat()
            activities = fetch_cloudlock_activities(
                http, api_base, api_token, activities_from, activities_to
            )
            if activities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
                logger.info(f"Uploaded {len(activities)} activities to S3")
                state['activities_from'] = activities_to
    
            # Fetch entities data (using updated_after for incremental sync)
            entities_updated_after = state.get('entities_updated_after')
            entities, new_entities_state = fetch_cloudlock_entities(
                http, api_base, api_token, entities_updated_after
            )
            if entities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
                logger.info(f"Uploaded {len(entities)} entities to S3")
                state['entities_updated_after'] = new_entities_state
    
            # Update consolidated state
            state['updated_at'] = datetime.utcnow().isoformat()
            update_last_run_state(s3_bucket, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps('CloudLock data export completed successfully')
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def make_api_request(http, url, headers, retries=3):
        """
        Make API request with exponential backoff retry logic
        """
        for attempt in range(retries):
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    return response
                elif response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited, waiting {retry_after} seconds")
                    time.sleep(retry_after)
                else:
                    logger.error(f"API request failed with status {response.status}")
    
            except Exception as e:
                logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
                if attempt < retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                else:
                    raise
    
        return None
    
    def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
        """
        Fetch incidents data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/incidents"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'count_total': 'false'
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                # Build URL with parameters (avoid logging sensitive data)
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching incidents with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                # Check pagination
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} incidents")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching incidents: {str(e)}")
            return [], updated_after
    
    def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
        """
        Fetch activities data from CloudLock API using time range
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/activities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'from': from_time,
            'to': to_time
        }
    
        all_data = []
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching activities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} activities")
            return all_data
    
        except Exception as e:
            logger.error(f"Error fetching activities: {str(e)}")
            return []
    
    def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
        """
        Fetch entities data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/entities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching entities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} entities")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching entities: {str(e)}")
            return [], updated_after
    
    def upload_to_s3_ndjson(bucket, prefix, data_type, data):
        """
        Upload data to S3 bucket in NDJSON format (one JSON object per line)
        """
        timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
        filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"
    
        try:
            # Convert to NDJSON format
            ndjson_content = '\n'.join([json.dumps(item, separators=(',', ':')) for item in data])
    
            s3_client.put_object(
                Bucket=bucket,
                Key=filename,
                Body=ndjson_content,
                ContentType='application/x-ndjson'
            )
            logger.info(f"Successfully uploaded {filename} to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {str(e)}")
            raise
    
    def get_last_run_state(bucket, key):
        """
        Get the last run state from S3 with separate tracking for each endpoint
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state
        except s3_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting fresh")
            return {}
        except Exception as e:
            logger.error(f"Error reading state: {str(e)}")
            return {}
    
    def update_last_run_state(bucket, key, state):
        """
        Update the consolidated state in S3
        """
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state, indent=2),
                ContentType='application/json'
            )
            logger.info("Updated state successfully")
        except Exception as e:
            logger.error(f"Error updating state: {str(e)}")
            raise
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables, replacing the example values with your own:

    Key | Example value
    S3_BUCKET | cisco-cloudlock-logs
    S3_PREFIX | cloudlock/
    STATE_KEY | cloudlock/state.json
    CLOUDLOCK_API_TOKEN | <your-api-token>
    CLOUDLOCK_API_BASE | <your-cloudlock-api-url>
  8. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  9. Select the Configuration tab.

  10. In the General configuration panel click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
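
The incidents and entities fetchers in the Lambda code share one pattern: page through results with limit and offset, stop on an empty or short page, and remember the newest updated_at value as the starting point for the next run. The sketch below isolates that pattern so it can be run without AWS or CloudLock access. fetch_all, fake_page, and the sample records are hypothetical stand-ins for the HTTP calls, and it assumes updated_at values are ISO 8601 strings in one consistent format, so that string comparison matches chronological order.

```python
def fetch_all(fetch_page, limit=1000, updated_after=None):
    """Page through results and track the newest updated_at value seen."""
    items, offset, latest = [], 0, updated_after
    while True:
        batch = fetch_page(offset, limit)
        if not batch:
            break
        items.extend(batch)
        for item in batch:
            ts = item.get("updated_at")
            # Same-format ISO 8601 strings compare chronologically
            if ts and (latest is None or ts > latest):
                latest = ts
        if len(batch) < limit:
            break  # a short page means there is nothing left to fetch
        offset += limit
    return items, latest


# Stand-in data source: five made-up records served in pages.
RECORDS = [{"id": i, "updated_at": f"2024-01-0{i + 1}T00:00:00"} for i in range(5)]


def fake_page(offset, limit):
    return RECORDS[offset:offset + limit]


items, watermark = fetch_all(fake_page, limit=2)
print(len(items), watermark)
```

If the API returned timestamps in mixed formats or time zones, parsing them into datetime objects before comparing would be the safer choice.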

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function cloudlock-data-export.
    • Name: cloudlock-data-export-1h.
  3. Click Create schedule.
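
With an hourly schedule, each invocation of the Lambda queries activities from the end of the previous window up to the current time, and falls back to a 24-hour lookback when no state exists yet. The sketch below reproduces that window calculation as a pure function. next_activity_window is a hypothetical name, and the sketch advances the state unconditionally, whereas the Lambda code in this document advances it only after uploading a non-empty batch.

```python
from datetime import datetime, timedelta


def next_activity_window(state, now, initial_lookback=timedelta(hours=24)):
    """Return (from_time, to_time) ISO strings for the next activities query."""
    to_time = now.isoformat()
    from_time = state.get("activities_from")
    if not from_time:
        # First run: no saved state, so look back a fixed interval
        from_time = (now - initial_lookback).isoformat()
    return from_time, to_time


state = {}
first_run = datetime(2024, 1, 2, 12, 0, 0)
window = next_activity_window(state, first_run)
state["activities_from"] = window[1]  # next run starts where this one ended

second_run = first_run + timedelta(hours=1)
window2 = next_activity_window(state, second_run)
print(window, window2)
```

Because each window starts where the previous one ended, a missed or delayed invocation widens the next window instead of leaving a gap.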

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
        }
      ]
    }
    
  7. Set the name to secops-reader-policy.

  8. Click Create policy, then search for and select secops-reader-policy, click Next, and click Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV file. You enter these values when you configure the feed.

Configure a feed in Google SecOps to ingest Cisco CloudLock logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Cisco CloudLock logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Cisco CloudLock as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://cisco-cloudlock-logs/cloudlock/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Includes files modified within the specified number of days. The default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
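
The S3 URI in the feed has to cover the object keys that the Lambda writes. The sketch below rebuilds the key layout and NDJSON body from upload_to_s3_ndjson and checks that a generated key falls under the example feed prefix. build_object and split_s3_uri are hypothetical helpers, the sample records are made up, and a fixed UTC timestamp is used so the output is repeatable.

```python
import json
from datetime import datetime, timezone


def build_object(prefix, data_type, records, now):
    """Return (key, body) laid out the way the Lambda uploads its files."""
    hour_path = now.strftime("%Y/%m/%d/%H")
    key = f"{prefix}{data_type}/{hour_path}/cloudlock_{data_type}_{int(now.timestamp())}.jsonl"
    # NDJSON: one compact JSON object per line
    body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
    return key, body


def split_s3_uri(uri):
    """Split s3://bucket/prefix into (bucket, prefix)."""
    bucket, _, prefix = uri[len("s3://"):].partition("/")
    return bucket, prefix


records = [{"id": "a1", "severity": "ALERT"}, {"id": "a2", "severity": "INFO"}]  # made-up
now = datetime(2024, 1, 2, 3, 0, 0, tzinfo=timezone.utc)
key, body = build_object("cloudlock/", "incidents", records, now)
feed_bucket, feed_prefix = split_s3_uri("s3://cisco-cloudlock-logs/cloudlock/")

print(key)
print(key.startswith(feed_prefix))
```

With the example values in this document, the state file cloudlock/state.json sits under the same cloudlock/ prefix as the log files, so it may be worth confirming how the feed treats that object or choosing a STATE_KEY outside the ingested prefix.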

UDM Mapping Table

Log Field | UDM Mapping | Logic
created_at | about.resource.attribute.labels.key | The created_at field's value is assigned to the labels key.
created_at | about.resource.attribute.labels.value | The created_at field's value is assigned to the labels value.
created_at | about.resource.attribute.creation_time | The created_at field is parsed as a timestamp and mapped.
entity.id | target.asset.product_object_id | The entity.id field is renamed.
entity.ip | target.ip | The entity.ip field is merged into the target IP field.
entity.mime_type | target.file.mime_type | The entity.mime_type field is renamed when entity.origin_type is "document".
entity.name | target.application | The entity.name field is renamed when entity.origin_type is "app".
entity.name | target.file.full_path | The entity.name field is renamed when entity.origin_type is "document".
entity.origin_id | target.resource.product_object_id | The entity.origin_id field is renamed.
entity.origin_type | target.resource.resource_subtype | The entity.origin_type field is renamed.
entity.owner_email | target.user.email_addresses | The entity.owner_email field is merged into the target user email field if it matches an email regex.
entity.owner_email | target.user.user_display_name | The entity.owner_email field is renamed if it does not match an email regex.
entity.owner_name | target.user.user_display_name | The entity.owner_name field is renamed when entity.owner_email matches an email regex.
entity.vendor.name | target.platform_version | The entity.vendor.name field is renamed.
id | metadata.product_log_id | The id field is renamed.
incident_status | metadata.product_event_type | The incident_status field is renamed.
policy.id | security_result.rule_id | The policy.id field is renamed.
policy.name | security_result.rule_name | The policy.name field is renamed.
severity | security_result.severity_details | The severity field is renamed.
updated_at | about.resource.attribute.labels.key | The updated_at field's value is assigned to the labels key.
updated_at | about.resource.attribute.labels.value | The updated_at field's value is assigned to the labels value.
updated_at | about.resource.attribute.last_update_time | The updated_at field is parsed as a timestamp and mapped.

Additional logic for hardcoded and derived UDM fields (the corresponding UDM field names are not listed in this table):

  • Value is hardcoded to "updated_at".
  • Value is derived from the updated_at field.
  • The updated_at field is parsed as a timestamp and mapped.
  • Set to "true" if severity is "ALERT" and incident_status is "NEW". Converted to boolean.
  • Set to "true" if severity is "ALERT" and incident_status is "NEW". Converted to boolean.
  • Value is hardcoded to "GENERIC_EVENT".
  • Value is hardcoded to "CISCO_CLOUDLOCK_CASB".
  • Value is hardcoded to "CloudLock".
  • Value is hardcoded to "Cisco".
  • Set to "ALERTING" if severity is "ALERT" and incident_status is not "RESOLVED" or "DISMISSED". Set to "NOT_ALERTING" if severity is "ALERT" and incident_status is "RESOLVED" or "DISMISSED".
  • Derived from the matches array, specifically the key of each match object.
  • Derived from the matches array, specifically the value of each match object.
  • Derived from policy.id.
  • Derived from policy.name.
  • Set to "INFORMATIONAL" if severity is "INFO". Set to "CRITICAL" if severity is "CRITICAL".
  • Derived from severity.
  • The value is set to "match count: " concatenated with the value of match_count.
  • Set to "STORAGE_OBJECT" when entity.origin_type is "document".
  • Derived from entity.direct_url when entity.origin_type is "document".
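
To make a few of the mappings in the table above concrete, here is a simplified Python sketch that applies them to one made-up incident record. It is not the actual parser: the email pattern is an assumption (the parser's regex is not shown in this document), map_incident is a hypothetical name, only a subset of rows is covered, and the nested dict is a loose stand-in for a UDM event.

```python
import re

# Assumed pattern; the parser's actual email regex is not shown in this document.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def map_incident(raw):
    """Map a few CloudLock incident fields to a nested UDM-style dict."""
    entity = raw.get("entity", {})
    policy = raw.get("policy", {})
    user, target = {}, {}

    owner_email = entity.get("owner_email", "")
    if EMAIL_RE.match(owner_email):
        user["email_addresses"] = [owner_email]
        if entity.get("owner_name"):
            user["user_display_name"] = entity["owner_name"]
    elif owner_email:
        # Not an email address: treat the value as a display name
        user["user_display_name"] = owner_email

    origin_type = entity.get("origin_type")
    if origin_type == "app":
        target["application"] = entity.get("name")
    elif origin_type == "document":
        target["file"] = {
            "full_path": entity.get("name"),
            "mime_type": entity.get("mime_type"),
        }
    if user:
        target["user"] = user

    return {
        "metadata": {"product_log_id": raw.get("id")},
        "target": target,
        "security_result": {
            "rule_id": policy.get("id"),
            "rule_name": policy.get("name"),
            "severity_details": raw.get("severity"),
        },
    }


sample = {  # made-up record for illustration only
    "id": "inc-1",
    "severity": "ALERT",
    "policy": {"id": "p-9", "name": "PII exposure"},
    "entity": {
        "origin_type": "document",
        "name": "report.docx",
        "mime_type": "application/msword",
        "owner_email": "jane@example.com",
        "owner_name": "Jane Doe",
    },
}
event = map_incident(sample)
print(event["target"])
```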
