Cisco CloudLock CASB-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Cisco CloudLock CASB-Logs mit Amazon S3 in Google Security Operations aufnehmen. Der Parser extrahiert Felder aus den JSON-Logs, transformiert sie und ordnet sie dem Unified Data Model (UDM) zu. Dabei werden Datumsangaben geparst, bestimmte Felder in Strings konvertiert, Felder UDM-Entitäten (Metadaten, Ziel, Sicherheitsergebnis, „Über“) zugeordnet und matches durchlaufen, um Erkennungsfelder zu extrahieren. Schließlich werden alle extrahierten Daten im Feld @output zusammengeführt.
Hinweise
- Eine Google SecOps-Instanz
- Berechtigter Zugriff auf den Cisco CloudLock CASB-Mandanten
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
Voraussetzungen für Cisco CloudLock
- Melden Sie sich in der Cisco CloudLock CASB Admin Console an.
- Gehen Sie zu Einstellungen.
- Klicken Sie auf den Tab Authentifizierung und API.
- Klicken Sie unter API auf Generieren, um Ihr Zugriffstoken zu erstellen.
- Kopieren Sie die folgenden Details und speichern Sie sie an einem sicheren Ort:
- API-Zugriffstoken
- CloudLock API-Server-URL (wenden Sie sich an den CloudLock-Support, um die organisationsspezifische URL zu erhalten)
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung unter Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
cisco-cloudlock-logs). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie Drittanbieterdienst als Anwendungsfall aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Access Key (Zugriffsschlüssel) und den Secret Access Key (geheimer Zugriffsschlüssel) für die zukünftige Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess.
- Wählen Sie die Richtlinie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und ‑Rolle für S3-Uploads konfigurieren
- Rufen Sie in der AWS Console IAM > Richtlinien auf.
- Klicken Sie auf Richtlinie erstellen> Tab „JSON“.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Ersetzen Sie
cisco-cloudlock-logs, wenn Sie einen anderen Bucket-Namen eingegeben haben.
- Ersetzen Sie
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
cloudlock-lambda-roleund klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name cloudlock-data-exportLaufzeit Python 3.12 (neueste unterstützte Version) Architektur x86_64 Ausführungsrolle cloudlock-lambda-roleNachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseRufen Sie Konfiguration > Umgebungsvariablen auf.
Klicken Sie auf Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte.
Schlüssel Beispielwert S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour). - Ziel: Ihre Lambda-Funktion
cloudlock-data-export. - Name:
cloudlock-data-export-1h.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie die AWS-Konsole > IAM > Nutzer > Nutzer hinzufügen auf.
- Klicken Sie auf Add users (Nutzer hinzufügen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie
secops-readerein. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Nutzer: Geben Sie
- Klicken Sie auf Nutzer erstellen.
- Minimale Leseberechtigung (benutzerdefiniert) anhängen: Nutzer > secops-reader > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen.
Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Legen Sie
secops-reader-policyals Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Cisco CloudLock-Logs aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feedname einen Namen für den Feed ein, z. B.
Cisco CloudLock logs. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Cisco CloudLock als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://cisco-cloudlock-logs/cloudlock/ - Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Dateien einschließen, die in den letzten Tagen geändert wurden. Der Standardwert ist 180 Tage.
- Zugriffsschlüssel-ID: Nutzerzugriffsschlüssel mit Zugriff auf den S3-Bucket.
- Geheimer Zugriffsschlüssel: Der geheime Schlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Asset-Namespace: Der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
| Logfeld | UDM-Zuordnung | Logik |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
Der Wert des Felds created_at wird dem Labelschlüssel zugewiesen. |
created_at |
about.resource.attribute.labels.value |
Der Wert des Felds created_at wird dem Wert des Labels zugewiesen. |
created_at |
about.resource.attribute.creation_time |
Das Feld created_at wird als Zeitstempel geparst und zugeordnet. |
entity.id |
target.asset.product_object_id |
Das Feld entity.id wird umbenannt. |
entity.ip |
target.ip |
Das Feld entity.ip wird in das Feld für die Ziel-IP-Adresse eingefügt. |
entity.mime_type |
target.file.mime_type |
Das Feld entity.mime_type wird umbenannt, wenn entity.origin_type „document“ ist. |
entity.name |
target.application |
Das Feld entity.name wird umbenannt, wenn entity.origin_type „app“ ist. |
entity.name |
target.file.full_path |
Das Feld entity.name wird umbenannt, wenn entity.origin_type „document“ ist. |
entity.origin_id |
target.resource.product_object_id |
Das Feld entity.origin_id wird umbenannt. |
entity.origin_type |
target.resource.resource_subtype |
Das Feld entity.origin_type wird umbenannt. |
entity.owner_email |
target.user.email_addresses |
Das Feld entity.owner_email wird in das E-Mail-Feld des Zielnutzers zusammengeführt, wenn es mit einem E-Mail-Regex übereinstimmt. |
entity.owner_email |
target.user.user_display_name |
Das Feld entity.owner_email wird umbenannt, wenn es nicht mit einem regulären Ausdruck für E-Mail-Adressen übereinstimmt. |
entity.owner_name |
target.user.user_display_name |
Das Feld entity.owner_name wird umbenannt, wenn entity.owner_email mit einem regulären Ausdruck für E-Mail-Adressen übereinstimmt. |
entity.vendor.name |
target.platform_version |
Das Feld entity.vendor.name wird umbenannt. |
id |
metadata.product_log_id |
Das Feld id wird umbenannt. |
incident_status |
metadata.product_event_type |
Das Feld incident_status wird umbenannt. Der Wert ist fest auf „updated_at“ codiert. Der Wert wird aus dem Feld updated_at abgeleitet. Das Feld updated_at wird als Zeitstempel geparst und zugeordnet. Auf „true“ gesetzt, wenn severity „ALERT“ und incident_status „NEW“ ist. In boolesche Werte umgewandelt. Auf „true“ gesetzt, wenn severity „ALERT“ und incident_status „NEW“ ist. In boolesche Werte umgewandelt. Der Wert ist fest auf „GENERIC_EVENT“ codiert. Der Wert ist fest auf „CISCO_CLOUDLOCK_CASB“ codiert. Der Wert ist fest auf „CloudLock“ codiert. Der Wert ist fest auf „Cisco“ codiert. Wird auf „ALERTING“ festgelegt, wenn severity „ALERT“ ist und incident_status nicht „RESOLVED“ oder „DISMISSED“ ist. Wird auf „NOT_ALERTING“ festgelegt, wenn severity „ALERT“ und incident_status „RESOLVED“ oder „DISMISSED“ ist. Abgeleitet aus dem matches-Array, insbesondere dem Schlüssel jedes übereinstimmenden Objekts. Abgeleitet vom matches-Array, insbesondere vom Wert jedes Match-Objekts. Abgeleitet von policy.id. Abgeleitet von policy.name. Wird auf „INFORMATIONAL“ gesetzt, wenn severity „INFO“ ist. Wird auf „CRITICAL“ gesetzt, wenn severity „CRITICAL“ ist. Abgeleitet von severity. Der Wert wird auf „match count: “ (Anzahl der Übereinstimmungen:) festgelegt, gefolgt vom Wert von match_count. Legen Sie „STORAGE_OBJECT“ fest, wenn entity.origin_type „document“ ist. Abgeleitet von entity.direct_url, wenn entity.origin_type „document“ ist. |
policy.id |
security_result.rule_id |
Das Feld policy.id wird umbenannt. |
policy.name |
security_result.rule_name |
Das Feld policy.name wird umbenannt. |
severity |
security_result.severity_details |
Das Feld severity wird umbenannt. |
updated_at |
about.resource.attribute.labels.key |
Der Wert des Felds updated_at wird dem Labelschlüssel zugewiesen. |
updated_at |
about.resource.attribute.labels.value |
Der Wert des Felds updated_at wird dem Wert des Labels zugewiesen. |
updated_at |
about.resource.attribute.last_update_time |
Das Feld updated_at wird als Zeitstempel geparst und zugeordnet. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten