Recolha registos do CASB do Cisco CloudLock
Este documento explica como carregar registos do Cisco CloudLock CASB para o Google Security Operations através do Amazon S3. O analisador extrai campos dos registos JSON, transforma-os e mapeia-os para o modelo de dados unificado (UDM). Processa a análise de datas, converte campos específicos em strings, mapeia campos para entidades UDM (metadados, destino, resultado de segurança, acerca de) e itera através de matches para extrair campos de deteção, acabando por unir todos os dados extraídos no campo @output.
Antes de começar
- Uma instância do Google SecOps
- Acesso privilegiado ao inquilino do Cisco CloudLock CASB
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Obtenha os pré-requisitos do Cisco CloudLock
- Inicie sessão na consola do administrador do Cisco CloudLock CASB.
- Aceda a Definições.
- Clique no separador Autenticação e API.
- Em API, clique em Gerar para criar o seu token de acesso.
- Copie e guarde numa localização segura os seguintes detalhes:
- Token de acesso à API
- URL do servidor da API CloudLock (contacte o apoio técnico do Cloudlock para obter o URL específico da sua organização)
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este guia do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
cisco-cloudlock-logs). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas.
- Clique em Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Substitua
cisco-cloudlock-logsse tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
cloudlock-lambda-roleà função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome cloudlock-data-exportRuntime Python 3.12 (mais recente suportado) Arquitetura x86_64 Função de execução cloudlock-lambda-roleDepois de criar a função, abra o separador Código, elimine o fragmento de código e introduza o seguinte código (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseAceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente fornecidas, substituindo-as pelos seus valores.
Chave Valor de exemplo S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour). - Alvo: a sua função Lambda
cloudlock-data-export. - Nome:
cloudlock-data-export-1h.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Aceda a AWS Console > IAM > Users > Add users.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza
secops-reader. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Utilizador: introduza
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Defina o nome como
secops-reader-policy.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do Cisco CloudLock
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Cisco CloudLock logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Cisco CloudLock como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://cisco-cloudlock-logs/cloudlock/ - Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
| Campo de registo | Mapeamento de UDM | Lógica |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
O valor do campo created_at é atribuído à chave de etiquetas. |
created_at |
about.resource.attribute.labels.value |
O valor do campo created_at é atribuído ao valor das etiquetas. |
created_at |
about.resource.attribute.creation_time |
O campo created_at é analisado como uma data/hora e mapeado. |
entity.id |
target.asset.product_object_id |
O nome do campo entity.id é alterado. |
entity.ip |
target.ip |
O campo entity.ip é unido ao campo de IP de destino. |
entity.mime_type |
target.file.mime_type |
O nome do campo entity.mime_type é mudado quando entity.origin_type é "document". |
entity.name |
target.application |
O campo entity.name é renomeado quando entity.origin_type é "app". |
entity.name |
target.file.full_path |
O nome do campo entity.name é mudado quando entity.origin_type é "document". |
entity.origin_id |
target.resource.product_object_id |
O nome do campo entity.origin_id é alterado. |
entity.origin_type |
target.resource.resource_subtype |
O nome do campo entity.origin_type é alterado. |
entity.owner_email |
target.user.email_addresses |
O campo entity.owner_email é unido ao campo de email do utilizador de destino se corresponder a uma regex de email. |
entity.owner_email |
target.user.user_display_name |
O campo entity.owner_email é mudado de nome se não corresponder a uma regex de email. |
entity.owner_name |
target.user.user_display_name |
O campo entity.owner_name é renomeado quando entity.owner_email corresponde a uma regex de email. |
entity.vendor.name |
target.platform_version |
O nome do campo entity.vendor.name é alterado. |
id |
metadata.product_log_id |
O nome do campo id é alterado. |
incident_status |
metadata.product_event_type |
O nome do campo incident_status é alterado. O valor está codificado como "updated_at". O valor é derivado do campo updated_at. O campo updated_at é analisado como uma data/hora e mapeado. Definido como "true" se severity for "ALERT" e incident_status for "NEW". Convertido em booleano. Definido como "true" se severity for "ALERT" e incident_status for "NEW". Convertido em booleano. O valor está codificado como "GENERIC_EVENT". O valor está codificado como "CISCO_CLOUDLOCK_CASB". O valor está codificado como "CloudLock". O valor está codificado como "Cisco". Definido como "ALERTING" se severity for "ALERT" e incident_status não for "RESOLVED" nem "DISMISSED". Definido como "NOT_ALERTING" se severity for "ALERT" e incident_status for "RESOLVED" ou "DISMISSED". Derivado da matriz matches, especificamente da chave de cada objeto de correspondência. Derivado da matriz matches, especificamente do valor de cada objeto de correspondência. Derivado de policy.id. Derivado de policy.name. Definido como "INFORMATIONAL" se severity for "INFO". Definido como "CRITICAL" se severity for "CRITICAL". Derivado de severity. O valor é definido como "match count: " concatenado com o valor de match_count. Definido como "STORAGE_OBJECT" quando entity.origin_type é "document". Derivado de entity.direct_url quando entity.origin_type é "document". |
policy.id |
security_result.rule_id |
O nome do campo policy.id é alterado. |
policy.name |
security_result.rule_name |
O nome do campo policy.name é alterado. |
severity |
security_result.severity_details |
O nome do campo severity é alterado. |
updated_at |
about.resource.attribute.labels.key |
O valor do campo updated_at é atribuído à chave de etiquetas. |
updated_at |
about.resource.attribute.labels.value |
O valor do campo updated_at é atribuído ao valor das etiquetas. |
updated_at |
about.resource.attribute.last_update_time |
O campo updated_at é analisado como uma data/hora e mapeado. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.