收集 Swimlane Platform 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 Swimlane 平台日志提取到 Google Security Operations。Swimlane Platform 是一种安全编排、自动化和响应 (SOAR) 平台,可提供审核日志记录功能,用于跟踪账号和租户中的用户活动、配置更改和系统事件。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 对 Swimlane 平台具有特权访问权限,并拥有访问审核日志的账号管理员权限
  • Swimlane 平台实例网址和账号凭据

收集 Swimlane Platform 凭据

获取 Swimlane Platform 实例网址

  1. 登录您的 Swimlane Platform 实例。
  2. 记下浏览器地址栏中的实例网址。
    • 格式:https://<region>.swimlane.app(例如,https://us.swimlane.apphttps://eu.swimlane.app
    • 示例:如果您通过 https://us.swimlane.app/workspace 访问 Swimlane,则您的基础网址为 https://us.swimlane.app

创建个人访问令牌

  1. 以账号管理员身份登录 Swimlane 平台
  2. 前往个人资料选项
  3. 点击个人资料以打开个人资料编辑器。
  4. 前往个人访问令牌部分。
  5. 点击生成令牌以创建新的个人访问令牌。
  6. 立即复制令牌并妥善存储(系统不会再显示此令牌)。

获取账号 ID

如果您不知道自己的账号 ID,请与 Swimlane 管理员联系。审核日志 API 路径需要账号 ID。

