Cisco CloudLock CASB のログを収集する
このドキュメントでは、Amazon S3 を使用して Cisco CloudLock CASB ログを Google Security Operations に取り込む方法について説明します。パーサーは、JSON ログからフィールドを抽出し、変換して Unified Data Model(UDM)にマッピングします。日付の解析を処理し、特定のフィールドを文字列に変換し、フィールドを UDM エンティティ(メタデータ、ターゲット、セキュリティ結果、概要)にマッピングし、matches
を反復処理して検出フィールドを抽出し、最終的に抽出されたすべてのデータを @output
フィールドに統合します。
始める前に
- Google SecOps インスタンス
- Cisco CloudLock CASB テナントへの特権アクセス
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Cisco CloudLock の前提条件を取得する
- Cisco CloudLock CASB 管理コンソールにログインします。
- [設定] に移動します。
- [Authentication & API] タブをクリックします。
- [API] で [生成] をクリックして、アクセス トークンを作成します。
- 次の詳細をコピーして安全な場所に保存します。
- API アクセス トークン
- CloudLock API サーバー URL(組織固有の URL については CloudLock サポートにお問い合わせください)
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
cisco-cloudlock-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }
- 別のバケット名を入力した場合は、
cisco-cloudlock-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
cloudlock-lambda-role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 cloudlock-data-export
ランタイム Python 3.12(最新のサポート対象) アーキテクチャ x86_64 実行ロール cloudlock-lambda-role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
cloudlock-data-export.py
)を入力します。import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raise
[構成] > [環境変数] に移動します。
[編集>新しい環境変数を追加] をクリックします。
次の環境変数を入力し、実際の値に置き換えます。
キー 値の例 S3_BUCKET
cisco-cloudlock-logs
S3_PREFIX
cloudlock/
STATE_KEY
cloudlock/state.json
CLOUDLOCK_API_TOKEN
<your-api-token>
CLOUDLOCK_API_BASE
<your-cloudlock-api-url>
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数
cloudlock-data-export
。 - 名前:
cloudlock-data-export-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Cisco CloudLock のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Cisco CloudLock logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [Log type] として [Cisco CloudLock] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://cisco-cloudlock-logs/cloudlock/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
created_at |
about.resource.attribute.labels.key |
created_at フィールドの値がラベルキーに割り当てられます。 |
created_at |
about.resource.attribute.labels.value |
created_at フィールドの値がラベルの値に割り当てられます。 |
created_at |
about.resource.attribute.creation_time |
created_at フィールドはタイムスタンプとして解析され、マッピングされます。 |
entity.id |
target.asset.product_object_id |
entity.id フィールドの名前が変更されました。 |
entity.ip |
target.ip |
entity.ip フィールドはターゲット IP フィールドに統合されます。 |
entity.mime_type |
target.file.mime_type |
entity.origin_type が「document」の場合、entity.mime_type フィールドの名前が変更されます。 |
entity.name |
target.application |
entity.origin_type が「app」の場合、entity.name フィールドの名前が変更されます。 |
entity.name |
target.file.full_path |
entity.origin_type が「document」の場合、entity.name フィールドの名前が変更されます。 |
entity.origin_id |
target.resource.product_object_id |
entity.origin_id フィールドの名前が変更されました。 |
entity.origin_type |
target.resource.resource_subtype |
entity.origin_type フィールドの名前が変更されました。 |
entity.owner_email |
target.user.email_addresses |
entity.owner_email フィールドがメールの正規表現と一致する場合、ターゲット ユーザーのメール フィールドに統合されます。 |
entity.owner_email |
target.user.user_display_name |
entity.owner_email フィールドがメールの正規表現と一致しない場合、そのフィールドの名前が変更されます。 |
entity.owner_name |
target.user.user_display_name |
entity.owner_email がメールの正規表現と一致する場合、entity.owner_name フィールドの名前が変更されます。 |
entity.vendor.name |
target.platform_version |
entity.vendor.name フィールドの名前が変更されました。 |
id |
metadata.product_log_id |
id フィールドの名前が変更されました。 |
incident_status |
metadata.product_event_type |
incident_status フィールドの名前が変更されました。値は「updated_at」にハードコードされています。値は updated_at フィールドから取得されます。updated_at フィールドはタイムスタンプとして解析され、マッピングされます。severity が「ALERT」で incident_status が「NEW」の場合は true に設定されます。ブール値に変換されます。severity が「ALERT」で incident_status が「NEW」の場合は true に設定されます。ブール値に変換されます。値は「GENERIC_EVENT」にハードコードされています。値は「CISCO_CLOUDLOCK_CASB」にハードコードされます。値は「CloudLock」にハードコードされています。値は「Cisco」にハードコードされます。severity が「ALERT」で、incident_status が「RESOLVED」または「DISMISSED」でない場合は、「ALERTING」に設定します。severity が「ALERT」で、incident_status が「RESOLVED」または「DISMISSED」の場合は、「NOT_ALERTING」に設定します。matches 配列(具体的には各一致オブジェクトのキー)から派生します。matches 配列(特に各一致オブジェクトの値)から派生します。policy.id から取得されます。policy.name から取得されます。severity が「INFO」の場合は、「INFORMATIONAL」に設定します。severity が「CRITICAL」の場合は、「CRITICAL」に設定します。severity から取得されます。値は、「一致数: 」と match_count の値を連結した値に設定されます。entity.origin_type が「document」の場合は、「STORAGE_OBJECT」に設定します。entity.origin_type が「document」の場合、entity.direct_url から派生します。 |
policy.id |
security_result.rule_id |
policy.id フィールドの名前が変更されました。 |
policy.name |
security_result.rule_name |
policy.name フィールドの名前が変更されました。 |
severity |
security_result.severity_details |
severity フィールドの名前が変更されました。 |
updated_at |
about.resource.attribute.labels.key |
updated_at フィールドの値がラベルキーに割り当てられます。 |
updated_at |
about.resource.attribute.labels.value |
updated_at フィールドの値がラベルの値に割り当てられます。 |
updated_at |
about.resource.attribute.last_update_time |
updated_at フィールドはタイムスタンプとして解析され、マッピングされます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。