Collecter les journaux Cisco CloudLock CASB
Ce document explique comment ingérer des journaux Cisco CloudLock CASB dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les champs des journaux JSON, les transforme et les mappe au modèle de données unifié (UDM). Il gère l'analyse des dates, convertit des champs spécifiques en chaînes, mappe les champs aux entités UDM (métadonnées, cible, résultat de sécurité, à propos) et itère sur matches pour extraire les champs de détection, en fusionnant finalement toutes les données extraites dans le champ @output.
Avant de commencer
- Une instance Google SecOps
- Accès privilégié au locataire Cisco CloudLock CASB
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Obtenir les prérequis de Cisco CloudLock
- Connectez-vous à la console d'administration Cisco CloudLock CASB.
- Accédez à Paramètres.
- Cliquez sur l'onglet Authentification et API.
- Sous API, cliquez sur Générer pour créer votre jeton d'accès.
- Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- Jeton d'accès à l'API
- URL du serveur de l'API CloudLock (contactez l'assistance Cloudlock pour obtenir l'URL spécifique à votre organisation)
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
cisco-cloudlock-logs). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies.
- Cliquez sur Créer une règle > onglet JSON.
Saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Remplacez
cisco-cloudlock-logssi vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
cloudlock-lambda-role, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom cloudlock-data-exportDurée d'exécution Python 3.12 (dernière version compatible) Architecture x86_64 Rôle d'exécution cloudlock-lambda-roleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
cloudlock-data-export.py) :import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseAccédez à Configuration > Variables d'environnement.
Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement fournies ci-dessous, en remplaçant par vos valeurs.
Clé Exemple de valeur S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour). - Cible : votre fonction Lambda
cloudlock-data-export. - Nom :
cloudlock-data-export-1h.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Définissez le nom sur
secops-reader-policy.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux Cisco CloudLock
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Cisco CloudLock logs). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Cisco CloudLock comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://cisco-cloudlock-logs/cloudlock/ - Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
La valeur du champ created_at est attribuée à la clé des libellés. |
created_at |
about.resource.attribute.labels.value |
La valeur du champ created_at est attribuée à la valeur des libellés. |
created_at |
about.resource.attribute.creation_time |
Le champ created_at est analysé en tant qu'horodatage et mappé. |
entity.id |
target.asset.product_object_id |
Le champ entity.id est renommé. |
entity.ip |
target.ip |
Le champ entity.ip est fusionné dans le champ d'adresse IP cible. |
entity.mime_type |
target.file.mime_type |
Le champ entity.mime_type est renommé lorsque entity.origin_type est défini sur "document". |
entity.name |
target.application |
Le champ entity.name est renommé lorsque entity.origin_type est défini sur "app". |
entity.name |
target.file.full_path |
Le champ entity.name est renommé lorsque entity.origin_type est défini sur "document". |
entity.origin_id |
target.resource.product_object_id |
Le champ entity.origin_id est renommé. |
entity.origin_type |
target.resource.resource_subtype |
Le champ entity.origin_type est renommé. |
entity.owner_email |
target.user.email_addresses |
Le champ entity.owner_email est fusionné dans le champ de l'adresse e-mail de l'utilisateur cible s'il correspond à une expression régulière d'adresse e-mail. |
entity.owner_email |
target.user.user_display_name |
Le champ entity.owner_email est renommé s'il ne correspond pas à une expression régulière d'adresse e-mail. |
entity.owner_name |
target.user.user_display_name |
Le champ entity.owner_name est renommé lorsque entity.owner_email correspond à une expression régulière d'adresse e-mail. |
entity.vendor.name |
target.platform_version |
Le champ entity.vendor.name est renommé. |
id |
metadata.product_log_id |
Le champ id est renommé. |
incident_status |
metadata.product_event_type |
Le champ incident_status est renommé. La valeur est codée en dur sur "updated_at". La valeur est dérivée du champ updated_at. Le champ updated_at est analysé en tant qu'horodatage et mappé. Défini sur "true" si severity est défini sur "ALERT" et incident_status sur "NEW". Converti en valeur booléenne. Défini sur "true" si severity est défini sur "ALERT" et incident_status sur "NEW". Converti en valeur booléenne. La valeur est codée en dur sur "GENERIC_EVENT". La valeur est codée en dur sur "CISCO_CLOUDLOCK_CASB". La valeur est codée en dur sur "CloudLock". La valeur est codée en dur sur "Cisco". Définissez sur "ALERTING" si severity est "ALERT" et que incident_status n'est pas "RESOLVED" ni "DISMISSED". Définissez sur "NOT_ALERTING" si severity est défini sur "ALERT" et incident_status sur "RESOLVED" ou "DISMISSED". Dérivé du tableau matches, plus précisément de la clé de chaque objet de correspondance. Dérivé du tableau matches, plus précisément de la valeur de chaque objet de correspondance. Dérivé de policy.id. Dérivé de policy.name. Définissez sur "INFORMATIONAL" si severity est "INFO". Définissez sur "CRITICAL" si severity est défini sur "CRITICAL". Dérivé de severity. La valeur est définie sur "match count: " concaténée avec la valeur de match_count. Définissez sur "STORAGE_OBJECT" lorsque entity.origin_type est défini sur "document". Dérivé de entity.direct_url lorsque entity.origin_type est "document". |
policy.id |
security_result.rule_id |
Le champ policy.id est renommé. |
policy.name |
security_result.rule_name |
Le champ policy.name est renommé. |
severity |
security_result.severity_details |
Le champ severity est renommé. |
updated_at |
about.resource.attribute.labels.key |
La valeur du champ updated_at est attribuée à la clé des libellés. |
updated_at |
about.resource.attribute.labels.value |
La valeur du champ updated_at est attribuée à la valeur des libellés. |
updated_at |
about.resource.attribute.last_update_time |
Le champ updated_at est analysé en tant qu'horodatage et mappé. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.