收集 Harness IO 审核日志
本文档介绍了如何使用 Google Cloud Storage 将 Harness IO 审核日志提取到 Google Security Operations。Harness 是一个持续交付和 DevOps 平台,提供用于软件交付、功能标志、云成本管理和安全测试的工具。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 已启用 Cloud Storage API 的 GCP 项目
- 创建和管理 GCS 存储分区的权限
- 管理 GCS 存储分区的 IAM 政策的权限
- 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
- 对 Harness 具有特权访问权限,并有权执行以下操作:
- 创建 API 密钥
- 访问审核日志
- 查看账号设置
收集 Harness API 凭据
在 Harness 中创建 API 密钥
- 登录 Harness 平台。
- 点击您的用户个人资料。
- 前往 My API Keys。
- 点击 + API 密钥。
- 提供以下配置详细信息:
- 名称:输入一个描述性名称(例如
Google SecOps Integration)。 - 说明:可选说明。
- 名称:输入一个描述性名称(例如
- 点击保存。
- 点击 + 令牌以创建新令牌。
- 提供以下配置详细信息:
- 名称:输入
Chronicle Feed Token。 - 设置失效时间:选择合适的失效时间或无失效时间(用于生产环境)。
- 名称:输入
- 点击 Generate Token(生成令牌)。
复制并妥善保存令牌值。此令牌将用作
x-api-key标头值。
获取 Harness 账号 ID
- 在 Harness 平台中,记下网址中的账号 ID。
示例网址:https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...。YOUR_ACCOUNT_ID 部分是您的账号标识符。
或者,前往账号设置 > 概览,查看您的账号标识符。
复制并保存账号 ID,以便在 Cloud Run 函数中使用。
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 harness-io-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
为 Cloud Run 函数创建服务账号
Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
harness-audit-collector-sa。 - 服务账号说明:输入
Service account for Cloud Run function to collect Harness IO audit logs。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
必须拥有这些角色,才能:
- Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
- Cloud Run Invoker:允许 Pub/Sub 调用函数
- Cloud Functions Invoker:允许调用函数
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
harness-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建发布/订阅主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
harness-audit-trigger。 - 将其他设置保留为默认值。
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集日志
Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 Harness API 中提取日志并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 harness-audit-collector区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (
harness-audit-trigger)。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全标签页:
- 服务账号:选择服务账号 (
harness-audit-collector-sa)。
- 服务账号:选择服务账号 (
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击 + 添加变量:
变量名称 示例值 说明 HARNESS_ACCOUNT_ID您的 Harness 账号 ID Harness 中的账号标识符 HARNESS_API_KEY您的 API 密钥令牌 具有 audit:read 权限的令牌 GCS_BUCKETharness-io-logsGCS 存储分区名称 GCS_PREFIXharness/auditGCS 对象的前缀 STATE_KEYharness/audit/state.jsonGCS 中的状态文件路径 - 可选环境变量:
变量名称 默认值 说明 HARNESS_API_BASEhttps://app.harness.ioHarness API 基础网址(针对自托管实例的替换项) PAGE_SIZE50每页的活动数(最多 100 个) START_MINUTES_BACK60初始回溯周期(以分钟为单位) FILTER_MODULES无 以英文逗号分隔的模块(例如 CD,CI,CE)FILTER_ACTIONS无 以英文逗号分隔的操作(例如, CREATE,UPDATE,DELETE)STATIC_FILTER无 预定义过滤条件: EXCLUDE_LOGIN_EVENTS或EXCLUDE_SYSTEM_EVENTSMAX_RETRIES3速率限制的重试次数上限 在变量和密钥标签页中,向下滚动到请求:
- 请求超时:输入
600秒(10 分钟)。
- 请求超时:输入
前往容器中的设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值。
- CPU:选择 1。
- 点击完成。
- 在资源部分中:
滚动到执行环境:
- 选择默认(推荐)。
在修订版本扩缩部分中:
- 实例数下限:输入
0。 - 实例数上限:输入
100(或根据预期负载进行调整)。
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timedelta, timezone import time # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["GCS_BUCKET"] PREFIX = os.environ.get("GCS_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } def read_state(bucket): """Read checkpoint state from GCS.""" try: blob = bucket.blob(STATE_KEY) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) since_ms = state.get("since") page_token = state.get("pageToken") print(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except Exception as e: print(f"Warning: Could not load state: {e}") print("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) print(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None def write_state(bucket, since_ms, page_token=None): """Write checkpoint state to GCS.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type="application/json" ) print(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: print(f"Error writing state: {e}") raise def fetch_harness_audits(since_ms, page_token=None, retry_count=0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={ACCOUNT_ID}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={page_token}" print(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] print(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] print(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER print(f"Applying static filter: {STATIC_FILTER}") # Make POST request response = http.request( 'POST', url, body=json.dumps(body_data).encode('utf-8'), headers=HDRS, timeout=30.0 ) resp_data = json.loads(response.data.decode('utf-8')) if "status" not in resp_data: print(f"Response missing 'status' field: {response.data[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: print("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") print(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: print(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except Exception as e: if hasattr(e, 'status') and e.status == 429: retry_after = 60 print(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: print(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) print(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") print(f"Error in fetch_harness_audits: {e}") raise def upload_to_gcs(bucket, events): """Upload audit events to GCS in JSONL format.""" if not events: print("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate GCS key with timestamp timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}.jsonl" ) # Upload to GCS blob = bucket.blob(key) blob.upload_from_string( jsonl_content, content_type="application/x-ndjson" ) blob.metadata = { "event-count": str(len(events)), "source": "harness-audit-function", "collection-time": timestamp.isoformat() } blob.patch() print(f"Uploaded {len(events)} events to gs://{BUCKET}/{key}") return key except Exception as e: print(f"Error uploading to GCS: {e}") raise @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Harness audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ print("=== Harness Audit Collection Started ===") print(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: print(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: print(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: print(f"Static filter enabled: {STATIC_FILTER}") try: # Get GCS bucket bucket = storage_client.bucket(BUCKET) # Step 1: Read checkpoint state since_ms, page_token = read_state(bucket) if page_token: print("Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) print(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 has_next = True while has_next and page_count < max_pages: page_count += 1 print(f"--- Fetching page {page_count} ---") # Fetch one page of results result = fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) print(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: print("Pagination complete (hasNext=False)") break if not current_page_token: print("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: print(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to GCS if all_events: gcs_key = upload_to_gcs(bucket, all_events) print(f"Successfully uploaded {len(all_events)} total events") else: print("No new events to upload") gcs_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) write_state(bucket, new_since, None) print(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation write_state(bucket, since_ms, current_page_token) print("Pagination incomplete, saved pageToken for next run") # Step 5: Log result result = { "status": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "gcsKey": gcs_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } print(f"Collection completed: {json.dumps(result)}") except Exception as e: print(f"Collection failed: {e}") raise finally: print("=== Harness Audit Collection Finished ===")- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0点击部署以保存并部署该函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 harness-audit-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择 Pub/Sub 主题 ( harness-audit-trigger)消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据日志量和延迟时间要求选择频次:
频率 Cron 表达式 使用场景 每隔 5 分钟 */5 * * * *高容量、低延迟 每隔 15 分钟 */15 * * * *搜索量中等 每小时 0 * * * *标准(推荐) 每 6 小时 0 */6 * * *量小、批处理 每天 0 0 * * *历史数据收集
测试集成
- 在 Cloud Scheduler 控制台中,找到您的作业。
- 点击强制运行以手动触发作业。
- 等待几秒钟。
- 前往 Cloud Run > 服务。
- 点击函数名称 (
harness-audit-collector)。 - 点击日志标签页。
验证函数是否已成功执行。查找以下项:
=== Harness Audit Collection Started === State loaded: since=... or No state file found, starting fresh collection --- Fetching page 1 --- API response: X events, hasNext=... Uploaded X events to gs://harness-io-logs/harness/audit/... Successfully processed X records === Harness Audit Collection Finished ===前往 Cloud Storage > 存储分区。
点击您的存储分区名称。
前往前缀文件夹 (
harness/audit/)。验证是否已创建具有当前时间戳的新
.jsonl文件。
如果您在日志中看到错误,请执行以下操作:
- HTTP 401:检查环境变量中的 API 凭据
- HTTP 403:验证账号是否具有所需权限
- HTTP 429:速率限制 - 函数将自动重试并进行退避
缺少环境变量:检查是否已设置所有必需的变量
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Harness Audit Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Harness IO 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以提取 Harness IO 日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Harness Audit Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Harness IO 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://harness-io-logs/harness/audit/将
harness-io-logs:您的 GCS 存储分区名称。harness/audit:存储日志的前缀/文件夹路径。
示例:
- 根存储分区:
gs://company-logs/ - 带前缀:
gs://company-logs/harness-logs/ - 使用子文件夹:
gs://company-logs/harness/audit/
- 根存储分区:
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资产命名空间:资产命名空间。输入
harness.audit。注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。