Raccogliere i log CASB di Cisco CloudLock
Questo documento spiega come importare i log CASB di Cisco CloudLock in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log JSON, li trasforma e li mappa al modello Unified Data Model (UDM). Gestisce l'analisi della data, converte campi specifici in stringhe, mappa i campi alle entità UDM (metadati, target, risultato di sicurezza, informazioni) e scorre matches
per estrarre i campi di rilevamento, unendo infine tutti i dati estratti nel campo @output
.
Prima di iniziare
- Un'istanza Google SecOps
- Accesso con privilegi al tenant Cisco CloudLock CASB
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
Ottieni i prerequisiti di Cisco CloudLock
- Accedi alla console di amministrazione Cisco CloudLock CASB.
- Vai a Impostazioni.
- Fai clic sulla scheda Autenticazione e API.
- In API, fai clic su Genera per creare il token di accesso.
- Copia e salva in una posizione sicura i seguenti dettagli:
- Token di accesso API
- URL del server API CloudLock (contatta l'assistenza Cloudlock per l'URL specifico della tua organizzazione)
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
cisco-cloudlock-logs
). - Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca i criteri AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy.
- Fai clic su Crea criterio > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }
- Sostituisci
cisco-cloudlock-logs
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
cloudlock-lambda-role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome cloudlock-data-export
Tempo di esecuzione Python 3.12 (ultima versione supportata) Architettura x86_64 Ruolo di esecuzione cloudlock-lambda-role
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
cloudlock-data-export.py
):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raise
Vai a Configurazione > Variabili di ambiente.
Fai clic su Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori.
Chiave Valore di esempio S3_BUCKET
cisco-cloudlock-logs
S3_PREFIX
cloudlock/
STATE_KEY
cloudlock/state.json
CLOUDLOCK_API_TOKEN
<your-api-token>
CLOUDLOCK_API_BASE
<your-cloudlock-api-url>
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda
cloudlock-data-export
. - Nome:
cloudlock-data-export-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci
secops-reader
. - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
- Utente: inserisci
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log di Cisco CloudLock
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Cisco CloudLock logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Cisco CloudLock come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://cisco-cloudlock-logs/cloudlock/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
created_at |
about.resource.attribute.labels.key |
Il valore del campo created_at viene assegnato alla chiave delle etichette. |
created_at |
about.resource.attribute.labels.value |
Il valore del campo created_at viene assegnato al valore delle etichette. |
created_at |
about.resource.attribute.creation_time |
Il campo created_at viene analizzato come timestamp e mappato. |
entity.id |
target.asset.product_object_id |
Il campo entity.id viene rinominato. |
entity.ip |
target.ip |
Il campo entity.ip viene unito al campo IP di destinazione. |
entity.mime_type |
target.file.mime_type |
Il campo entity.mime_type viene rinominato quando entity.origin_type è "documento". |
entity.name |
target.application |
Il campo entity.name viene rinominato quando entity.origin_type è "app". |
entity.name |
target.file.full_path |
Il campo entity.name viene rinominato quando entity.origin_type è "documento". |
entity.origin_id |
target.resource.product_object_id |
Il campo entity.origin_id viene rinominato. |
entity.origin_type |
target.resource.resource_subtype |
Il campo entity.origin_type viene rinominato. |
entity.owner_email |
target.user.email_addresses |
Il campo entity.owner_email viene unito al campo email dell'utente di destinazione se corrisponde a un'espressione regolare per le email. |
entity.owner_email |
target.user.user_display_name |
Il campo entity.owner_email viene rinominato se non corrisponde a un'espressione regolare per le email. |
entity.owner_name |
target.user.user_display_name |
Il campo entity.owner_name viene rinominato quando entity.owner_email corrisponde a un'espressione regolare per le email. |
entity.vendor.name |
target.platform_version |
Il campo entity.vendor.name viene rinominato. |
id |
metadata.product_log_id |
Il campo id viene rinominato. |
incident_status |
metadata.product_event_type |
Il campo incident_status viene rinominato. Il valore è codificato in modo permanente su "updated_at". Il valore è derivato dal campo updated_at . Il campo updated_at viene analizzato come timestamp e mappato. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Il valore è codificato in modo permanente su "GENERIC_EVENT". Il valore è codificato in modo permanente su "CISCO_CLOUDLOCK_CASB". Il valore è codificato in modo permanente su "CloudLock". Il valore è codificato in modo permanente su "Cisco". Impostato su "ALERTING" se severity è "ALERT" e incident_status non è "RESOLVED" o "DISMISSED". Imposta "NOT_ALERTING" se severity è "ALERT" e incident_status è "RESOLVED" o "DISMISSED". Derivato dall'array matches , in particolare dalla chiave di ogni oggetto di corrispondenza. Derivato dall'array matches , in particolare dal valore di ogni oggetto corrispondenza. Derivato da policy.id . Derivato da policy.name . Imposta su "INFORMATIONAL" se severity è "INFO". Impostato su "CRITICAL" se severity è "CRITICAL". Derivato da severity . Il valore è impostato su "match count: " concatenato al valore di match_count . Imposta su "STORAGE_OBJECT" quando entity.origin_type è "documento". Derivato da entity.direct_url quando entity.origin_type è "documento". |
policy.id |
security_result.rule_id |
Il campo policy.id viene rinominato. |
policy.name |
security_result.rule_name |
Il campo policy.name viene rinominato. |
severity |
security_result.severity_details |
Il campo severity viene rinominato. |
updated_at |
about.resource.attribute.labels.key |
Il valore del campo updated_at viene assegnato alla chiave delle etichette. |
updated_at |
about.resource.attribute.labels.value |
Il valore del campo updated_at viene assegnato al valore delle etichette. |
updated_at |
about.resource.attribute.last_update_time |
Il campo updated_at viene analizzato come timestamp e mappato. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.