收集 Harness IO 稽核記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Harness IO 稽核記錄擷取至 Google Security Operations。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 具備下列權限的 Harness 特殊存取權:
    • 建立 API 金鑰
    • 存取稽核記錄
    • 查看帳戶設定
  • AWS (S3、IAM、Lambda、EventBridge) 的特殊存取權。

收集 Harness API 憑證

在 Harness 中建立 API 金鑰

  1. 登入 Harness Platform
  2. 按一下「使用者設定檔」
  3. 前往「我的 API 金鑰」
  4. 按一下「+ API 金鑰」
  5. 請提供下列設定詳細資料:
    • 名稱:輸入描述性名稱 (例如 Google SecOps Integration)。
    • 說明:選填說明。
  6. 按一下 [儲存]
  7. 按一下「+ 權杖」建立新權杖。
  8. 請提供下列設定詳細資料:
    • 「Name」(名稱):輸入 Chronicle Feed Token
    • 設定到期時間:選取適當的到期時間或「永不到期」 (用於正式環境)。
  9. 按一下 [產生憑證]。
  10. 複製並妥善儲存權杖值。這個權杖會做為 x-api-key 標頭值。

取得 Harness 帳戶 ID

  1. Harness 平台中,記下網址中的帳戶 ID
    • 網址範例:https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...
    • YOUR_ACCOUNT_ID 部分是您的帳戶 ID。
  2. 或者,依序前往「帳戶設定」>「總覽」,查看「帳戶 ID」
  3. 複製並儲存帳戶 ID,以供 Lambda 函式使用。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如 harness-io-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取 [權限] 分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 Lambda S3 上傳的身分與存取權管理政策和角色

  1. 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 複製並貼上下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutHarnessObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"
        }
      ]
    }
    
    • 如果您輸入了其他 bucket 名稱,請替換 harness-io-logs
  3. 點選「下一步」

  4. 為政策命名 HarnessToS3Policy,然後點選「建立政策」

  5. 依序前往「IAM」>「角色」>「建立角色」

  6. 選取「AWS 服務」做為信任的實體類型。

  7. 選取「Lambda」Lambda做為用途。

  8. 點選「下一步」

  9. 搜尋並選取下列政策:

    • HarnessToS3Policy (您剛建立的政策)
    • AWSLambdaBasicExecutionRole (適用於 CloudWatch Logs)
  10. 點選「下一步」

  11. 為角色命名 HarnessAuditLambdaRole,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 harness-audit-to-s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 HarnessAuditLambdaRole
  4. 按一下「建立函式」

  5. 建立函式後,開啟「程式碼」分頁。

  6. 刪除預設的存根程式碼,然後輸入下列 Lambda 函式程式碼:

    • Lambda 函式程式碼 (harness_audit_to_s3.py)

      #!/usr/bin/env python3
      """
      Harness.io Audit Logs to S3 Lambda
      Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion.
      """
      import os
      import json
      import time
      import uuid
      import logging
      import urllib.parse
      from datetime import datetime, timedelta, timezone
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError
      import boto3
      
      # Configuration from Environment Variables
      API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
      ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
      API_KEY = os.environ["HARNESS_API_KEY"]
      BUCKET = os.environ["S3_BUCKET"]
      PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/")
      STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
      PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100)
      START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))
      
      # Optional filters (NEW)
      FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None
      FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None
      STATIC_FILTER = os.environ.get("STATIC_FILTER")  # e.g., "EXCLUDE_LOGIN_EVENTS"
      MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
      
      # AWS clients
      s3 = boto3.client("s3")
      
      # HTTP headers for Harness API
      HDRS = {
          "x-api-key": API_KEY,
          "Content-Type": "application/json",
          "Accept": "application/json",
      }
      
      # Logging configuration
      logger = logging.getLogger()
      logger.setLevel(logging.INFO)
      
      # ============================================
      # State Management Functions
      # ============================================
      def _read_state():
          """Read checkpoint state from S3."""
          try:
              obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
              state = json.loads(obj["Body"].read())
      
              since_ms = state.get("since")
              page_token = state.get("pageToken")
      
              logger.info(f"State loaded: since={since_ms}, pageToken={page_token}")
              return since_ms, page_token
      
          except s3.exceptions.NoSuchKey:
              logger.info("No state file found, starting fresh collection")
              start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK)
              since_ms = int(start_time.timestamp() * 1000)
              logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})")
              return since_ms, None
      
          except Exception as e:
              logger.error(f"Error reading state: {e}")
              raise
      
      def _write_state(since_ms: int, page_token: str = None):
          """Write checkpoint state to S3."""
          state = {
              "since": since_ms,
              "pageToken": page_token,
              "lastRun": int(time.time() * 1000),
              "lastRunISO": datetime.now(timezone.utc).isoformat()
          }
      
          try:
              s3.put_object(
                  Bucket=BUCKET,
                  Key=STATE_KEY,
                  Body=json.dumps(state, indent=2).encode(),
                  ContentType="application/json"
              )
              logger.info(f"State saved: since={since_ms}, pageToken={page_token}")
          except Exception as e:
              logger.error(f"Error writing state: {e}")
              raise
      
      # ============================================
      # Harness API Functions
      # ============================================
      def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0):
          """
          Fetch audit logs from Harness API with retry logic.
      
          API Endpoint: POST /audit/api/audits/listV2
          Documentation: https://apidocs.harness.io/audit/getauditeventlistv2
          """
          try:
              # Build URL with query parameters
              url = (
                  f"{API_BASE}/audit/api/audits/listV2"
                  f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
                  f"&pageSize={PAGE_SIZE}"
              )
      
              if page_token:
                  url += f"&pageToken={urllib.parse.quote(page_token)}"
      
              logger.info(f"Fetching from: {url[:100]}...")
      
              # Build request body with time filter and optional filters
              body_data = {
                  "startTime": since_ms,
                  "endTime": int(time.time() * 1000),
                  "filterType": "Audit" 
              }
      
              if FILTER_MODULES:
                  body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()]
                  logger.info(f"Applying module filter: {body_data['modules']}")
      
              if FILTER_ACTIONS:
                  body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()]
                  logger.info(f"Applying action filter: {body_data['actions']}")
      
              if STATIC_FILTER:
                  body_data["staticFilter"] = STATIC_FILTER
                  logger.info(f"Applying static filter: {STATIC_FILTER}")
      
              logger.debug(f"Request body: {json.dumps(body_data)}")
      
              # Make POST request
              req = Request(
                  url,
                  data=json.dumps(body_data).encode('utf-8'),
                  headers=HDRS,
                  method="POST"
              )
      
              resp = urlopen(req, timeout=30)
              resp_text = resp.read().decode('utf-8')
              resp_data = json.loads(resp_text)
      
              if "status" not in resp_data:
                  logger.warning(f"Response missing 'status' field: {resp_text[:200]}")
      
              # Check response status
              if resp_data.get("status") != "SUCCESS":
                  error_msg = resp_data.get("message", "Unknown error")
                  raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}")
      
              # Extract data from response structure
              data_obj = resp_data.get("data", {})
      
              if not data_obj:
                  logger.warning("Response 'data' object is empty or missing")
      
              events = data_obj.get("content", [])
              has_next = data_obj.get("hasNext", False)
              next_token = data_obj.get("pageToken")
      
              logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}")
      
              if not events and data_obj:
                  logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}")
      
              return {
                  "events": events,
                  "hasNext": has_next,
                  "pageToken": next_token
              }
      
          except HTTPError as e:
              error_body = e.read().decode() if hasattr(e, 'read') else ''
      
              if e.code == 401:
                  logger.error("Authentication failed: Invalid API key")
                  raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.")
      
              elif e.code == 403:
                  logger.error("Authorization failed: Insufficient permissions")
                  raise Exception("API key lacks required audit:read permissions")
      
              elif e.code == 429:
                  retry_after = int(e.headers.get("Retry-After", "60"))
                  logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})")
      
                  if retry_count < MAX_RETRIES:
                      logger.info(f"Waiting {retry_after} seconds before retry...")
                      time.sleep(retry_after)
                      logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})")
                      return _fetch_harness_audits(since_ms, page_token, retry_count + 1)
                  else:
                      raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting")
      
              elif e.code == 400:
                  logger.error(f"Bad request: {error_body}")
                  raise Exception(f"Invalid request parameters: {error_body}")
      
              else:
                  logger.error(f"HTTP {e.code}: {e.reason} - {error_body}")
                  raise Exception(f"Harness API error {e.code}: {e.reason}")
      
          except URLError as e:
              logger.error(f"Network error: {e.reason}")
              raise Exception(f"Failed to connect to Harness API: {e.reason}")
      
          except json.JSONDecodeError as e:
              logger.error(f"Invalid JSON response: {e}")
              logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}")
              raise Exception("Harness API returned invalid JSON")
      
          except Exception as e:
              logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True)
              raise
      
      # ============================================
      # S3 Upload Functions
      # ============================================
      def _upload_to_s3(events: list) -> str:
          """
          Upload audit events to S3 in JSONL format.
          Each line is a complete JSON object (one event per line).
          """
          if not events:
              logger.info("No events to upload")
              return None
      
          try:
              # Create JSONL content (one JSON object per line)
              jsonl_lines = [json.dumps(event) for event in events]
              jsonl_content = "\n".join(jsonl_lines)
      
              # Generate S3 key with timestamp and UUID
              timestamp = datetime.now(timezone.utc)
              key = (
                  f"{PREFIX}/"
                  f"{timestamp:%Y/%m/%d}/"
                  f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl"
              )
      
              # Upload to S3
              s3.put_object(
                  Bucket=BUCKET,
                  Key=key,
                  Body=jsonl_content.encode('utf-8'),
                  ContentType="application/x-ndjson",
                  Metadata={
                      "event-count": str(len(events)),
                      "source": "harness-audit-lambda",
                      "collection-time": timestamp.isoformat()
                  }
              )
      
              logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}")
              return key
      
          except Exception as e:
              logger.error(f"Error uploading to S3: {e}", exc_info=True)
              raise
      
      # ============================================
      # Main Orchestration Function
      # ============================================
      def fetch_and_store():
          """
          Main function to fetch audit logs from Harness and store in S3.
          Handles pagination and state management.
          """
          logger.info("=== Harness Audit Collection Started ===")
          logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}")
      
          if FILTER_MODULES:
              logger.info(f"Module filter enabled: {FILTER_MODULES}")
          if FILTER_ACTIONS:
              logger.info(f"Action filter enabled: {FILTER_ACTIONS}")
          if STATIC_FILTER:
              logger.info(f"Static filter enabled: {STATIC_FILTER}")
      
          try:
              # Step 1: Read checkpoint state
              since_ms, page_token = _read_state()
      
              if page_token:
                  logger.info(f"Resuming pagination from saved pageToken")
              else:
                  since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc)
                  logger.info(f"Starting new collection from: {since_dt.isoformat()}")
      
              # Step 2: Collect all events with pagination
              all_events = []
              current_page_token = page_token
              page_count = 0
              max_pages = 100  # Safety limit
              has_next = True
      
              while has_next and page_count < max_pages:
                  page_count += 1
                  logger.info(f"--- Fetching page {page_count} ---")
      
                  # Fetch one page of results
                  result = _fetch_harness_audits(since_ms, current_page_token)
      
                  # Extract events
                  events = result.get("events", [])
                  all_events.extend(events)
      
                  logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})")
      
                  # Check pagination status
                  has_next = result.get("hasNext", False)
                  current_page_token = result.get("pageToken")
      
                  if not has_next:
                      logger.info("Pagination complete (hasNext=False)")
                      break
      
                  if not current_page_token:
                      logger.warning("hasNext=True but no pageToken, stopping pagination")
                      break
      
                  # Small delay between pages to avoid rate limiting
                  time.sleep(0.5)
      
              if page_count >= max_pages:
                  logger.warning(f"Reached max pages limit ({max_pages}), stopping")
      
              # Step 3: Upload collected events to S3
              if all_events:
                  s3_key = _upload_to_s3(all_events)
                  logger.info(f"Successfully uploaded {len(all_events)} total events")
              else:
                  logger.info("No new events to upload")
                  s3_key = None
      
              # Step 4: Update checkpoint state
              if not has_next:
                  # Pagination complete - update since to current time for next run
                  new_since = int(time.time() * 1000)
                  _write_state(new_since, None)
                  logger.info(f"Pagination complete, state updated with new since={new_since}")
              else:
                  # Pagination incomplete - save pageToken for continuation
                  _write_state(since_ms, current_page_token)
                  logger.info(f"Pagination incomplete, saved pageToken for next run")
      
              # Step 5: Return result
              result = {
                  "statusCode": 200,
                  "message": "Success",
                  "eventsCollected": len(all_events),
                  "pagesProcessed": page_count,
                  "paginationComplete": not has_next,
                  "s3Key": s3_key,
                  "filters": {
                      "modules": FILTER_MODULES,
                      "actions": FILTER_ACTIONS,
                      "staticFilter": STATIC_FILTER
                  }
              }
      
              logger.info(f"Collection completed: {json.dumps(result)}")
              return result
      
          except Exception as e:
              logger.error(f"Collection failed: {e}", exc_info=True)
      
              result = {
                  "statusCode": 500,
                  "message": "Error",
                  "error": str(e),
                  "errorType": type(e).__name__
              }
      
              return result
      
          finally:
              logger.info("=== Harness Audit Collection Finished ===")
      
      # ============================================
      # Lambda Handler
      # ============================================
      def lambda_handler(event, context):
          """AWS Lambda handler function."""
          return fetch_and_store()
      
      # ============================================
      # Local Testing
      # ============================================
      if __name__ == "__main__":
          # For local testing
          result = lambda_handler(None, None)
          print(json.dumps(result, indent=2))
      

  7. 按一下「部署」即可儲存函式程式碼。

