Mengumpulkan log CASB Cisco CloudLock
Dokumen ini menjelaskan cara menyerap log CASB Cisco CloudLock ke Google Security Operations menggunakan Amazon S3. Parser mengekstrak kolom dari log JSON, mentransformasi, dan memetakannya ke Model Data Terpadu (UDM). Fungsi ini menangani parsing tanggal, mengonversi kolom tertentu menjadi string, memetakan kolom ke entitas UDM (metadata, target, hasil keamanan, tentang), dan melakukan iterasi melalui matches untuk mengekstrak kolom deteksi, yang pada akhirnya menggabungkan semua data yang diekstrak ke dalam kolom @output.
Sebelum memulai
- Instance Google SecOps
- Akses istimewa ke tenant Cisco CloudLock CASB
- Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)
Mendapatkan prasyarat Cisco CloudLock
- Login ke Konsol Admin CASB Cisco CloudLock.
- Buka Setelan.
- Klik tab Authentication & API.
- Di bagian API, klik Buat untuk membuat token akses Anda.
- Salin dan simpan detail berikut di lokasi yang aman:
- Token Akses API
- URL Server API CloudLock (hubungi Dukungan Cloudlock untuk mendapatkan URL khusus organisasi Anda)
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
cisco-cloudlock-logs). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Permissions.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies.
- Klik Buat kebijakan > tab JSON.
Masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Ganti
cisco-cloudlock-logsjika Anda memasukkan nama bucket yang berbeda.
- Ganti
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
cloudlock-lambda-role, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama cloudlock-data-exportRuntime Python 3.12 (terbaru yang didukung) Arsitektur x86_64 Peran eksekusi cloudlock-lambda-roleSetelah fungsi dibuat, buka tab Code, hapus stub, dan masukkan kode berikut (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseBuka Configuration > Environment variables.
Klik Edit > Tambahkan variabel lingkungan baru.
Masukkan variabel lingkungan berikut yang disediakan, lalu ganti dengan nilai Anda.
Kunci Nilai contoh S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour). - Target: fungsi Lambda Anda
cloudlock-data-export. - Name:
cloudlock-data-export-1h.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna > Tambahkan pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Tetapkan nama ke
secops-reader-policy.Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Mengonfigurasi feed di Google SecOps untuk menyerap log Cisco CloudLock
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Cisco CloudLock logs). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Cisco CloudLock sebagai Log type.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://cisco-cloudlock-logs/cloudlock/ - Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel Pemetaan UDM
| Kolom Log | Pemetaan UDM | Logika |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
Nilai kolom created_at ditetapkan ke kunci label. |
created_at |
about.resource.attribute.labels.value |
Nilai kolom created_at ditetapkan ke nilai label. |
created_at |
about.resource.attribute.creation_time |
Kolom created_at diuraikan sebagai stempel waktu dan dipetakan. |
entity.id |
target.asset.product_object_id |
Kolom entity.id diganti namanya. |
entity.ip |
target.ip |
Kolom entity.ip digabungkan ke kolom IP target. |
entity.mime_type |
target.file.mime_type |
Kolom entity.mime_type diganti namanya saat entity.origin_type adalah "document". |
entity.name |
target.application |
Kolom entity.name diganti namanya saat entity.origin_type adalah "app". |
entity.name |
target.file.full_path |
Kolom entity.name diganti namanya saat entity.origin_type adalah "document". |
entity.origin_id |
target.resource.product_object_id |
Kolom entity.origin_id diganti namanya. |
entity.origin_type |
target.resource.resource_subtype |
Kolom entity.origin_type diganti namanya. |
entity.owner_email |
target.user.email_addresses |
Kolom entity.owner_email digabungkan ke kolom email pengguna target jika cocok dengan regex email. |
entity.owner_email |
target.user.user_display_name |
Kolom entity.owner_email akan diganti namanya jika tidak cocok dengan ekspresi reguler email. |
entity.owner_name |
target.user.user_display_name |
Kolom entity.owner_name diganti namanya saat entity.owner_email cocok dengan regex email. |
entity.vendor.name |
target.platform_version |
Kolom entity.vendor.name diganti namanya. |
id |
metadata.product_log_id |
Kolom id diganti namanya. |
incident_status |
metadata.product_event_type |
Kolom incident_status diganti namanya. Nilai di-hardcode ke "updated_at". Nilai berasal dari kolom updated_at. Kolom updated_at diuraikan sebagai stempel waktu dan dipetakan. Ditetapkan ke "true" jika severity adalah "ALERT" dan incident_status adalah "NEW". Dikonversi ke boolean. Ditetapkan ke "true" jika severity adalah "ALERT" dan incident_status adalah "NEW". Dikonversi ke boolean. Nilai di-hardcode ke "GENERIC_EVENT". Nilai dikodekan secara permanen menjadi "CISCO_CLOUDLOCK_CASB". Nilai dikodekan secara permanen ke "CloudLock". Nilai di-hardcode ke "Cisco". Ditetapkan ke "ALERTING" jika severity adalah "ALERT" dan incident_status bukan "RESOLVED" atau "DISMISSED". Disetel ke "NOT_ALERTING" jika severity adalah "ALERT" dan incident_status adalah "RESOLVED" atau "DISMISSED". Diperoleh dari array matches, khususnya kunci setiap objek kecocokan. Diperoleh dari array matches, khususnya nilai setiap objek kecocokan. Diperoleh dari policy.id. Diperoleh dari policy.name. Tetapkan ke "INFORMATIONAL" jika severity adalah "INFO". Ditetapkan ke "CRITICAL" jika severity adalah "CRITICAL". Diperoleh dari severity. Nilai ditetapkan ke "jumlah kecocokan: " yang digabungkan dengan nilai match_count. Tetapkan ke "STORAGE_OBJECT" jika entity.origin_type adalah "document". Diperoleh dari entity.direct_url saat entity.origin_type adalah "document". |
policy.id |
security_result.rule_id |
Kolom policy.id diganti namanya. |
policy.name |
security_result.rule_name |
Kolom policy.name diganti namanya. |
severity |
security_result.severity_details |
Kolom severity diganti namanya. |
updated_at |
about.resource.attribute.labels.key |
Nilai kolom updated_at ditetapkan ke kunci label. |
updated_at |
about.resource.attribute.labels.value |
Nilai kolom updated_at ditetapkan ke nilai label. |
updated_at |
about.resource.attribute.last_update_time |
Kolom updated_at diuraikan sebagai stempel waktu dan dipetakan. |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.