Coletar registros do CASB do Cisco CloudLock
Este documento explica como ingerir registros do Cisco CloudLock CASB no Google Security Operations usando o Amazon S3. O analisador extrai campos dos registros JSON, transforma e mapeia para o modelo de dados unificado (UDM). Ele processa a análise de datas, converte campos específicos em strings, mapeia campos para entidades do UDM (metadados, destino, resultado de segurança, sobre) e itera matches para extrair campos de detecção, mesclando todos os dados extraídos no campo @output.
Antes de começar
- Uma instância do Google SecOps
- Acesso privilegiado ao locatário do Cisco CloudLock CASB.
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Receber os pré-requisitos do Cisco CloudLock
- Faça login no console de administrador do Cisco CloudLock CASB.
- Acesse Configurações.
- Clique na guia Authentication & API.
- Em API, clique em Gerar para criar seu token de acesso.
- Copie e salve em um local seguro os seguintes detalhes:
- Token de acesso à API
- URL do servidor da API CloudLock (entre em contato com o suporte do CloudLock para receber o URL específico da sua organização)
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
cisco-cloudlock-logs). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas.
- Clique em Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Substitua
cisco-cloudlock-logsse você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
cloudlock-lambda-rolee clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome cloudlock-data-exportAmbiente de execução Python 3.12 (mais recente compatível) Arquitetura x86_64 Função de execução cloudlock-lambda-roleDepois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseAcesse Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Insira as variáveis de ambiente a seguir, substituindo pelos seus valores.
Chave Valor de exemplo S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour). - Destino: sua função Lambda
cloudlock-data-export. - Nome:
cloudlock-data-export-1h.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- Acesse Console da AWS > IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário: insira
secops-reader. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Usuário: insira
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Defina o nome como
secops-reader-policy.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros do Cisco CloudLock
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Cisco CloudLock logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Cisco CloudLock como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://cisco-cloudlock-logs/cloudlock/ - Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
O valor do campo created_at é atribuído à chave de rótulos. |
created_at |
about.resource.attribute.labels.value |
O valor do campo created_at é atribuído ao valor dos rótulos. |
created_at |
about.resource.attribute.creation_time |
O campo created_at é analisado como um carimbo de data/hora e mapeado. |
entity.id |
target.asset.product_object_id |
O campo entity.id foi renomeado. |
entity.ip |
target.ip |
O campo entity.ip é mesclado ao campo de IP de destino. |
entity.mime_type |
target.file.mime_type |
O campo entity.mime_type é renomeado quando entity.origin_type é "document". |
entity.name |
target.application |
O campo entity.name é renomeado quando entity.origin_type é "app". |
entity.name |
target.file.full_path |
O campo entity.name é renomeado quando entity.origin_type é "document". |
entity.origin_id |
target.resource.product_object_id |
O campo entity.origin_id foi renomeado. |
entity.origin_type |
target.resource.resource_subtype |
O campo entity.origin_type foi renomeado. |
entity.owner_email |
target.user.email_addresses |
O campo entity.owner_email é mesclado ao campo de e-mail do usuário de destino se corresponder a uma expressão regular de e-mail. |
entity.owner_email |
target.user.user_display_name |
O campo entity.owner_email será renomeado se não corresponder a uma regex de e-mail. |
entity.owner_name |
target.user.user_display_name |
O campo entity.owner_name é renomeado quando entity.owner_email corresponde a uma regex de e-mail. |
entity.vendor.name |
target.platform_version |
O campo entity.vendor.name foi renomeado. |
id |
metadata.product_log_id |
O campo id foi renomeado. |
incident_status |
metadata.product_event_type |
O campo incident_status foi renomeado. O valor é codificado como "updated_at". O valor é derivado do campo updated_at. O campo updated_at é analisado como um carimbo de data/hora e mapeado. Definido como "true" se severity for "ALERT" e incident_status for "NEW". Convertido para booleano. Definido como "true" se severity for "ALERT" e incident_status for "NEW". Convertido para booleano. O valor é codificado como "GENERIC_EVENT". O valor é codificado como "CISCO_CLOUDLOCK_CASB". O valor é codificado como "CloudLock". O valor é codificado como "Cisco". Definido como "ALERTING" se severity for "ALERT" e incident_status não for "RESOLVED" ou "DISMISSED". Definido como "NOT_ALERTING" se severity for "ALERT" e incident_status for "RESOLVED" ou "DISMISSED". Derivado da matriz matches, especificamente a chave de cada objeto de correspondência. Derivado da matriz matches, especificamente o valor de cada objeto de correspondência. Derivado de policy.id. Derivado de policy.name. Defina como "INFORMATIONAL" se severity for "INFO". Definido como "CRITICAL" se severity for "CRITICAL". Derivado de severity. O valor é definido como "contagem de correspondências: " concatenado com o valor de match_count. Defina como "STORAGE_OBJECT" quando entity.origin_type for "document". Derivado de entity.direct_url quando entity.origin_type é "document". |
policy.id |
security_result.rule_id |
O campo policy.id foi renomeado. |
policy.name |
security_result.rule_name |
O campo policy.name foi renomeado. |
severity |
security_result.severity_details |
O campo severity foi renomeado. |
updated_at |
about.resource.attribute.labels.key |
O valor do campo updated_at é atribuído à chave de rótulos. |
updated_at |
about.resource.attribute.labels.value |
O valor do campo updated_at é atribuído ao valor dos rótulos. |
updated_at |
about.resource.attribute.last_update_time |
O campo updated_at é analisado como um carimbo de data/hora e mapeado. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.