Recoger registros de Cisco CloudLock CASB

Disponible en:

En este documento se explica cómo ingerir registros de Cisco CloudLock CASB en Google Security Operations mediante Amazon S3. El analizador extrae campos de los registros JSON, los transforma y los asigna al modelo de datos unificado (UDM). Se encarga de analizar las fechas, convertir campos específicos en cadenas, asignar campos a entidades de UDM (metadatos, destino, resultado de seguridad y acerca de) e iterar en matches para extraer campos de detección. Por último, combina todos los datos extraídos en el campo @output.

Antes de empezar

  • Una instancia de Google SecOps
  • Acceso privilegiado al arrendatario de Cisco CloudLock CASB
  • Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)

Obtener los requisitos previos de Cisco CloudLock

  1. Inicia sesión en la consola de administración de Cisco CloudLock CASB.
  2. Ve a Ajustes.
  3. Haz clic en la pestaña Authentication & API (Autenticación y API).
  4. En API, haz clic en Generar para crear tu token de acceso.
  5. Copia y guarda en un lugar seguro los siguientes detalles:
    • Token de acceso a la API
    • URL del servidor de la API CloudLock (ponte en contacto con el equipo de Asistencia de CloudLock para obtener la URL específica de tu organización)

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, cisco-cloudlock-logs).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros en Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para futuras consultas.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca la política AmazonS3FullAccess.
  18. Selecciona la política.
  19. Haz clic en Siguiente.
  20. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. En la consola de AWS, vaya a IAM > Policies (IAM > Políticas).
  2. Haz clic en Crear política > pestaña JSON.
  3. Introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
        }
      ]
    }
    
    • Sustituye cisco-cloudlock-logs si has introducido otro nombre de segmento.
  4. Haz clic en Siguiente > Crear política.

  5. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  6. Adjunte la política que acaba de crear.

  7. Dale el nombre cloudlock-lambda-role al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Ajuste Valor
    Nombre cloudlock-data-export
    Tiempo de ejecución Python 3.12 (la versión más reciente compatible)
    Arquitectura x86_64
    Rol de ejecución cloudlock-lambda-role
  4. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (cloudlock-data-export.py):

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize S3 client
    s3_client = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Cisco CloudLock CASB data and store in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_token = os.environ['CLOUDLOCK_API_TOKEN']
        api_base = os.environ['CLOUDLOCK_API_BASE']
    
        # HTTP client
        http = urllib3.PoolManager()
    
        try:
            # Get last run state for all endpoints
            state = get_last_run_state(s3_bucket, state_key)
    
            # Fetch incidents data (using updated_after for incremental sync)
            incidents_updated_after = state.get('incidents_updated_after')
            incidents, new_incidents_state = fetch_cloudlock_incidents(
                http, api_base, api_token, incidents_updated_after
            )
            if incidents:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
                logger.info(f"Uploaded {len(incidents)} incidents to S3")
                state['incidents_updated_after'] = new_incidents_state
    
            # Fetch activities data (using from/to time range)
            activities_from = state.get('activities_from')
            if not activities_from:
                activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    
            activities_to = datetime.utcnow().isoformat()
            activities = fetch_cloudlock_activities(
                http, api_base, api_token, activities_from, activities_to
            )
            if activities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
                logger.info(f"Uploaded {len(activities)} activities to S3")
                state['activities_from'] = activities_to
    
            # Fetch entities data (using updated_after for incremental sync)
            entities_updated_after = state.get('entities_updated_after')
            entities, new_entities_state = fetch_cloudlock_entities(
                http, api_base, api_token, entities_updated_after
            )
            if entities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
                logger.info(f"Uploaded {len(entities)} entities to S3")
                state['entities_updated_after'] = new_entities_state
    
            # Update consolidated state
            state['updated_at'] = datetime.utcnow().isoformat()
            update_last_run_state(s3_bucket, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps('CloudLock data export completed successfully')
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def make_api_request(http, url, headers, retries=3):
        """
        Make API request with exponential backoff retry logic
        """
        for attempt in range(retries):
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    return response
                elif response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited, waiting {retry_after} seconds")
                    time.sleep(retry_after)
                else:
                    logger.error(f"API request failed with status {response.status}")
    
            except Exception as e:
                logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
                if attempt < retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                else:
                    raise
    
        return None
    
    def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
        """
        Fetch incidents data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/incidents"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'count_total': 'false'
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                # Build URL with parameters (avoid logging sensitive data)
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching incidents with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                # Check pagination
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} incidents")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching incidents: {str(e)}")
            return [], updated_after
    
    def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
        """
        Fetch activities data from CloudLock API using time range
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/activities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'from': from_time,
            'to': to_time
        }
    
        all_data = []
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching activities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} activities")
            return all_data
    
        except Exception as e:
            logger.error(f"Error fetching activities: {str(e)}")
            return []
    
    def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
        """
        Fetch entities data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/entities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching entities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} entities")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching entities: {str(e)}")
            return [], updated_after
    
    def upload_to_s3_ndjson(bucket, prefix, data_type, data):
        """
        Upload data to S3 bucket in NDJSON format (one JSON object per line)
        """
        timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
        filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"
    
        try:
            # Convert to NDJSON format
            ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data])
    
            s3_client.put_object(
                Bucket=bucket,
                Key=filename,
                Body=ndjson_content,
                ContentType='application/x-ndjson'
            )
            logger.info(f"Successfully uploaded {filename} to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {str(e)}")
            raise
    
    def get_last_run_state(bucket, key):
        """
        Get the last run state from S3 with separate tracking for each endpoint
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state
        except s3_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting fresh")
            return {}
        except Exception as e:
            logger.error(f"Error reading state: {str(e)}")
            return {}
    
    def update_last_run_state(bucket, key, state):
        """
        Update the consolidated state in S3
        """
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state, indent=2),
                ContentType='application/json'
            )
            logger.info("Updated state successfully")
        except Exception as e:
            logger.error(f"Error updating state: {str(e)}")
            raise
    
  5. Vaya a Configuración > Variables de entorno.

  6. Haz clic en Editar > Añadir nueva variable de entorno.

  7. Introduce las siguientes variables de entorno, sustituyendo los valores por los tuyos.

    Clave Valor de ejemplo
    S3_BUCKET cisco-cloudlock-logs
    S3_PREFIX cloudlock/
    STATE_KEY cloudlock/state.json
    CLOUDLOCK_API_TOKEN <your-api-token>
    CLOUDLOCK_API_BASE <your-cloudlock-api-url>
  8. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  9. Seleccione la pestaña Configuración.

  10. En el panel Configuración general, haz clic en Editar.

  11. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda cloudlock-data-export.
    • Nombre: cloudlock-data-export-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. Ve a Consola de AWS > IAM > Usuarios > Añadir usuarios.
  2. Haz clic en Add users (Añadir usuarios).
  3. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce secops-reader.
    • Tipo de acceso: selecciona Clave de acceso – Acceso programático.
  4. Haz clic en Crear usuario.
  5. Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  6. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
        }
      ]
    }
    
  7. Asigna el nombre secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar un feed en Google SecOps para ingerir registros de Cisco CloudLock

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Añadir nuevo feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, Cisco CloudLock logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Selecciona Cisco CloudLock como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://cisco-cloudlock-logs/cloudlock/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
created_at about.resource.attribute.labels.key El valor del campo created_at se asigna a la clave labels.
created_at about.resource.attribute.labels.value El valor del campo created_at se asigna al valor de las etiquetas.
created_at about.resource.attribute.creation_time El campo created_at se analiza como una marca de tiempo y se asigna.
entity.id target.asset.product_object_id Se cambia el nombre del campo entity.id.
entity.ip target.ip El campo entity.ip se combina con el campo de IP de destino.
entity.mime_type target.file.mime_type El campo entity.mime_type cambia de nombre cuando entity.origin_type es "document".
entity.name target.application El campo entity.name cambia de nombre cuando entity.origin_type es "app".
entity.name target.file.full_path El campo entity.name cambia de nombre cuando entity.origin_type es "document".
entity.origin_id target.resource.product_object_id Se cambia el nombre del campo entity.origin_id.
entity.origin_type target.resource.resource_subtype Se cambia el nombre del campo entity.origin_type.
entity.owner_email target.user.email_addresses El campo entity.owner_email se combina con el campo de correo del usuario de destino si coincide con una expresión regular de correo.
entity.owner_email target.user.user_display_name El campo entity.owner_email se cambia de nombre si no coincide con una expresión regular de correo electrónico.
entity.owner_name target.user.user_display_name El campo entity.owner_name cambia de nombre cuando entity.owner_email coincide con una expresión regular de correo electrónico.
entity.vendor.name target.platform_version Se cambia el nombre del campo entity.vendor.name.
id metadata.product_log_id Se cambia el nombre del campo id.
incident_status metadata.product_event_type Se cambia el nombre del campo incident_status. El valor se ha codificado como "updated_at". El valor se deriva del campo updated_at. El campo updated_at se analiza como una marca de tiempo y se asigna. Se asigna el valor "true" si severity es "ALERT" y incident_status es "NEW". Se ha convertido en booleano. Se asigna el valor "true" si severity es "ALERT" y incident_status es "NEW". Se ha convertido en booleano. El valor se ha codificado como "GENERIC_EVENT". El valor se ha codificado como "CISCO_CLOUDLOCK_CASB". El valor se ha codificado como "CloudLock". El valor se ha codificado como "Cisco". Se define como "ALERTING" si severity es "ALERT" y incident_status no es "RESOLVED" ni "DISMISSED". Se asigna el valor "NOT_ALERTING" si severity es "ALERT" y incident_status es "RESOLVED" o "DISMISSED". Derivado de la matriz matches, concretamente de la clave de cada objeto de coincidencia. Derivado de la matriz matches, concretamente del valor de cada objeto de coincidencia. Derivado de policy.id. Derivado de policy.name. Se asigna el valor "INFORMATIONAL" si severity es "INFO". Se asigna el valor "CRITICAL" si severity es "CRITICAL". Derivado de severity. El valor se define como "Número de coincidencias: " concatenado con el valor de match_count. Se asigna el valor "STORAGE_OBJECT" cuando entity.origin_type es "document". Derivado de entity.direct_url cuando entity.origin_type es "document".
policy.id security_result.rule_id Se cambia el nombre del campo policy.id.
policy.name security_result.rule_name Se cambia el nombre del campo policy.name.
severity security_result.severity_details Se cambia el nombre del campo severity.
updated_at about.resource.attribute.labels.key El valor del campo updated_at se asigna a la clave labels.
updated_at about.resource.attribute.labels.value El valor del campo updated_at se asigna al valor de las etiquetas.
updated_at about.resource.attribute.last_update_time El campo updated_at se analiza como una marca de tiempo y se asigna.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.