收集 Cisco CloudLock CASB 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Cisco CloudLock CASB 記錄擷取至 Google Security Operations。剖析器會從 JSON 記錄檔中擷取欄位,然後轉換並對應至統合資料模型 (UDM)。這項函式會處理日期剖析作業、將特定欄位轉換為字串、將欄位對應至 UDM 實體 (中繼資料、目標、安全性結果、關於),並逐一檢查 matches 以擷取偵測欄位,最終將所有擷取的資料合併至 @output 欄位。

事前準備

  • Google SecOps 執行個體
  • Cisco CloudLock CASB 租戶的特殊存取權
  • AWS 的具備權限存取權 (S3、IAM、Lambda、EventBridge)

取得 Cisco CloudLock 必要條件

  1. 登入 Cisco CloudLock CASB 管理控制台
  2. 前往 [設定]
  3. 按一下「Authentication & API」分頁標籤。
  4. 在「API」下方,按一下「產生」即可建立存取權杖。
  5. 複製下列詳細資料並存放在安全位置:
    • API 存取權杖
    • CloudLock API 伺服器網址 (請與 Cloudlock 支援團隊聯絡,取得貴機構專屬的網址)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 cisco-cloudlock-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. 在 AWS 控制台中,依序前往「IAM」>「Policies」
  2. 按一下「建立政策」>「JSON」分頁
  3. 輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json"
        }
      ]
    }
    
    • 如果您輸入其他 bucket 名稱,請替換 cisco-cloudlock-logs
  4. 依序點選「下一步」>「建立政策」

  5. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  6. 附加新建立的政策。

  7. 為角色命名 cloudlock-lambda-role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 cloudlock-data-export
    執行階段 Python 3.12 (最新支援版本)
    架構 x86_64
    執行角色 cloudlock-lambda-role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (cloudlock-data-export.py):

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta
    import logging
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize S3 client
    s3_client = boto3.client('s3')
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Cisco CloudLock CASB data and store in S3
        """
    
        # Environment variables
        s3_bucket = os.environ['S3_BUCKET']
        s3_prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_token = os.environ['CLOUDLOCK_API_TOKEN']
        api_base = os.environ['CLOUDLOCK_API_BASE']
    
        # HTTP client
        http = urllib3.PoolManager()
    
        try:
            # Get last run state for all endpoints
            state = get_last_run_state(s3_bucket, state_key)
    
            # Fetch incidents data (using updated_after for incremental sync)
            incidents_updated_after = state.get('incidents_updated_after')
            incidents, new_incidents_state = fetch_cloudlock_incidents(
                http, api_base, api_token, incidents_updated_after
            )
            if incidents:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents)
                logger.info(f"Uploaded {len(incidents)} incidents to S3")
                state['incidents_updated_after'] = new_incidents_state
    
            # Fetch activities data (using from/to time range)
            activities_from = state.get('activities_from')
            if not activities_from:
                activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    
            activities_to = datetime.utcnow().isoformat()
            activities = fetch_cloudlock_activities(
                http, api_base, api_token, activities_from, activities_to
            )
            if activities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities)
                logger.info(f"Uploaded {len(activities)} activities to S3")
                state['activities_from'] = activities_to
    
            # Fetch entities data (using updated_after for incremental sync)
            entities_updated_after = state.get('entities_updated_after')
            entities, new_entities_state = fetch_cloudlock_entities(
                http, api_base, api_token, entities_updated_after
            )
            if entities:
                upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities)
                logger.info(f"Uploaded {len(entities)} entities to S3")
                state['entities_updated_after'] = new_entities_state
    
            # Update consolidated state
            state['updated_at'] = datetime.utcnow().isoformat()
            update_last_run_state(s3_bucket, state_key, state)
    
            return {
                'statusCode': 200,
                'body': json.dumps('CloudLock data export completed successfully')
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def make_api_request(http, url, headers, retries=3):
        """
        Make API request with exponential backoff retry logic
        """
        for attempt in range(retries):
            try:
                response = http.request('GET', url, headers=headers)
    
                if response.status == 200:
                    return response
                elif response.status == 429:  # Rate limit
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited, waiting {retry_after} seconds")
                    time.sleep(retry_after)
                else:
                    logger.error(f"API request failed with status {response.status}")
    
            except Exception as e:
                logger.error(f"Request attempt {attempt + 1} failed: {str(e)}")
                if attempt < retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                else:
                    raise
    
        return None
    
    def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None):
        """
        Fetch incidents data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/incidents"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'count_total': 'false'
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                # Build URL with parameters (avoid logging sensitive data)
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching incidents with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                # Check pagination
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} incidents")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching incidents: {str(e)}")
            return [], updated_after
    
    def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time):
        """
        Fetch activities data from CloudLock API using time range
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/activities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0,
            'from': from_time,
            'to': to_time
        }
    
        all_data = []
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching activities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} activities")
            return all_data
    
        except Exception as e:
            logger.error(f"Error fetching activities: {str(e)}")
            return []
    
    def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None):
        """
        Fetch entities data from CloudLock API using updated_after for incremental sync
        API Reference: https://developer.cisco.com/docs/cloud-security/
        """
        url = f"{api_base}/api/v2/entities"
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
        params = {
            'limit': 1000,
            'offset': 0
        }
    
        if updated_after:
            params['updated_after'] = updated_after
    
        all_data = []
        latest_updated_at = updated_after
    
        try:
            while True:
                param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
                full_url = f"{url}?{param_string}"
    
                logger.info(f"Fetching entities with offset: {params['offset']}")
    
                response = make_api_request(http, full_url, headers)
                if not response:
                    break
    
                data = json.loads(response.data.decode('utf-8'))
                batch_data = data if isinstance(data, list) else data.get('data', [])
    
                if not batch_data:
                    break
    
                all_data.extend(batch_data)
    
                # Track latest updated_at for incremental sync
                for item in batch_data:
                    if 'updated_at' in item:
                        item_updated_at = item['updated_at']
                        if not latest_updated_at or item_updated_at > latest_updated_at:
                            latest_updated_at = item_updated_at
    
                if len(batch_data) < params['limit']:
                    break
    
                params['offset'] += params['limit']
    
            logger.info(f"Fetched {len(all_data)} entities")
            return all_data, latest_updated_at
    
        except Exception as e:
            logger.error(f"Error fetching entities: {str(e)}")
            return [], updated_after
    
    def upload_to_s3_ndjson(bucket, prefix, data_type, data):
        """
        Upload data to S3 bucket in NDJSON format (one JSON object per line)
        """
        timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H')
        filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl"
    
        try:
            # Convert to NDJSON format
            ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data])
    
            s3_client.put_object(
                Bucket=bucket,
                Key=filename,
                Body=ndjson_content,
                ContentType='application/x-ndjson'
            )
            logger.info(f"Successfully uploaded {filename} to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {str(e)}")
            raise
    
    def get_last_run_state(bucket, key):
        """
        Get the last run state from S3 with separate tracking for each endpoint
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=key)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return state
        except s3_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting fresh")
            return {}
        except Exception as e:
            logger.error(f"Error reading state: {str(e)}")
            return {}
    
    def update_last_run_state(bucket, key, state):
        """
        Update the consolidated state in S3
        """
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state, indent=2),
                ContentType='application/json'
            )
            logger.info("Updated state successfully")
        except Exception as e:
            logger.error(f"Error updating state: {str(e)}")
            raise
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下列環境變數,並將 換成您的值。

    範例值
    S3_BUCKET cisco-cloudlock-logs
    S3_PREFIX cloudlock/
    STATE_KEY cloudlock/state.json
    CLOUDLOCK_API_TOKEN <your-api-token>
    CLOUDLOCK_API_BASE <your-cloudlock-api-url>
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 cloudlock-data-export
    • 名稱cloudlock-data-export-1h
  3. 按一下「建立時間表」

