收集 Harness IO 审核日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Harness IO 审核日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • Harness 的特许访问权限,并具有以下权限:
    • 创建 API 密钥
    • 访问审核日志
    • 查看账号设置
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。

收集 Harness API 凭据

在 Harness 中创建 API 密钥

  1. 登录 Harness 平台
  2. 点击您的用户个人资料
  3. 前往 My API Keys(我的 API 密钥)。
  4. 点击 + API 密钥
  5. 提供以下配置详细信息:
    • 名称:输入一个描述性名称(例如 Google SecOps Integration)。
    • 说明:可选说明。
  6. 点击保存
  7. 点击 + 令牌以创建新令牌。
  8. 提供以下配置详细信息:
    • 名称:输入 Chronicle Feed Token
    • 设置过期时间:选择合适的过期时间或永不过期(用于生产环境)。
  9. 点击 Generate Token(生成令牌)。
  10. 复制并妥善保存令牌值。此令牌将用作 x-api-key 标头值。

获取 Harness 账号 ID

  1. Harness 平台中,记下网址中的账号 ID
    • 示例网址:https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...
    • YOUR_ACCOUNT_ID 部分是您的账号标识符。
  2. 或者,前往账号设置 > 概览,查看您的账号标识符
  3. 复制并保存账号 ID,以在 Lambda 函数中使用。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 harness-io-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 Lambda S3 上传配置 IAM 政策和角色

  1. 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 复制并粘贴以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutHarnessObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 harness-io-logs
  3. 点击下一步

  4. 为政策命名 HarnessToS3Policy,然后点击创建政策

  5. 依次前往 IAM > 角色 > 创建角色

  6. 选择 AWS service 作为可信实体类型。

  7. 选择 Lambda 作为用例。

  8. 点击下一步

  9. 搜索并选择以下政策:

    • HarnessToS3Policy(您刚刚创建的政策)
    • AWSLambdaBasicExecutionRole(适用于 CloudWatch Logs)
  10. 点击下一步

  11. 将角色命名为 HarnessAuditLambdaRole,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 harness-audit-to-s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 HarnessAuditLambdaRole
  4. 点击创建函数

  5. 创建函数后,打开 Code 标签页。

  6. 删除默认桩代码,然后输入以下 Lambda 函数代码:

    • Lambda 函数代码 (harness_audit_to_s3.py)

      #!/usr/bin/env python3
      """
      Harness.io Audit Logs to S3 Lambda
      Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion.
      """
      import os
      import json
      import time
      import uuid
      import logging
      import urllib.parse
      from datetime import datetime, timedelta, timezone
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError
      import boto3
      
      # Configuration from Environment Variables
      API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
      ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
      API_KEY = os.environ["HARNESS_API_KEY"]
      BUCKET = os.environ["S3_BUCKET"]
      PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/")
      STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
      PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100)
      START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))
      
      # Optional filters (NEW)
      FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None
      FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None
      STATIC_FILTER = os.environ.get("STATIC_FILTER")  # e.g., "EXCLUDE_LOGIN_EVENTS"
      MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
      
      # AWS clients
      s3 = boto3.client("s3")
      
      # HTTP headers for Harness API
      HDRS = {
          "x-api-key": API_KEY,
          "Content-Type": "application/json",
          "Accept": "application/json",
      }
      
      # Logging configuration
      logger = logging.getLogger()
      logger.setLevel(logging.INFO)
      
      # ============================================
      # State Management Functions
      # ============================================
      def _read_state():
          """Read checkpoint state from S3."""
          try:
              obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
              state = json.loads(obj["Body"].read())
      
              since_ms = state.get("since")
              page_token = state.get("pageToken")
      
              logger.info(f"State loaded: since={since_ms}, pageToken={page_token}")
              return since_ms, page_token
      
          except s3.exceptions.NoSuchKey:
              logger.info("No state file found, starting fresh collection")
              start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK)
              since_ms = int(start_time.timestamp() * 1000)
              logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})")
              return since_ms, None
      
          except Exception as e:
              logger.error(f"Error reading state: {e}")
              raise
      
      def _write_state(since_ms: int, page_token: str = None):
          """Write checkpoint state to S3."""
          state = {
              "since": since_ms,
              "pageToken": page_token,
              "lastRun": int(time.time() * 1000),
              "lastRunISO": datetime.now(timezone.utc).isoformat()
          }
      
          try:
              s3.put_object(
                  Bucket=BUCKET,
                  Key=STATE_KEY,
                  Body=json.dumps(state, indent=2).encode(),
                  ContentType="application/json"
              )
              logger.info(f"State saved: since={since_ms}, pageToken={page_token}")
          except Exception as e:
              logger.error(f"Error writing state: {e}")
              raise
      
      # ============================================
      # Harness API Functions
      # ============================================
      def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0):
          """
          Fetch audit logs from Harness API with retry logic.
      
          API Endpoint: POST /audit/api/audits/listV2
          Documentation: https://apidocs.harness.io/audit/getauditeventlistv2
          """
          try:
              # Build URL with query parameters
              url = (
                  f"{API_BASE}/audit/api/audits/listV2"
                  f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
                  f"&pageSize={PAGE_SIZE}"
              )
      
              if page_token:
                  url += f"&pageToken={urllib.parse.quote(page_token)}"
      
              logger.info(f"Fetching from: {url[:100]}...")
      
              # Build request body with time filter and optional filters
              body_data = {
                  "startTime": since_ms,
                  "endTime": int(time.time() * 1000),
                  "filterType": "Audit" 
              }
      
              if FILTER_MODULES:
                  body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()]
                  logger.info(f"Applying module filter: {body_data['modules']}")
      
              if FILTER_ACTIONS:
                  body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()]
                  logger.info(f"Applying action filter: {body_data['actions']}")
      
              if STATIC_FILTER:
                  body_data["staticFilter"] = STATIC_FILTER
                  logger.info(f"Applying static filter: {STATIC_FILTER}")
      
              logger.debug(f"Request body: {json.dumps(body_data)}")
      
              # Make POST request
              req = Request(
                  url,
                  data=json.dumps(body_data).encode('utf-8'),
                  headers=HDRS,
                  method="POST"
              )
      
              resp = urlopen(req, timeout=30)
              resp_text = resp.read().decode('utf-8')
              resp_data = json.loads(resp_text)
      
              if "status" not in resp_data:
                  logger.warning(f"Response missing 'status' field: {resp_text[:200]}")
      
              # Check response status
              if resp_data.get("status") != "SUCCESS":
                  error_msg = resp_data.get("message", "Unknown error")
                  raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}")
      
              # Extract data from response structure
              data_obj = resp_data.get("data", {})
      
              if not data_obj:
                  logger.warning("Response 'data' object is empty or missing")
      
              events = data_obj.get("content", [])
              has_next = data_obj.get("hasNext", False)
              next_token = data_obj.get("pageToken")
      
              logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}")
      
              if not events and data_obj:
                  logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}")
      
              return {
                  "events": events,
                  "hasNext": has_next,
                  "pageToken": next_token
              }
      
          except HTTPError as e:
              error_body = e.read().decode() if hasattr(e, 'read') else ''
      
              if e.code == 401:
                  logger.error("Authentication failed: Invalid API key")
                  raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.")
      
              elif e.code == 403:
                  logger.error("Authorization failed: Insufficient permissions")
                  raise Exception("API key lacks required audit:read permissions")
      
              elif e.code == 429:
                  retry_after = int(e.headers.get("Retry-After", "60"))
                  logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})")
      
                  if retry_count < MAX_RETRIES:
                      logger.info(f"Waiting {retry_after} seconds before retry...")
                      time.sleep(retry_after)
                      logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})")
                      return _fetch_harness_audits(since_ms, page_token, retry_count + 1)
                  else:
                      raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting")
      
              elif e.code == 400:
                  logger.error(f"Bad request: {error_body}")
                  raise Exception(f"Invalid request parameters: {error_body}")
      
              else:
                  logger.error(f"HTTP {e.code}: {e.reason} - {error_body}")
                  raise Exception(f"Harness API error {e.code}: {e.reason}")
      
          except URLError as e:
              logger.error(f"Network error: {e.reason}")
              raise Exception(f"Failed to connect to Harness API: {e.reason}")
      
          except json.JSONDecodeError as e:
              logger.error(f"Invalid JSON response: {e}")
              logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}")
              raise Exception("Harness API returned invalid JSON")
      
          except Exception as e:
              logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True)
              raise
      
      # ============================================
      # S3 Upload Functions
      # ============================================
      def _upload_to_s3(events: list) -> str:
          """
          Upload audit events to S3 in JSONL format.
          Each line is a complete JSON object (one event per line).
          """
          if not events:
              logger.info("No events to upload")
              return None
      
          try:
              # Create JSONL content (one JSON object per line)
              jsonl_lines = [json.dumps(event) for event in events]
              jsonl_content = "\n".join(jsonl_lines)
      
              # Generate S3 key with timestamp and UUID
              timestamp = datetime.now(timezone.utc)
              key = (
                  f"{PREFIX}/"
                  f"{timestamp:%Y/%m/%d}/"
                  f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl"
              )
      
              # Upload to S3
              s3.put_object(
                  Bucket=BUCKET,
                  Key=key,
                  Body=jsonl_content.encode('utf-8'),
                  ContentType="application/x-ndjson",
                  Metadata={
                      "event-count": str(len(events)),
                      "source": "harness-audit-lambda",
                      "collection-time": timestamp.isoformat()
                  }
              )
      
              logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}")
              return key
      
          except Exception as e:
              logger.error(f"Error uploading to S3: {e}", exc_info=True)
              raise
      
      # ============================================
      # Main Orchestration Function
      # ============================================
      def fetch_and_store():
          """
          Main function to fetch audit logs from Harness and store in S3.
          Handles pagination and state management.
          """
          logger.info("=== Harness Audit Collection Started ===")
          logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}")
      
          if FILTER_MODULES:
              logger.info(f"Module filter enabled: {FILTER_MODULES}")
          if FILTER_ACTIONS:
              logger.info(f"Action filter enabled: {FILTER_ACTIONS}")
          if STATIC_FILTER:
              logger.info(f"Static filter enabled: {STATIC_FILTER}")
      
          try:
              # Step 1: Read checkpoint state
              since_ms, page_token = _read_state()
      
              if page_token:
                  logger.info(f"Resuming pagination from saved pageToken")
              else:
                  since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc)
                  logger.info(f"Starting new collection from: {since_dt.isoformat()}")
      
              # Step 2: Collect all events with pagination
              all_events = []
              current_page_token = page_token
              page_count = 0
              max_pages = 100  # Safety limit
              has_next = True
      
              while has_next and page_count < max_pages:
                  page_count += 1
                  logger.info(f"--- Fetching page {page_count} ---")
      
                  # Fetch one page of results
                  result = _fetch_harness_audits(since_ms, current_page_token)
      
                  # Extract events
                  events = result.get("events", [])
                  all_events.extend(events)
      
                  logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})")
      
                  # Check pagination status
                  has_next = result.get("hasNext", False)
                  current_page_token = result.get("pageToken")
      
                  if not has_next:
                      logger.info("Pagination complete (hasNext=False)")
                      break
      
                  if not current_page_token:
                      logger.warning("hasNext=True but no pageToken, stopping pagination")
                      break
      
                  # Small delay between pages to avoid rate limiting
                  time.sleep(0.5)
      
              if page_count >= max_pages:
                  logger.warning(f"Reached max pages limit ({max_pages}), stopping")
      
              # Step 3: Upload collected events to S3
              if all_events:
                  s3_key = _upload_to_s3(all_events)
                  logger.info(f"Successfully uploaded {len(all_events)} total events")
              else:
                  logger.info("No new events to upload")
                  s3_key = None
      
              # Step 4: Update checkpoint state
              if not has_next:
                  # Pagination complete - update since to current time for next run
                  new_since = int(time.time() * 1000)
                  _write_state(new_since, None)
                  logger.info(f"Pagination complete, state updated with new since={new_since}")
              else:
                  # Pagination incomplete - save pageToken for continuation
                  _write_state(since_ms, current_page_token)
                  logger.info(f"Pagination incomplete, saved pageToken for next run")
      
              # Step 5: Return result
              result = {
                  "statusCode": 200,
                  "message": "Success",
                  "eventsCollected": len(all_events),
                  "pagesProcessed": page_count,
                  "paginationComplete": not has_next,
                  "s3Key": s3_key,
                  "filters": {
                      "modules": FILTER_MODULES,
                      "actions": FILTER_ACTIONS,
                      "staticFilter": STATIC_FILTER
                  }
              }
      
              logger.info(f"Collection completed: {json.dumps(result)}")
              return result
      
          except Exception as e:
              logger.error(f"Collection failed: {e}", exc_info=True)
      
              result = {
                  "statusCode": 500,
                  "message": "Error",
                  "error": str(e),
                  "errorType": type(e).__name__
              }
      
              return result
      
          finally:
              logger.info("=== Harness Audit Collection Finished ===")
      
      # ============================================
      # Lambda Handler
      # ============================================
      def lambda_handler(event, context):
          """AWS Lambda handler function."""
          return fetch_and_store()
      
      # ============================================
      # Local Testing
      # ============================================
      if __name__ == "__main__":
          # For local testing
          result = lambda_handler(None, None)
          print(json.dumps(result, indent=2))
      

  7. 点击部署以保存函数代码。