設定 Lambda 環境變數

  1. 在 Lambda 函式頁面中,選取「設定」分頁標籤。
  2. 按一下左側側欄的「環境變數」
  3. 按一下 [編輯]
  4. 針對下列各項,按一下「新增環境變數」

    必要環境變數:

    說明
    HARNESS_ACCOUNT_ID Harness 帳戶 ID Harness 帳戶 ID
    HARNESS_API_KEY 您的 API 金鑰權杖 具備 audit:read 權限的權杖
    S3_BUCKET harness-io-logs S3 儲存貯體名稱
    S3_PREFIX harness/audit S3 物件的前置字串
    STATE_KEY harness/audit/state.json S3 中的狀態檔案路徑

    選用環境變數:

    預設值 說明
    HARNESS_API_BASE https://app.harness.io Harness API 基準網址
    PAGE_SIZE 50 每頁事件數 (最多 100 個)
    START_MINUTES_BACK 60 初始回溯期 (分鐘)
    FILTER_MODULES 以半形逗號分隔的模組 (例如 CD,CI,CE)
    FILTER_ACTIONS 以半形逗號分隔的動作 (例如 CREATE,UPDATE,DELETE)
    STATIC_FILTER 預先定義的篩選器:EXCLUDE_LOGIN_EVENTSEXCLUDE_SYSTEM_EVENTS
    MAX_RETRIES 3 頻率限制的重試次數上限
  5. 按一下 [儲存]

