Raccogliere i log CASB di Cisco CloudLock

Supportato in:

Questo documento spiega come importare i log CASB di Cisco CloudLock in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log JSON, li trasforma e li mappa al modello Unified Data Model (UDM). Gestisce l'analisi della data, converte campi specifici in stringhe, mappa i campi alle entità UDM (metadati, target, risultato di sicurezza, informazioni) e scorre matches per estrarre i campi di rilevamento, unendo infine tutti i dati estratti nel campo @output.

Prima di iniziare

  • Un'istanza Google SecOps
  • Accesso con privilegi al tenant Cisco CloudLock CASB
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)

Ottieni i prerequisiti di Cisco CloudLock

  1. Accedi alla console di amministrazione Cisco CloudLock CASB.
  2. Vai a Impostazioni.
  3. Fai clic sulla scheda Autenticazione e API.
  4. In API, fai clic su Genera per creare il token di accesso.
  5. Copia e salva in una posizione sicura i seguenti dettagli:
    • Token di accesso API
    • URL del server API CloudLock (contatta l'assistenza Cloudlock per l'URL specifico della tua organizzazione)

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, cisco-cloudlock-logs).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca i criteri AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy.
  2. Fai clic su Crea criterio > scheda JSON.
  3. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
        }
      ]
    }
    
    • Sostituisci cisco-cloudlock-logs se hai inserito un nome bucket diverso.
  4. Fai clic su Avanti > Crea criterio.

  5. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  6. Allega il criterio appena creato.

  7. Assegna al ruolo il nome cloudlock-lambda-role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome cloudlock-data-export
    Tempo di esecuzione Python 3.12 (ultima versione supportata)
    Architettura x86_64
    Ruolo di esecuzione cloudlock-lambda-role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (cloudlock-data-export.py):

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize S3 client
    s3_client = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Cisco CloudLock CASB data and store in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_token = os.environ['CLOUDLOCK_API_TOKEN']
        api_base = os.environ['CLOUDLOCK_API_BASE']
    
        # HTTP client
        http = urllib3.PoolManager()
    
        try:
            # Get last run state for all endpoints
            state = get_last_run_state(s3_bucket, state_key)
    
            # Fetch incidents data (using updated_after for incremental sync)
            incidents_updated_after = state.get('incidents_updated_after')
            incidents, new_incidents_state = fetch_cloudlock_incidents(
                http, api_base, api_token, incidents_updated_after
            )
            if incidents:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
                logger.info(f"Uploaded {len(incidents)} incidents to S3")
                state['incidents_updated_after'] = new_incidents_state
    
            # Fetch activities data (using from/to time range)
            activities_from = state.get('activities_from')
            if not activities_from:
                activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    
            activities_to = datetime.utcnow().isoformat()
            activities = fetch_cloudlock_activities(
                http, api_base, api_token, activities_from, activities_to
            )
            if activities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
                logger.info(f"Uploaded {len(activities)} activities to S3")
                state['activities_from'] = activities_to
    
            # Fetch entities data (using updated_after for incremental sync)
            entities_updated_after = state.get('entities_updated_after')
            entities, new_entities_state = fetch_cloudlock_entities(
                http, api_base, api_token, entities_updated_after
            )
            if entities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
                logger.info(f"Uploaded {len(entities)} entities to S3")
                state['entities_updated_after'] = new_entities_state
    
            # Update consolidated state
            state['updated_at'] = datetime.utcnow().isoformat()
            update_last_run_state(s3_bucket, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps('CloudLock data export completed successfully')
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def make_api_request(http, url, headers, retries=3):
        """
        Make API request with exponential backoff retry logic
        """
        for attempt in range(retries):
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    return response
                elif response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited, waiting {retry_after} seconds")
                    time.sleep(retry_after)
                else:
                    logger.error(f"API request failed with status {response.status}")
    
            except Exception as e:
                logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
                if attempt < retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                else:
                    raise
    
        return None
    
    def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
        """
        Fetch incidents data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/incidents"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'count_total': 'false'
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                # Build URL with parameters (avoid logging sensitive data)
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching incidents with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                # Check pagination
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} incidents")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching incidents: {str(e)}")
            return [], updated_after
    
    def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
        """
        Fetch activities data from CloudLock API using time range
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/activities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'from': from_time,
            'to': to_time
        }
    
        all_data = []
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching activities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} activities")
            return all_data
    
        except Exception as e:
            logger.error(f"Error fetching activities: {str(e)}")
            return []
    
    def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
        """
        Fetch entities data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/entities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching entities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} entities")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching entities: {str(e)}")
            return [], updated_after
    
    def upload_to_s3_ndjson(bucket, prefix, data_type, data):
        """
        Upload data to S3 bucket in NDJSON format (one JSON object per line)
        """
        timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
        filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"
    
        try:
            # Convert to NDJSON format
            ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data])
    
            s3_client.put_object(
                Bucket=bucket,
                Key=filename,
                Body=ndjson_content,
                ContentType='application/x-ndjson'
            )
            logger.info(f"Successfully uploaded {filename} to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {str(e)}")
            raise
    
    def get_last_run_state(bucket, key):
        """
        Get the last run state from S3 with separate tracking for each endpoint
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state
        except s3_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting fresh")
            return {}
        except Exception as e:
            logger.error(f"Error reading state: {str(e)}")
            return {}
    
    def update_last_run_state(bucket, key, state):
        """
        Update the consolidated state in S3
        """
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state, indent=2),
                ContentType='application/json'
            )
            logger.info("Updated state successfully")
        except Exception as e:
            logger.error(f"Error updating state: {str(e)}")
            raise
    
  5. Vai a Configurazione > Variabili di ambiente.

  6. Fai clic su Modifica > Aggiungi nuova variabile di ambiente.

  7. Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori.

    Chiave Valore di esempio
    S3_BUCKET cisco-cloudlock-logs
    S3_PREFIX cloudlock/
    STATE_KEY cloudlock/state.json
    CLOUDLOCK_API_TOKEN <your-api-token>
    CLOUDLOCK_API_BASE <your-cloudlock-api-url>
  8. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  9. Seleziona la scheda Configurazione.

  10. Nel riquadro Configurazione generale, fai clic su Modifica.

  11. Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda cloudlock-data-export.
    • Nome: cloudlock-data-export-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps

  1. Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
        }
      ]
    }
    
  7. Imposta il nome su secops-reader-policy.

  8. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  10. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configura un feed in Google SecOps per importare i log di Cisco CloudLock

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Cisco CloudLock logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Cisco CloudLock come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://cisco-cloudlock-logs/cloudlock/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
created_at about.resource.attribute.labels.key Il valore del campo created_at viene assegnato alla chiave delle etichette.
created_at about.resource.attribute.labels.value Il valore del campo created_at viene assegnato al valore delle etichette.
created_at about.resource.attribute.creation_time Il campo created_at viene analizzato come timestamp e mappato.
entity.id target.asset.product_object_id Il campo entity.id viene rinominato.
entity.ip target.ip Il campo entity.ip viene unito al campo IP di destinazione.
entity.mime_type target.file.mime_type Il campo entity.mime_type viene rinominato quando entity.origin_type è "documento".
entity.name target.application Il campo entity.name viene rinominato quando entity.origin_type è "app".
entity.name target.file.full_path Il campo entity.name viene rinominato quando entity.origin_type è "documento".
entity.origin_id target.resource.product_object_id Il campo entity.origin_id viene rinominato.
entity.origin_type target.resource.resource_subtype Il campo entity.origin_type viene rinominato.
entity.owner_email target.user.email_addresses Il campo entity.owner_email viene unito al campo email dell'utente di destinazione se corrisponde a un'espressione regolare per le email.
entity.owner_email target.user.user_display_name Il campo entity.owner_email viene rinominato se non corrisponde a un'espressione regolare per le email.
entity.owner_name target.user.user_display_name Il campo entity.owner_name viene rinominato quando entity.owner_email corrisponde a un'espressione regolare per le email.
entity.vendor.name target.platform_version Il campo entity.vendor.name viene rinominato.
id metadata.product_log_id Il campo id viene rinominato.
incident_status metadata.product_event_type Il campo incident_status viene rinominato. Il valore è codificato in modo permanente su "updated_at". Il valore è derivato dal campo updated_at. Il campo updated_at viene analizzato come timestamp e mappato. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Il valore è codificato in modo permanente su "GENERIC_EVENT". Il valore è codificato in modo permanente su "CISCO_CLOUDLOCK_CASB". Il valore è codificato in modo permanente su "CloudLock". Il valore è codificato in modo permanente su "Cisco". Impostato su "ALERTING" se severity è "ALERT" e incident_status non è "RESOLVED" o "DISMISSED". Imposta "NOT_ALERTING" se severity è "ALERT" e incident_status è "RESOLVED" o "DISMISSED". Derivato dall'array matches, in particolare dalla chiave di ogni oggetto di corrispondenza. Derivato dall'array matches, in particolare dal valore di ogni oggetto corrispondenza. Derivato da policy.id. Derivato da policy.name. Imposta su "INFORMATIONAL" se severity è "INFO". Impostato su "CRITICAL" se severity è "CRITICAL". Derivato da severity. Il valore è impostato su "match count: " concatenato al valore di match_count. Imposta su "STORAGE_OBJECT" quando entity.origin_type è "documento". Derivato da entity.direct_url quando entity.origin_type è "documento".
policy.id security_result.rule_id Il campo policy.id viene rinominato.
policy.name security_result.rule_name Il campo policy.name viene rinominato.
severity security_result.severity_details Il campo severity viene rinominato.
updated_at about.resource.attribute.labels.key Il valore del campo updated_at viene assegnato alla chiave delle etichette.
updated_at about.resource.attribute.labels.value Il valore del campo updated_at viene assegnato al valore delle etichette.
updated_at about.resource.attribute.last_update_time Il campo updated_at viene analizzato come timestamp e mappato.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.