配置 Lambda 环境变量

  1. 在 Lambda 函数页面中,选择配置标签页。
  2. 点击左侧边栏中的环境变量
  3. 点击修改
  4. 针对以下各项点击添加环境变量

    必需的环境变量

    说明
    HARNESS_ACCOUNT_ID 您的 Harness 账号 ID Harness 中的账号标识符
    HARNESS_API_KEY 您的 API 密钥令牌 具有 audit:read 权限的令牌
    S3_BUCKET harness-io-logs S3 存储桶名称
    S3_PREFIX harness/audit S3 对象的前缀
    STATE_KEY harness/audit/state.json S3 中的状态文件路径

    可选环境变量

    默认值 说明
    HARNESS_API_BASE https://app.harness.io Harness API 基础网址
    PAGE_SIZE 50 每页的活动数(最多 100 个)
    START_MINUTES_BACK 60 初始回溯周期(以分钟为单位)
    FILTER_MODULES 以英文逗号分隔的模块(例如 CD,CI,CE)
    FILTER_ACTIONS 以英文逗号分隔的操作(例如,CREATE,UPDATE,DELETE)
    STATIC_FILTER 预定义过滤条件:EXCLUDE_LOGIN_EVENTSEXCLUDE_SYSTEM_EVENTS
    MAX_RETRIES 3 速率限制的重试次数上限
  5. 点击保存

