收集 Harness IO 审核日志
本文档介绍了如何使用 Amazon S3 将 Harness IO 审核日志注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 Harness 的特许访问权限,并具有以下权限:
- 创建 API 密钥
- 访问审核日志
- 查看账号设置
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。
收集 Harness API 凭据
在 Harness 中创建 API 密钥
- 登录 Harness 平台。
- 点击您的用户个人资料。
- 前往 My API Keys(我的 API 密钥)。
- 点击 + API 密钥。
- 提供以下配置详细信息:
- 名称:输入一个描述性名称(例如
Google SecOps Integration)。 - 说明:可选说明。
- 名称:输入一个描述性名称(例如
- 点击保存。
- 点击 + 令牌以创建新令牌。
- 提供以下配置详细信息:
- 名称:输入
Chronicle Feed Token。 - 设置过期时间:选择合适的过期时间或永不过期(用于生产环境)。
- 名称:输入
- 点击 Generate Token(生成令牌)。
复制并妥善保存令牌值。此令牌将用作
x-api-key标头值。
获取 Harness 账号 ID
- 在 Harness 平台中,记下网址中的账号 ID。
- 示例网址:
https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/... YOUR_ACCOUNT_ID部分是您的账号标识符。
- 示例网址:
- 或者,前往账号设置 > 概览,查看您的账号标识符。
复制并保存账号 ID,以在 Lambda 函数中使用。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
harness-io-logs)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 Lambda S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
复制并粘贴以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutHarnessObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json" } ] }- 如果您输入了其他存储桶名称,请替换
harness-io-logs。
- 如果您输入了其他存储桶名称,请替换
点击下一步。
为政策命名
HarnessToS3Policy,然后点击创建政策。依次前往 IAM > 角色 > 创建角色。
选择 AWS service 作为可信实体类型。
选择 Lambda 作为用例。
点击下一步。
搜索并选择以下政策:
HarnessToS3Policy(您刚刚创建的政策)AWSLambdaBasicExecutionRole(适用于 CloudWatch Logs)
点击下一步。
将角色命名为
HarnessAuditLambdaRole,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 harness-audit-to-s3运行时 Python 3.13 架构 x86_64 执行角色 HarnessAuditLambdaRole点击创建函数。
创建函数后,打开 Code 标签页。
删除默认桩代码,然后输入以下 Lambda 函数代码:
Lambda 函数代码 (
harness_audit_to_s3.py)#!/usr/bin/env python3 """ Harness.io Audit Logs to S3 Lambda Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion. """ import os import json import time import uuid import logging import urllib.parse from datetime import datetime, timedelta, timezone from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters (NEW) FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") # e.g., "EXCLUDE_LOGIN_EVENTS" MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # AWS clients s3 = boto3.client("s3") # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } # Logging configuration logger = logging.getLogger() logger.setLevel(logging.INFO) # ============================================ # State Management Functions # ============================================ def _read_state(): """Read checkpoint state from S3.""" try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) state = json.loads(obj["Body"].read()) since_ms = state.get("since") page_token = state.get("pageToken") logger.info(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except s3.exceptions.NoSuchKey: logger.info("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None except Exception as e: logger.error(f"Error reading state: {e}") raise def _write_state(since_ms: int, page_token: str = None): """Write checkpoint state to S3.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode(), ContentType="application/json" ) logger.info(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: logger.error(f"Error writing state: {e}") raise # ============================================ # Harness API Functions # ============================================ def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 Documentation: https://apidocs.harness.io/audit/getauditeventlistv2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={urllib.parse.quote(page_token)}" logger.info(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] logger.info(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] logger.info(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER logger.info(f"Applying static filter: {STATIC_FILTER}") logger.debug(f"Request body: {json.dumps(body_data)}") # Make POST request req = Request( url, data=json.dumps(body_data).encode('utf-8'), headers=HDRS, method="POST" ) resp = urlopen(req, timeout=30) resp_text = resp.read().decode('utf-8') resp_data = json.loads(resp_text) if "status" not in resp_data: logger.warning(f"Response missing 'status' field: {resp_text[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: logger.warning("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except HTTPError as e: error_body = e.read().decode() if hasattr(e, 'read') else '' if e.code == 401: logger.error("Authentication failed: Invalid API key") raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.") elif e.code == 403: logger.error("Authorization failed: Insufficient permissions") raise Exception("API key lacks required audit:read permissions") elif e.code == 429: retry_after = int(e.headers.get("Retry-After", "60")) logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: logger.info(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return _fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") elif e.code == 400: logger.error(f"Bad request: {error_body}") raise Exception(f"Invalid request parameters: {error_body}") else: logger.error(f"HTTP {e.code}: {e.reason} - {error_body}") raise Exception(f"Harness API error {e.code}: {e.reason}") except URLError as e: logger.error(f"Network error: {e.reason}") raise Exception(f"Failed to connect to Harness API: {e.reason}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response: {e}") logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}") raise Exception("Harness API returned invalid JSON") except Exception as e: logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True) raise # ============================================ # S3 Upload Functions # ============================================ def _upload_to_s3(events: list) -> str: """ Upload audit events to S3 in JSONL format. Each line is a complete JSON object (one event per line). """ if not events: logger.info("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate S3 key with timestamp and UUID timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl" ) # Upload to S3 s3.put_object( Bucket=BUCKET, Key=key, Body=jsonl_content.encode('utf-8'), ContentType="application/x-ndjson", Metadata={ "event-count": str(len(events)), "source": "harness-audit-lambda", "collection-time": timestamp.isoformat() } ) logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}") return key except Exception as e: logger.error(f"Error uploading to S3: {e}", exc_info=True) raise # ============================================ # Main Orchestration Function # ============================================ def fetch_and_store(): """ Main function to fetch audit logs from Harness and store in S3. Handles pagination and state management. """ logger.info("=== Harness Audit Collection Started ===") logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: logger.info(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: logger.info(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: logger.info(f"Static filter enabled: {STATIC_FILTER}") try: # Step 1: Read checkpoint state since_ms, page_token = _read_state() if page_token: logger.info(f"Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) logger.info(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 # Safety limit has_next = True while has_next and page_count < max_pages: page_count += 1 logger.info(f"--- Fetching page {page_count} ---") # Fetch one page of results result = _fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: logger.info("Pagination complete (hasNext=False)") break if not current_page_token: logger.warning("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: logger.warning(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to S3 if all_events: s3_key = _upload_to_s3(all_events) logger.info(f"Successfully uploaded {len(all_events)} total events") else: logger.info("No new events to upload") s3_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) _write_state(new_since, None) logger.info(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation _write_state(since_ms, current_page_token) logger.info(f"Pagination incomplete, saved pageToken for next run") # Step 5: Return result result = { "statusCode": 200, "message": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "s3Key": s3_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } logger.info(f"Collection completed: {json.dumps(result)}") return result except Exception as e: logger.error(f"Collection failed: {e}", exc_info=True) result = { "statusCode": 500, "message": "Error", "error": str(e), "errorType": type(e).__name__ } return result finally: logger.info("=== Harness Audit Collection Finished ===") # ============================================ # Lambda Handler # ============================================ def lambda_handler(event, context): """AWS Lambda handler function.""" return fetch_and_store() # ============================================ # Local Testing # ============================================ if __name__ == "__main__": # For local testing result = lambda_handler(None, None) print(json.dumps(result, indent=2))
点击部署以保存函数代码。
配置 Lambda 环境变量
- 在 Lambda 函数页面中,选择配置标签页。
- 点击左侧边栏中的环境变量。
- 点击修改。
针对以下各项点击添加环境变量:
必需的环境变量:
键 值 说明 HARNESS_ACCOUNT_ID您的 Harness 账号 ID Harness 中的账号标识符 HARNESS_API_KEY您的 API 密钥令牌 具有 audit:read 权限的令牌 S3_BUCKETharness-io-logsS3 存储桶名称 S3_PREFIXharness/auditS3 对象的前缀 STATE_KEYharness/audit/state.jsonS3 中的状态文件路径 可选环境变量:
键 默认值 说明 HARNESS_API_BASEhttps://app.harness.ioHarness API 基础网址 PAGE_SIZE50每页的活动数(最多 100 个) START_MINUTES_BACK60初始回溯周期(以分钟为单位) FILTER_MODULES无 以英文逗号分隔的模块(例如 CD,CI,CE)FILTER_ACTIONS无 以英文逗号分隔的操作(例如, CREATE,UPDATE,DELETE)STATIC_FILTER无 预定义过滤条件: EXCLUDE_LOGIN_EVENTS或EXCLUDE_SYSTEM_EVENTSMAX_RETRIES3速率限制的重试次数上限 点击保存。
配置 Lambda 超时和内存
- 在 Lambda 函数页面中,选择配置标签页。
- 点击左侧边栏中的常规配置。
- 点击修改。
- 提供以下配置详细信息:
- 内存:
256 MB(推荐) - 超时:
5 min 0 sec(300 秒)
- 内存:
- 点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 时间安排名称:输入
harness-audit-hourly。 - 说明:可选说明。
- 时间安排名称:输入
- 点击下一步。
- 在时间表模式下,选择周期性时间表。
- 选择基于费率的排期。
- 提供以下配置详细信息:
- 费率表达式:输入
1 hour。
- 费率表达式:输入
- 点击下一步。
- 在目标下,提供以下配置详细信息:
- 目标 API:选择 AWS Lambda 调用。
- Lambda 函数:选择您的函数
harness-audit-to-s3。
- 点击下一步。
- 检查日程配置。
- 点击创建时间表。
为 Google SecOps 创建只读 IAM 用户
此 IAM 用户允许 Google SecOps 从 S3 存储桶读取日志。
- 依次前往 AWS 控制台 > IAM > 用户 > 创建用户。
- 提供以下配置详细信息:
- 用户名:输入
chronicle-s3-reader。
- 用户名:输入
- 点击下一步。
- 选择直接附加政策。
- 点击创建政策。
- 选择 JSON 标签页。
粘贴以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::harness-io-logs", "Condition": { "StringLike": { "s3:prefix": "harness/audit/*" } } } ] }点击下一步。
将政策命名为
ChronicleHarnessS3ReadPolicy。点击创建政策。
返回到用户创建标签页,然后刷新政策列表。
搜索并选择
ChronicleHarnessS3ReadPolicy。点击下一步。
检查并点击创建用户。
为读取者用户创建访问密钥
- 在 IAM 用户页面中,选择
chronicle-s3-reader用户。 - 选择安全凭据标签页。
- 点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件)以保存访问密钥 ID 和私有访问密钥。
- 点击完成。
在 Google SecOps 中配置 Feed 以注入 Harness IO 日志
- 依次前往 SIEM 设置> Feed。
- 点击新增。
- 在下一页上,点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Harness Audit Logs)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Harness IO 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- S3 URI:输入带有前缀路径的 S3 存储桶 URI:
s3://harness-io-logs/harness/audit/ 来源删除选项:根据您的偏好选择删除选项:
- 永不:永不在转移后删除任何文件(建议初始时选择此选项)。
- 成功时:成功转移后删除所有文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
访问密钥 ID:输入
chronicle-s3-reader用户的访问密钥 ID。私有访问密钥:输入
chronicle-s3-reader用户的私有访问密钥。资产命名空间:资产命名空间。输入
harness.audit。注入标签:要应用于此 Feed 中事件的可选标签。
- S3 URI:输入带有前缀路径的 S3 存储桶 URI:
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。