Collecter les journaux Cisco CloudLock CASB

Compatible avec :

Ce document explique comment ingérer des journaux Cisco CloudLock CASB dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les champs des journaux JSON, les transforme et les mappe au modèle de données unifié (UDM). Il gère l'analyse des dates, convertit des champs spécifiques en chaînes, mappe les champs aux entités UDM (métadonnées, cible, résultat de sécurité, à propos) et itère sur matches pour extraire les champs de détection, en fusionnant finalement toutes les données extraites dans le champ @output.

Avant de commencer

  • Une instance Google SecOps
  • Accès privilégié au locataire Cisco CloudLock CASB
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Obtenir les prérequis de Cisco CloudLock

  1. Connectez-vous à la console d'administration Cisco CloudLock CASB.
  2. Accédez à Paramètres.
  3. Cliquez sur l'onglet Authentification et API.
  4. Sous API, cliquez sur Générer pour créer votre jeton d'accès.
  5. Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
    • Jeton d'accès à l'API
    • URL du serveur de l'API CloudLock (contactez l'assistance Cloudlock pour obtenir l'URL spécifique à votre organisation)

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple, cisco-cloudlock-logs).
  3. Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : Ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez la règle AmazonS3FullAccess.
  18. Sélectionnez la règle.
  19. Cliquez sur Suivant.
  20. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM > Stratégies.
  2. Cliquez sur Créer une règle > onglet JSON.
  3. Saisissez la règle suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
        }
      ]
    }
    
    • Remplacez cisco-cloudlock-logs si vous avez saisi un autre nom de bucket.
  4. Cliquez sur Suivant > Créer une règle.

  5. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  6. Associez la règle que vous venez de créer.

  7. Nommez le rôle cloudlock-lambda-role, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom cloudlock-data-export
    Durée d'exécution Python 3.12 (dernière version compatible)
    Architecture x86_64
    Rôle d'exécution cloudlock-lambda-role
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (cloudlock-data-export.py) :

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize S3 client
    s3_client = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Cisco CloudLock CASB data and store in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_token = os.environ['CLOUDLOCK_API_TOKEN']
        api_base = os.environ['CLOUDLOCK_API_BASE']
    
        # HTTP client
        http = urllib3.PoolManager()
    
        try:
            # Get last run state for all endpoints
            state = get_last_run_state(s3_bucket, state_key)
    
            # Fetch incidents data (using updated_after for incremental sync)
            incidents_updated_after = state.get('incidents_updated_after')
            incidents, new_incidents_state = fetch_cloudlock_incidents(
                http, api_base, api_token, incidents_updated_after
            )
            if incidents:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
                logger.info(f"Uploaded {len(incidents)} incidents to S3")
                state['incidents_updated_after'] = new_incidents_state
    
            # Fetch activities data (using from/to time range)
            activities_from = state.get('activities_from')
            if not activities_from:
                activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    
            activities_to = datetime.utcnow().isoformat()
            activities = fetch_cloudlock_activities(
                http, api_base, api_token, activities_from, activities_to
            )
            if activities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
                logger.info(f"Uploaded {len(activities)} activities to S3")
                state['activities_from'] = activities_to
    
            # Fetch entities data (using updated_after for incremental sync)
            entities_updated_after = state.get('entities_updated_after')
            entities, new_entities_state = fetch_cloudlock_entities(
                http, api_base, api_token, entities_updated_after
            )
            if entities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
                logger.info(f"Uploaded {len(entities)} entities to S3")
                state['entities_updated_after'] = new_entities_state
    
            # Update consolidated state
            state['updated_at'] = datetime.utcnow().isoformat()
            update_last_run_state(s3_bucket, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps('CloudLock data export completed successfully')
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def make_api_request(http, url, headers, retries=3):
        """
        Make API request with exponential backoff retry logic
        """
        for attempt in range(retries):
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    return response
                elif response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited, waiting {retry_after} seconds")
                    time.sleep(retry_after)
                else:
                    logger.error(f"API request failed with status {response.status}")
    
            except Exception as e:
                logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
                if attempt < retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                else:
                    raise
    
        return None
    
    def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
        """
        Fetch incidents data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/incidents"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'count_total': 'false'
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                # Build URL with parameters (avoid logging sensitive data)
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching incidents with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                # Check pagination
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} incidents")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching incidents: {str(e)}")
            return [], updated_after
    
    def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
        """
        Fetch activities data from CloudLock API using time range
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/activities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'from': from_time,
            'to': to_time
        }
    
        all_data = []
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching activities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} activities")
            return all_data
    
        except Exception as e:
            logger.error(f"Error fetching activities: {str(e)}")
            return []
    
    def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
        """
        Fetch entities data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/entities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching entities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} entities")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching entities: {str(e)}")
            return [], updated_after
    
    def upload_to_s3_ndjson(bucket, prefix, data_type, data):
        """
        Upload data to S3 bucket in NDJSON format (one JSON object per line)
        """
        timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
        filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"
    
        try:
            # Convert to NDJSON format
            ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data])
    
            s3_client.put_object(
                Bucket=bucket,
                Key=filename,
                Body=ndjson_content,
                ContentType='application/x-ndjson'
            )
            logger.info(f"Successfully uploaded {filename} to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {str(e)}")
            raise
    
    def get_last_run_state(bucket, key):
        """
        Get the last run state from S3 with separate tracking for each endpoint
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state
        except s3_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting fresh")
            return {}
        except Exception as e:
            logger.error(f"Error reading state: {str(e)}")
            return {}
    
    def update_last_run_state(bucket, key, state):
        """
        Update the consolidated state in S3
        """
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state, indent=2),
                ContentType='application/json'
            )
            logger.info("Updated state successfully")
        except Exception as e:
            logger.error(f"Error updating state: {str(e)}")
            raise
    
  5. Accédez à Configuration > Variables d'environnement.

  6. Cliquez sur Modifier > Ajouter une variable d'environnement.

  7. Saisissez les variables d'environnement fournies ci-dessous, en remplaçant par vos valeurs.

    Clé Exemple de valeur
    S3_BUCKET cisco-cloudlock-logs
    S3_PREFIX cloudlock/
    STATE_KEY cloudlock/state.json
    CLOUDLOCK_API_TOKEN <your-api-token>
    CLOUDLOCK_API_BASE <your-cloudlock-api-url>
  8. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).

  9. Accédez à l'onglet Configuration.

  10. Dans le panneau Configuration générale, cliquez sur Modifier.

  11. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule.
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda cloudlock-data-export.
    • Nom : cloudlock-data-export-1h.
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : saisissez secops-reader.
    • Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
  6. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
        }
      ]
    }
    
  7. Définissez le nom sur secops-reader-policy.

  8. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  9. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  10. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux Cisco CloudLock

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Cisco CloudLock logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Cisco CloudLock comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://cisco-cloudlock-logs/cloudlock/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ de journal Mappage UDM Logique
created_at about.resource.attribute.labels.key La valeur du champ created_at est attribuée à la clé des libellés.
created_at about.resource.attribute.labels.value La valeur du champ created_at est attribuée à la valeur des libellés.
created_at about.resource.attribute.creation_time Le champ created_at est analysé en tant qu'horodatage et mappé.
entity.id target.asset.product_object_id Le champ entity.id est renommé.
entity.ip target.ip Le champ entity.ip est fusionné dans le champ d'adresse IP cible.
entity.mime_type target.file.mime_type Le champ entity.mime_type est renommé lorsque entity.origin_type est défini sur "document".
entity.name target.application Le champ entity.name est renommé lorsque entity.origin_type est défini sur "app".
entity.name target.file.full_path Le champ entity.name est renommé lorsque entity.origin_type est défini sur "document".
entity.origin_id target.resource.product_object_id Le champ entity.origin_id est renommé.
entity.origin_type target.resource.resource_subtype Le champ entity.origin_type est renommé.
entity.owner_email target.user.email_addresses Le champ entity.owner_email est fusionné dans le champ de l'adresse e-mail de l'utilisateur cible s'il correspond à une expression régulière d'adresse e-mail.
entity.owner_email target.user.user_display_name Le champ entity.owner_email est renommé s'il ne correspond pas à une expression régulière d'adresse e-mail.
entity.owner_name target.user.user_display_name Le champ entity.owner_name est renommé lorsque entity.owner_email correspond à une expression régulière d'adresse e-mail.
entity.vendor.name target.platform_version Le champ entity.vendor.name est renommé.
id metadata.product_log_id Le champ id est renommé.
incident_status metadata.product_event_type Le champ incident_status est renommé. La valeur est codée en dur sur "updated_at". La valeur est dérivée du champ updated_at. Le champ updated_at est analysé en tant qu'horodatage et mappé. Défini sur "true" si severity est défini sur "ALERT" et incident_status sur "NEW". Converti en valeur booléenne. Défini sur "true" si severity est défini sur "ALERT" et incident_status sur "NEW". Converti en valeur booléenne. La valeur est codée en dur sur "GENERIC_EVENT". La valeur est codée en dur sur "CISCO_CLOUDLOCK_CASB". La valeur est codée en dur sur "CloudLock". La valeur est codée en dur sur "Cisco". Définissez sur "ALERTING" si severity est "ALERT" et que incident_status n'est pas "RESOLVED" ni "DISMISSED". Définissez sur "NOT_ALERTING" si severity est défini sur "ALERT" et incident_status sur "RESOLVED" ou "DISMISSED". Dérivé du tableau matches, plus précisément de la clé de chaque objet de correspondance. Dérivé du tableau matches, plus précisément de la valeur de chaque objet de correspondance. Dérivé de policy.id. Dérivé de policy.name. Définissez sur "INFORMATIONAL" si severity est "INFO". Définissez sur "CRITICAL" si severity est défini sur "CRITICAL". Dérivé de severity. La valeur est définie sur "match count: " concaténée avec la valeur de match_count. Définissez sur "STORAGE_OBJECT" lorsque entity.origin_type est défini sur "document". Dérivé de entity.direct_url lorsque entity.origin_type est "document".
policy.id security_result.rule_id Le champ policy.id est renommé.
policy.name security_result.rule_name Le champ policy.name est renommé.
severity security_result.severity_details Le champ severity est renommé.
updated_at about.resource.attribute.labels.key La valeur du champ updated_at est attribuée à la clé des libellés.
updated_at about.resource.attribute.labels.value La valeur du champ updated_at est attribuée à la valeur des libellés.
updated_at about.resource.attribute.last_update_time Le champ updated_at est analysé en tant qu'horodatage et mappé.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.