收集 ZeroFox Platform 日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 ZeroFox 平台日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • ZeroFox Platform 租户的特权访问权限。
  • AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。

获取 ZeroFox 前提条件

  1. 前往 https://cloud.zerofox.com,登录 ZeroFox Platform
  2. 依次前往数据连接器 > API 数据 Feed
    • 直接网址(登录后):https://cloud.zerofox.com/data_connectors/api
    • 如果您没有看到此菜单项,请与您的 ZeroFox 管理员联系以获取访问权限。
  3. 点击 Generate Token(生成令牌)或 Create Personal Access Token(创建个人访问令牌)。
  4. 提供以下配置详细信息:
    • 名称:输入一个描述性名称(例如 Google SecOps S3 Ingestion)。
    • 过期时间:根据组织的安全政策选择轮替周期。
    • 权限/Feed:选择以下内容的读取权限:AlertsCTI feeds 以及您要导出的其他数据类型
  5. 点击生成
  6. 复制生成的个人访问令牌并将其保存在安全的位置(您将无法再次查看该令牌)。
  7. 保存 ZEROFOX_BASE_URLhttps://api.zerofox.com(大多数租户的默认值)

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 zerofox-platform-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶名称,请替换 zerofox-platform-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::zerofox-platform-logs/zerofox/platform/state.json"
        }
      ]
    }
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  7. 附加新创建的政策。

  8. 将角色命名为 ZeroFoxPlatformToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 zerofox_platform_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 ZeroFoxPlatformToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (zerofox_platform_to_s3.py)。

    #!/usr/bin/env python3
    # Lambda: Pull ZeroFox Platform data (alerts/incidents/logs) to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    S3_BUCKET     = os.environ["S3_BUCKET"]
    S3_PREFIX     = os.environ.get("S3_PREFIX", "zerofox/platform/")
    STATE_KEY     = os.environ.get("STATE_KEY", "zerofox/platform/state.json")
    LOOKBACK_SEC  = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    PAGE_SIZE     = int(os.environ.get("PAGE_SIZE", "200"))
    MAX_PAGES     = int(os.environ.get("MAX_PAGES", "20"))
    HTTP_TIMEOUT  = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES  = int(os.environ.get("HTTP_RETRIES", "3"))
    URL_TEMPLATE  = os.environ.get("URL_TEMPLATE", "")
    AUTH_HEADER   = os.environ.get("AUTH_HEADER", "")  # e.g. "Authorization: Bearer <token>"
    
    ZEROFOX_BASE_URL = os.environ.get("ZEROFOX_BASE_URL", "https://api.zerofox.com")
    ZEROFOX_API_TOKEN = os.environ.get("ZEROFOX_API_TOKEN", "")
    
    s3 = boto3.client("s3")
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _load_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            b = obj["Body"].read()
            return json.loads(b) if b else {}
        except Exception:
            return {"last_since": _iso(time.time() - LOOKBACK_SEC)}
    
    def _save_state(st: dict) -> None:
        s3.put_object(
            Bucket=S3_BUCKET, Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _headers() -> dict:
        hdrs = {"Accept": "application/json", "Content-Type": "application/json"}
        if AUTH_HEADER:
            try:
                k, v = AUTH_HEADER.split(":", 1)
                hdrs[k.strip()] = v.strip()
            except ValueError:
                hdrs["Authorization"] = AUTH_HEADER.strip()
        elif ZEROFOX_API_TOKEN:
            hdrs["Authorization"] = f"Bearer {ZEROFOX_API_TOKEN}"
        return hdrs
    
    def _http_get(url: str) -> dict:
        attempt = 0
        while True:
            try:
                req = Request(url, method="GET")
                for k, v in _headers().items():
                    req.add_header(k, v)
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    body = r.read()
                    try:
                        return json.loads(body.decode("utf-8"))
                    except json.JSONDecodeError:
                        return {"raw": body.decode("utf-8", errors="replace")}
            except HTTPError as e:
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    retry_after = int(e.headers.get("Retry-After", 1 + attempt))
                    time.sleep(max(1, retry_after))
                    attempt += 1
                    continue
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(1 + attempt)
                    attempt += 1
                    continue
                raise
    
    def _put_json(obj: dict, label: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-zerofox-{label}.json"
        s3.put_object(
            Bucket=S3_BUCKET, Key=key,
            Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _extract_next_token(payload: dict):
        next_token = (payload.get("next") or payload.get("next_token") or 
                      payload.get("nextPageToken") or payload.get("next_page_token"))
        if isinstance(next_token, dict):
            return next_token.get("token") or next_token.get("cursor") or next_token.get("value")
        return next_token
    
    def _extract_items(payload: dict) -> list:
        for key in ("results", "data", "alerts", "items", "logs", "events"):
            if isinstance(payload.get(key), list):
                return payload[key]
        return []
    
    def _extract_newest_timestamp(items: list, current: str) -> str:
        newest = current
        for item in items:
            timestamp = (item.get("timestamp") or item.get("created_at") or 
                        item.get("last_modified") or item.get("event_time") or 
                        item.get("log_time") or item.get("updated_at"))
            if isinstance(timestamp, str) and timestamp > newest:
                newest = timestamp
        return newest
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        since = st.get("last_since") or _iso(time.time() - LOOKBACK_SEC)
    
        # Use URL_TEMPLATE if provided, otherwise construct default alerts endpoint
        if URL_TEMPLATE:
            base_url = URL_TEMPLATE.replace("{SINCE}", urllib.parse.quote(since))
        else:
            base_url = f"{ZEROFOX_BASE_URL}/v1/alerts?since={urllib.parse.quote(since)}"
    
        page_token = ""
        pages = 0
        total_items = 0
        newest_since = since
    
        while pages < MAX_PAGES:
            # Construct URL with pagination
            if URL_TEMPLATE:
                url = (base_url
                      .replace("{PAGE_TOKEN}", urllib.parse.quote(page_token))
                      .replace("{PAGE_SIZE}", str(PAGE_SIZE)))
            else:
                url = f"{base_url}&limit={PAGE_SIZE}"
                if page_token:
                    url += f"&page_token={urllib.parse.quote(page_token)}"
    
            payload = _http_get(url)
            _put_json(payload, f"page-{pages:05d}")
    
            items = _extract_items(payload)
            total_items += len(items)
            newest_since = _extract_newest_timestamp(items, newest_since)
    
            pages += 1
            next_token = _extract_next_token(payload)
            if not next_token:
                break
            page_token = str(next_token)
    
        if newest_since and newest_since != st.get("last_since"):
            st["last_since"] = newest_since
            _save_state(st)
    
        return {"ok": True, "pages": pages, "items": total_items, "since": since, "new_since": newest_since}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET zerofox-platform-logs
    S3_PREFIX zerofox/platform/
    STATE_KEY zerofox/platform/state.json
    ZEROFOX_BASE_URL https://api.zerofox.com
    ZEROFOX_API_TOKEN your-zerofox-personal-access-token
    LOOKBACK_SECONDS 3600
    PAGE_SIZE 200
    MAX_PAGES 20
    HTTP_TIMEOUT 60
    HTTP_RETRIES 3
    URL_TEMPLATE (可选)包含 {SINCE}{PAGE_TOKEN}{PAGE_SIZE} 的自定义网址模板
    AUTH_HEADER (可选)针对自定义身份验证的 Authorization: Bearer <token>
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 zerofox_platform_to_s3
    • 名称zerofox-platform-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zerofox-platform-logs"
        }
      ]
    }
    
  7. 名称 = secops-reader-policy

  8. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  10. 点击创建访问密钥

  11. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 ZeroFox 平台日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 ZeroFox Platform Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 ZeroFox 平台作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://zerofox-platform-logs/zerofox/platform/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。