记录集成的以下详细信息:

  • 个人访问令牌 (PAT):用于 API 调用的 Private-Token 标头中。
  • 账号 ID:对于审核日志 API 路径 /api/public/audit/account/{ACCOUNT_ID}/auditlogs,此参数是必需的。
  • 基础网址:您的 Swimlane 网域(例如 https://eu.swimlane.apphttps://us.swimlane.app)。

验证权限

如需验证您的账号是否拥有访问审核日志所需的权限,请执行以下操作:

  1. 登录 Swimlane 平台
  2. 确认您拥有账号管理员访问权限。
  3. 如果您无法访问审核日志功能,请与您的 Swimlane 管理员联系。

测试 API 访问权限

  • 在继续进行集成之前,请验证您的 API 凭据是否正常运行:

    # Replace with your actual credentials
    SWIMLANE_BASE_URL="https://<region>.swimlane.app"
    SWIMLANE_ACCOUNT_ID="<your-account-id>"
    SWIMLANE_PAT_TOKEN="<your-personal-access-token>"
    
    # Test API access
    curl -v -X GET "${SWIMLANE_BASE_URL}/api/public/audit/account/${SWIMLANE_ACCOUNT_ID}/auditlogs?pageNumber=1&pageSize=10" \
      -H "Private-Token: ${SWIMLANE_PAT_TOKEN}" \
      -H "Accept: application/json"
    

预期响应:HTTP 200,包含审核日志的 JSON。

如果您收到错误消息,请执行以下操作:

  • HTTP 401:验证您的个人访问令牌是否正确
  • HTTP 403:验证您的账号是否拥有“账号管理员”权限
  • HTTP 404:验证账号 ID 和基本网址是否正确

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 swimlane-audit
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 swimlane-audit-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect Swimlane Platform logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 swimlane-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 swimlane-audit-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 Swimlane Platform API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 swimlane-audit-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (swimlane-audit-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择服务账号 (swimlane-audit-collector-sa)。
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET swimlane-audit GCS 存储分区名称
    GCS_PREFIX swimlane/audit/ 日志文件的前缀
    STATE_KEY swimlane/audit/state.json 状态文件路径
    SWIMLANE_BASE_URL https://us.swimlane.app Swimlane Platform 基准网址
    SWIMLANE_PAT_TOKEN your-personal-access-token Swimlane 个人访问令牌
    SWIMLANE_ACCOUNT_ID your-account-id 泳道账号标识符
    SWIMLANE_TENANT_LIST `` 以英文逗号分隔的租户 ID(可选,如需针对所有租户,请将此字段留空)
    INCLUDE_ACCOUNT true 是否包含账号级日志(true/false)
    PAGE_SIZE 100 每页记录数(最多 100 条)
    LOOKBACK_HOURS 24 初始回溯期
    TIMEOUT 30 API 请求超时时间(以秒为单位)
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import uuid
    import gzip
    import io
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'swimlane/audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'swimlane/audit/state.json')
    SWIMLANE_BASE_URL = os.environ.get('SWIMLANE_BASE_URL', '').rstrip('/')
    SWIMLANE_PAT_TOKEN = os.environ.get('SWIMLANE_PAT_TOKEN')
    SWIMLANE_ACCOUNT_ID = os.environ.get('SWIMLANE_ACCOUNT_ID')
    SWIMLANE_TENANT_LIST = os.environ.get('SWIMLANE_TENANT_LIST', '')
    INCLUDE_ACCOUNT = os.environ.get('INCLUDE_ACCOUNT', 'true').lower() == 'true'
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    TIMEOUT = int(os.environ.get('TIMEOUT', '30'))
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Swimlane Platform logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, SWIMLANE_BASE_URL, SWIMLANE_PAT_TOKEN, SWIMLANE_ACCOUNT_ID]):
            print('Error: Missing required environment variables (GCS_BUCKET, SWIMLANE_BASE_URL, SWIMLANE_PAT_TOKEN, SWIMLANE_ACCOUNT_ID)')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                base_url=SWIMLANE_BASE_URL,
                pat_token=SWIMLANE_PAT_TOKEN,
                account_id=SWIMLANE_ACCOUNT_ID,
                tenant_list=SWIMLANE_TENANT_LIST,
                include_account=INCLUDE_ACCOUNT,
                start_time=last_time,
                end_time=now,
                page_size=PAGE_SIZE,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as gzipped NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}{now:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz"
    
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode='w') as gz:
                for record in records:
                    gz.write((json.dumps(record, ensure_ascii=False) + '\n').encode())
    
            buf.seek(0)
            blob = bucket.blob(object_key)
            blob.upload_from_file(buf, content_type='application/gzip')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {
                'last_event_time': last_event_time_iso,
                'updated_at': datetime.now(timezone.utc).isoformat() + 'Z'
            }
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(base_url: str, pat_token: str, account_id: str, tenant_list: str, include_account: bool, start_time: datetime, end_time: datetime, page_size: int):
        """
        Fetch logs from Swimlane Platform API with pagination and rate limiting.
    
        Args:
            base_url: Swimlane Platform base URL
            pat_token: Personal Access Token
            account_id: Swimlane account identifier
            tenant_list: Comma-separated tenant IDs (optional)
            include_account: Include account-level logs
            start_time: Start time for log query
            end_time: End time for log query
            page_size: Number of records per page (max 100)
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
    
        endpoint = f"{base_url}/api/public/audit/account/{account_id}/auditlogs"
    
        headers = {
            'Private-Token': pat_token,
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-SwimlaneCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 1
        backoff = 1.0
    
        while True:
            params = []
            params.append(f"pageNumber={page_num}")
            params.append(f"pageSize={min(page_size, 100)}")
            params.append(f"fromdate={start_time.isoformat()}")
            params.append(f"todate={end_time.isoformat()}")
    
            if tenant_list:
                params.append(f"tenantList={tenant_list}")
    
            params.append(f"includeAccount={'true' if include_account else 'false'}")
    
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers, timeout=TIMEOUT)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status == 401:
                    print(f"Authentication failed (401). Verify SWIMLANE_PAT_TOKEN is correct.")
                    return [], None
    
                if response.status == 403:
                    print(f"Access forbidden (403). Verify account has Account Admin permissions to access audit logs.")
                    return [], None
    
                if response.status == 400:
                    print(f"Bad request (400). Verify account_id and query parameters are correct.")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('auditlogs', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        event_time = event.get('eventTime') or event.get('EventTime')
                        if event_time:
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                has_next = data.get('next')
                total_count = data.get('totalCount', 0)
    
                if not has_next:
                    print(f"Reached last page (no next link)")
                    break
    
                # Check if we've hit the 10,000 log limit
                if total_count > 10000 and len(records) >= 10000:
                    print(f"Warning: Reached Swimlane API limit of 10,000 logs. Consider narrowing the time range.")
                    break
    
                page_num += 1
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 swimlane-audit-schedule-15min
    区域 选择与 Cloud Run 函数相同的区域
    频率 */15 * * * *(每 15 分钟)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (swimlane-audit-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 标准(推荐)
    每小时 0 * * * * 搜索量中等
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 (swimlane-audit-collector)。
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/swimlane/audit/YYYY/MM/DD/swimlane-audit-UUID.json.gz
    Successfully processed X records
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称。

  10. 前往前缀文件夹 (swimlane/audit/)。

  11. 验证是否已创建具有当前时间戳的新 .json.gz 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 SWIMLANE_PAT_TOKEN,并验证个人访问令牌是否正确
  • HTTP 403:验证账号是否具有访问审核日志的账号管理员权限
  • HTTP 400:验证 SWIMLANE_ACCOUNT_ID 是否正确,以及查询参数是否有效
  • HTTP 404:验证 SWIMLANE_BASE_网址 和 API 端点路径是否正确
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已设置所有必需的变量(GCS_BUCKET、SWIMLANE_BASE_网址、SWIMLANE_PAT_TOKEN、SWIMLANE_ACCOUNT_ID)
  • 连接错误:验证与 Swimlane 平台的网络连接和防火墙规则
  • “日志数量上限为 1 万”警告:减少 LOOKBACK_HOURS 或提高 Cloud Scheduler 频率,以符合 Swimlane 的 API 限制

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Swimlane Platform logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Swimlane Platform 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 Swimlane Platform 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Swimlane Platform logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Swimlane Platform 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://swimlane-audit/swimlane/audit/
      
        • swimlane-audit:您的 GCS 存储分区名称。
        • swimlane/audit/:存储日志的前缀/文件夹路径。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。