設定 Lambda 逾時和記憶體

  1. 在 Lambda 函式頁面中,選取「設定」分頁標籤。
  2. 按一下左側邊欄中的「一般設定」
  3. 按一下 [編輯]
  4. 請提供下列設定詳細資料:
    • 記憶體256 MB (建議)
    • 逾時5 min 0 sec (300 秒)
  5. 按一下 [儲存]

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 請提供下列設定詳細資料:
    • 時間表名稱:輸入 harness-audit-hourly
    • 說明:選填說明。
  3. 點選「下一步」
  4. 在「排程模式」下方,選取「週期性排程」
  5. 選取「以費率為準的排程」
  6. 請提供下列設定詳細資料:
    • 費率運算式:輸入 1 hour
  7. 點選「下一步」
  8. 在「目標」下方,提供下列設定詳細資料:
    • 目標 API:選取「AWS Lambda Invoke」
    • Lambda 函式:選取函式 harness-audit-to-s3
  9. 點選「下一步」
  10. 檢查時間表設定。
  11. 按一下「建立時間表」

為 Google SecOps 建立唯讀 IAM 使用者

這個 IAM 使用者可讓 Google SecOps 讀取 S3 值區中的記錄。

  1. 前往 AWS 控制台 > IAM > 使用者 > 建立使用者
  2. 請提供下列設定詳細資料:
    • 使用者名稱:輸入 chronicle-s3-reader
  3. 點選「下一步」
  4. 選取「直接附加政策」
  5. 點選「建立政策」
  6. 選取「JSON」分頁標籤。
  7. 貼上下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs",
          "Condition": {
            "StringLike": {
              "s3:prefix": "harness/audit/*"
            }
          }
        }
      ]
    }
    
  8. 點選「下一步」

  9. 將政策命名為 ChronicleHarnessS3ReadPolicy

  10. 點選「建立政策」

  11. 返回使用者建立分頁,然後重新整理政策清單。

  12. 搜尋並選取 ChronicleHarnessS3ReadPolicy

  13. 點選「下一步」

  14. 檢查並按一下「建立使用者」