配置 Lambda 超时和内存

  1. 在 Lambda 函数页面中,选择配置标签页。
  2. 点击左侧边栏中的常规配置
  3. 点击修改
  4. 提供以下配置详细信息:
    • 内存256 MB(推荐)
    • 超时5 min 0 sec(300 秒)
  5. 点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 时间安排名称:输入 harness-audit-hourly
    • 说明:可选说明。
  3. 点击下一步
  4. 时间表模式下,选择周期性时间表
  5. 选择基于费率的排期
  6. 提供以下配置详细信息:
    • 费率表达式:输入 1 hour
  7. 点击下一步
  8. 目标下,提供以下配置详细信息:
    • 目标 API:选择 AWS Lambda 调用
    • Lambda 函数:选择您的函数 harness-audit-to-s3
  9. 点击下一步
  10. 检查日程配置。
  11. 点击创建时间表

为 Google SecOps 创建只读 IAM 用户

此 IAM 用户允许 Google SecOps 从 S3 存储桶读取日志。

  1. 依次前往 AWS 控制台 > IAM > 用户 > 创建用户
  2. 提供以下配置详细信息:
    • 用户名:输入 chronicle-s3-reader
  3. 点击下一步
  4. 选择直接附加政策
  5. 点击创建政策
  6. 选择 JSON 标签页。
  7. 粘贴以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs",
          "Condition": {
            "StringLike": {
              "s3:prefix": "harness/audit/*"
            }
          }
        }
      ]
    }
    
  8. 点击下一步

  9. 将政策命名为 ChronicleHarnessS3ReadPolicy

  10. 点击创建政策

  11. 返回到用户创建标签页,然后刷新政策列表。

  12. 搜索并选择 ChronicleHarnessS3ReadPolicy

  13. 点击下一步

  14. 检查并点击创建用户

为读取者用户创建访问密钥

  1. IAM 用户页面中,选择 chronicle-s3-reader 用户。
  2. 选择安全凭据标签页。
  3. 点击创建访问密钥
  4. 选择第三方服务作为使用情形。
  5. 点击下一步
  6. 可选:添加说明标记。
  7. 点击创建访问密钥
  8. 点击 Download CSV file(下载 CSV 文件)以保存访问密钥 ID 和私有访问密钥。
  9. 点击完成

在 Google SecOps 中配置 Feed 以注入 Harness IO 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击新增
  3. 在下一页上,点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Harness Audit Logs)。
  5. 选择 Amazon S3 V2 作为来源类型
  6. 选择 Harness IO 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • S3 URI:输入带有前缀路径的 S3 存储桶 URI: s3://harness-io-logs/harness/audit/
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不在转移后删除任何文件(建议初始时选择此选项)。
      • 成功时:成功转移后删除所有文件和空目录。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天

    • 访问密钥 ID:输入 chronicle-s3-reader 用户的访问密钥 ID。

    • 私有访问密钥:输入 chronicle-s3-reader 用户的私有访问密钥。

    • 资产命名空间资产命名空间。输入 harness.audit

    • 注入标签:要应用于此 Feed 中事件的可选标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。