收集 Harness IO 审核日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Harness IO 审核日志注入到 Google Security Operations。

准备工作

  • Google SecOps 实例
  • 对 Harness 的特权访问权限(API 密钥和账号 ID)
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

获取个人账号的 Harness API 密钥和账号 ID

  1. 登录 Harness 网页界面。
  2. 前往您的用户个人资料 > 我的 API 密钥
  3. 选择 API Key
  4. API 密钥输入名称
  5. 点击保存
  6. 在新 API 密钥下选择令牌
  7. 输入令牌的名称
  8. 点击 Generate Token(生成令牌)。
  9. 复制令牌并将其保存在安全的位置。
  10. 复制并保存您的账号 ID(显示在 Harness 网址和账号设置中)。

可选:获取服务账号的 Harness API 密钥和账号 ID

  1. 登录 Harness 网页界面。
  2. 创建服务账号
  3. 依次前往账号设置 > 访问权限控制
  4. 依次选择服务账号 > 选择要为其创建 API 密钥的服务账号
  5. API 密钥下,选择 API 密钥
  6. API 密钥输入名称
  7. 点击保存
  8. 在新 API 密钥下,选择 Token
  9. 输入令牌的名称。
  10. 点击 Generate Token(生成令牌)。
  11. 复制令牌并将其保存在安全的位置。
  12. 复制并保存您的账号 ID(显示在 Harness 网址和账号设置中)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 harness-io)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutHarnessObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::harness-io/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::harness-io/harness/audit/state.json"
        }
      ]
    }
    
    
    • 如果您输入了其他存储桶名称,请替换 harness-io):
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteHarnessToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 harness_io_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteHarnessToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (harness_io_to_s3.py)。

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
    ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
    API_KEY = os.environ["HARNESS_API_KEY"]  # x-api-key token
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "harness/audit/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "100")), 100)  # <=100
    START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))
    
    s3 = boto3.client("s3")
    HDRS = {"x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json"}
    
    def _read_state():
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            j = json.loads(obj["Body"].read())
            return j.get("since"), j.get("pageToken")
        except Exception:
            return None, None
    
    def _write_state(since_ms: int, page_token: str | None):
        body = json.dumps({"since": since_ms, "pageToken": page_token}).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _http_post(path: str, body: dict, query: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        qs = urllib.parse.urlencode(query)
        url = f"{API_BASE}{path}?{qs}"
        data = json.dumps(body).encode("utf-8")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, data=data, method="POST")
            for k, v in HDRS.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _write_page(obj: dict, now: float, page_index: int) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime(now))
        key = f"{PREFIX}/{ts}-page{page_index:05d}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = time.time()
        since_ms, page_token = _read_state()
        if since_ms is None:
            since_ms = int((now_s - START_MINUTES_BACK * 60) * 1000)
        until_ms = int(now_s * 1000)
    
        page_index = 0
        total = 0
    
        while True:
            body = {"startTime": since_ms, "endTime": until_ms}
            query = {"accountIdentifier": ACCOUNT_ID, "pageSize": PAGE_SIZE}
            if page_token:
                query["pageToken"] = page_token
            else:
                query["pageIndex"] = page_index
    
            data = _http_post("/audit/api/audits/listV2", body, query)
            _write_page(data, now_s, page_index)
    
            entries = []
            for key in ("data", "content", "response", "resource", "resources", "items"):
                if isinstance(data.get(key), list):
                    entries = data[key]
                    break
            total += len(entries) if isinstance(entries, list) else 0
    
            next_token = (
                data.get("pageToken")
                or (isinstance(data.get("meta"), dict) and data["meta"].get("pageToken"))
                or (isinstance(data.get("metadata"), dict) and data["metadata"].get("pageToken"))
            )
    
            if next_token:
                page_token = next_token
                page_index += 1
                continue
    
            if len(entries) < PAGE_SIZE:
                break
            page_index += 1
    
        _write_state(until_ms, None)
        return {"pages": page_index + 1, "objects_estimate": total}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并替换为您的值:

    示例
    S3_BUCKET harness-io
    S3_PREFIX harness/audit/
    STATE_KEY harness/audit/state.json
    HARNESS_ACCOUNT_ID 123456789
    HARNESS_API_KEY harness_xxx_token
    HARNESS_API_BASE https://app.harness.io
    PAGE_SIZE 100
    START_MINUTES_BACK 60
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your‑function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:

    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称harness-io-1h
  3. 点击创建时间表

为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > 用户,然后点击添加用户
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值会输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Harness IO 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Harness IO)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Harness IO 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://harness-io/harness/audit/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。