選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-cloudlock-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,擷取 Cisco CloudLock 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Cisco CloudLock logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Cisco CloudLock」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://cisco-cloudlock-logs/cloudlock/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
created_at about.resource.attribute.labels.key created_at 欄位的值會指派給標籤鍵。
created_at about.resource.attribute.labels.value created_at 欄位的值會指派給標籤值。
created_at about.resource.attribute.creation_time created_at 欄位會剖析為時間戳記並對應。
entity.id target.asset.product_object_id entity.id 欄位已重新命名。
entity.ip target.ip entity.ip 欄位會合併至目標 IP 欄位。
entity.mime_type target.file.mime_type entity.origin_type 為「document」時,entity.mime_type 欄位會重新命名。
entity.name target.application entity.origin_type 為「app」時,entity.name 欄位會重新命名。
entity.name target.file.full_path entity.origin_type 為「document」時,entity.name 欄位會重新命名。
entity.origin_id target.resource.product_object_id entity.origin_id 欄位已重新命名。
entity.origin_type target.resource.resource_subtype entity.origin_type 欄位已重新命名。
entity.owner_email target.user.email_addresses 如果 entity.owner_email 欄位符合電子郵件的 regex,系統會將該欄位合併至目標使用者電子郵件欄位。
entity.owner_email target.user.user_display_name 如果 entity.owner_email 欄位符合電子郵件規則運算式,系統會重新命名該欄位。
entity.owner_name target.user.user_display_name 如果 entity.owner_email 符合電子郵件規則運算式,系統會重新命名 entity.owner_name 欄位。
entity.vendor.name target.platform_version entity.vendor.name 欄位已重新命名。
id metadata.product_log_id id 欄位已重新命名。
incident_status metadata.product_event_type incident_status 欄位已重新命名。值會以硬式編碼設為「updated_at」。這個值衍生自 updated_at 欄位。updated_at 欄位會剖析為時間戳記並對應。如果 severity 為「ALERT」,且 incident_status 為「NEW」,則設為「true」。已轉換為布林值。如果 severity 為「ALERT」,且 incident_status 為「NEW」,則設為「true」。已轉換為布林值。值會硬式編碼為「GENERIC_EVENT」。值會硬式編碼為「CISCO_CLOUDLOCK_CASB」。值會硬式編碼為「CloudLock」。值會硬式編碼為「Cisco」。如果 severity 為「ALERT」,且 incident_status 不是「RESOLVED」或「DISMISSED」,則設為「ALERTING」。如果 severity 為「ALERT」,且 incident_status 為「RESOLVED」或「DISMISSED」,請設為「NOT_ALERTING」。衍生自 matches 陣列,特別是每個比對物件的鍵。衍生自 matches 陣列,特別是每個相符物件的值。衍生自 policy.id。衍生自 policy.name。如果 severity 為「INFO」,請設為「INFORMATIONAL」。如果 severity 為「CRITICAL」,請設為「CRITICAL」。衍生自 severity。這個值會設為「相符數量:」,並與 match_count 的值串連。如果 entity.origin_type 為「document」,請設為「STORAGE_OBJECT」。當 entity.origin_type 為「文件」時,衍生自 entity.direct_url
policy.id security_result.rule_id policy.id 欄位已重新命名。
policy.name security_result.rule_name policy.name 欄位已重新命名。
severity security_result.severity_details severity 欄位已重新命名。
updated_at about.resource.attribute.labels.key updated_at 欄位的值會指派給標籤鍵。
updated_at about.resource.attribute.labels.value updated_at 欄位的值會指派給標籤值。
updated_at about.resource.attribute.last_update_time updated_at 欄位會剖析為時間戳記並對應。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。