Recoger registros de Cisco CloudLock CASB
En este documento se explica cómo ingerir registros de Cisco CloudLock CASB en Google Security Operations mediante Amazon S3. El analizador extrae campos de los registros JSON, los transforma y los asigna al modelo de datos unificado (UDM). Se encarga de analizar las fechas, convertir campos específicos en cadenas, asignar campos a entidades de UDM (metadatos, destino, resultado de seguridad y acerca de) e iterar en matches para extraer campos de detección. Por último, combina todos los datos extraídos en el campo @output.
Antes de empezar
- Una instancia de Google SecOps
- Acceso privilegiado al arrendatario de Cisco CloudLock CASB
- Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)
Obtener los requisitos previos de Cisco CloudLock
- Inicia sesión en la consola de administración de Cisco CloudLock CASB.
- Ve a Ajustes.
- Haz clic en la pestaña Authentication & API (Autenticación y API).
- En API, haz clic en Generar para crear tu token de acceso.
- Copia y guarda en un lugar seguro los siguientes detalles:
- Token de acceso a la API
- URL del servidor de la API CloudLock (ponte en contacto con el equipo de Asistencia de CloudLock para obtener la URL específica de tu organización)
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
cisco-cloudlock-logs). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros en Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca la política AmazonS3FullAccess.
- Selecciona la política.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3
- En la consola de AWS, vaya a IAM > Policies (IAM > Políticas).
- Haz clic en Crear política > pestaña JSON.
Introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Sustituye
cisco-cloudlock-logssi has introducido otro nombre de segmento.
- Sustituye
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunte la política que acaba de crear.
Dale el nombre
cloudlock-lambda-roleal rol y haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
Ajuste Valor Nombre cloudlock-data-exportTiempo de ejecución Python 3.12 (la versión más reciente compatible) Arquitectura x86_64 Rol de ejecución cloudlock-lambda-roleUna vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseVaya a Configuración > Variables de entorno.
Haz clic en Editar > Añadir nueva variable de entorno.
Introduce las siguientes variables de entorno, sustituyendo los valores por los tuyos.
Clave Valor de ejemplo S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Seleccione la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
- Proporcione los siguientes detalles de configuración:
- Programación periódica: Precio (
1 hour). - Destino: tu función Lambda
cloudlock-data-export. - Nombre:
cloudlock-data-export-1h.
- Programación periódica: Precio (
- Haz clic en Crear programación.
Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps
- Ve a Consola de AWS > IAM > Usuarios > Añadir usuarios.
- Haz clic en Add users (Añadir usuarios).
- Proporcione los siguientes detalles de configuración:
- Usuario: introduce
secops-reader. - Tipo de acceso: selecciona Clave de acceso – Acceso programático.
- Usuario: introduce
- Haz clic en Crear usuario.
- Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Asigna el nombre
secops-reader-policy.Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se introducen en el feed).
Configurar un feed en Google SecOps para ingerir registros de Cisco CloudLock
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Añadir nuevo feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
Cisco CloudLock logs). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Selecciona Cisco CloudLock como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://cisco-cloudlock-logs/cloudlock/ - Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres de recursos: el espacio de nombres de recursos.
- Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
El valor del campo created_at se asigna a la clave labels. |
created_at |
about.resource.attribute.labels.value |
El valor del campo created_at se asigna al valor de las etiquetas. |
created_at |
about.resource.attribute.creation_time |
El campo created_at se analiza como una marca de tiempo y se asigna. |
entity.id |
target.asset.product_object_id |
Se cambia el nombre del campo entity.id. |
entity.ip |
target.ip |
El campo entity.ip se combina con el campo de IP de destino. |
entity.mime_type |
target.file.mime_type |
El campo entity.mime_type cambia de nombre cuando entity.origin_type es "document". |
entity.name |
target.application |
El campo entity.name cambia de nombre cuando entity.origin_type es "app". |
entity.name |
target.file.full_path |
El campo entity.name cambia de nombre cuando entity.origin_type es "document". |
entity.origin_id |
target.resource.product_object_id |
Se cambia el nombre del campo entity.origin_id. |
entity.origin_type |
target.resource.resource_subtype |
Se cambia el nombre del campo entity.origin_type. |
entity.owner_email |
target.user.email_addresses |
El campo entity.owner_email se combina con el campo de correo del usuario de destino si coincide con una expresión regular de correo. |
entity.owner_email |
target.user.user_display_name |
El campo entity.owner_email se cambia de nombre si no coincide con una expresión regular de correo electrónico. |
entity.owner_name |
target.user.user_display_name |
El campo entity.owner_name cambia de nombre cuando entity.owner_email coincide con una expresión regular de correo electrónico. |
entity.vendor.name |
target.platform_version |
Se cambia el nombre del campo entity.vendor.name. |
id |
metadata.product_log_id |
Se cambia el nombre del campo id. |
incident_status |
metadata.product_event_type |
Se cambia el nombre del campo incident_status. El valor se ha codificado como "updated_at". El valor se deriva del campo updated_at. El campo updated_at se analiza como una marca de tiempo y se asigna. Se asigna el valor "true" si severity es "ALERT" y incident_status es "NEW". Se ha convertido en booleano. Se asigna el valor "true" si severity es "ALERT" y incident_status es "NEW". Se ha convertido en booleano. El valor se ha codificado como "GENERIC_EVENT". El valor se ha codificado como "CISCO_CLOUDLOCK_CASB". El valor se ha codificado como "CloudLock". El valor se ha codificado como "Cisco". Se define como "ALERTING" si severity es "ALERT" y incident_status no es "RESOLVED" ni "DISMISSED". Se asigna el valor "NOT_ALERTING" si severity es "ALERT" y incident_status es "RESOLVED" o "DISMISSED". Derivado de la matriz matches, concretamente de la clave de cada objeto de coincidencia. Derivado de la matriz matches, concretamente del valor de cada objeto de coincidencia. Derivado de policy.id. Derivado de policy.name. Se asigna el valor "INFORMATIONAL" si severity es "INFO". Se asigna el valor "CRITICAL" si severity es "CRITICAL". Derivado de severity. El valor se define como "Número de coincidencias: " concatenado con el valor de match_count. Se asigna el valor "STORAGE_OBJECT" cuando entity.origin_type es "document". Derivado de entity.direct_url cuando entity.origin_type es "document". |
policy.id |
security_result.rule_id |
Se cambia el nombre del campo policy.id. |
policy.name |
security_result.rule_name |
Se cambia el nombre del campo policy.name. |
severity |
security_result.severity_details |
Se cambia el nombre del campo severity. |
updated_at |
about.resource.attribute.labels.key |
El valor del campo updated_at se asigna a la clave labels. |
updated_at |
about.resource.attribute.labels.value |
El valor del campo updated_at se asigna al valor de las etiquetas. |
updated_at |
about.resource.attribute.last_update_time |
El campo updated_at se analiza como una marca de tiempo y se asigna. |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.