為讀取者使用者建立存取金鑰

  1. 在「IAM Users」頁面中,選取 chronicle-s3-reader 使用者。
  2. 選取「安全憑證」分頁標籤。
  3. 按一下「建立存取金鑰」
  4. 選取「第三方服務」做為用途。
  5. 點選「下一步」
  6. 選用:新增說明標記。
  7. 按一下「建立存取金鑰」
  8. 按一下「下載 CSV 檔案」,儲存存取金鑰 ID 和私密存取金鑰。
  9. 按一下 [完成]

在 Google SecOps 中設定動態饋給,擷取 Harness IO 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增」
  3. 在下一個頁面中,按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Harness Audit Logs)。
  5. 選取「Amazon S3 V2」做為「來源類型」
  6. 選取「Harness IO」做為「記錄類型」
  7. 點選「下一步」
  8. 指定下列輸入參數的值:

    • S3 URI:輸入 S3 值區 URI,並加上前置路徑: s3://harness-io-logs/harness/audit/
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:轉移後一律不刪除任何檔案 (建議一開始使用)。
      • 成功時:成功移轉後,刪除所有檔案和空白目錄。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天

    • 「Access Key ID」(存取金鑰 ID):輸入 chronicle-s3-reader 使用者的存取金鑰 ID。

    • 存取密鑰:輸入 chronicle-s3-reader 使用者的存取密鑰。

    • 資產命名空間資產命名空間。輸入 harness.audit

    • 擷取標籤:要套用至這個動態饋給事件的選用標籤。

  9. 點選「下一步」

  10. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。