Harness IO 감사 로그 수집
이 문서에서는 Amazon S3를 사용하여 Harness IO 감사 로그를 Google Security Operations로 수집하는 방법을 설명합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- 다음 권한이 있는 Harness에 대한 액세스 권한 관리
- API 키 만들기
- 감사 로그 액세스
- 계정 설정 보기
- AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한
Harness API 사용자 인증 정보 수집
Harness에서 API 키 만들기
- Harness Platform에 로그인합니다.
- 사용자 프로필을 클릭합니다.
- 내 API 키로 이동합니다.
- + API 키를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름: 설명이 포함된 이름을 입력합니다 (예:
Google SecOps Integration). - 설명: 선택사항인 설명입니다.
- 이름: 설명이 포함된 이름을 입력합니다 (예:
- 저장을 클릭합니다.
- + 토큰을 클릭하여 새 토큰을 만듭니다.
- 다음 구성 세부정보를 제공합니다.
- 이름:
Chronicle Feed Token를 입력합니다. - 만료 설정: 적절한 만료 시간 또는 만료 없음 (프로덕션용)을 선택합니다.
- 이름:
- 토큰 생성을 클릭합니다.
토큰 값을 복사하여 안전하게 저장합니다. 이 토큰은
x-api-key헤더 값으로 사용됩니다.
Harness 계정 ID 가져오기
- Harness Platform에서 URL의 Account ID를 확인합니다.
- URL 예:
https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/... YOUR_ACCOUNT_ID부분은 계정 식별자입니다.
- URL 예:
- 또는 계정 설정 > 개요로 이동하여 계정 식별자를 확인합니다.
Lambda 함수에서 사용할 계정 ID를 복사하여 저장합니다.
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다(예:
harness-io-logs). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안용 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키와 보안 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
Lambda S3 업로드의 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
다음 정책을 복사하여 붙여넣습니다.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutHarnessObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json" } ] }- 다른 버킷 이름을 입력한 경우
harness-io-logs을 바꿉니다.
- 다른 버킷 이름을 입력한 경우
다음을 클릭합니다.
정책 이름을
HarnessToS3Policy로 지정하고 정책 만들기를 클릭합니다.IAM > 역할 > 역할 만들기로 이동합니다.
신뢰할 수 있는 항목 유형으로 AWS 서비스를 선택합니다.
사용 사례로 Lambda를 선택합니다.
다음을 클릭합니다.
다음 정책을 검색하여 선택합니다.
HarnessToS3Policy(방금 만든 정책)AWSLambdaBasicExecutionRole(CloudWatch Logs의 경우)
다음을 클릭합니다.
역할 이름을
HarnessAuditLambdaRole로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 harness-audit-to-s3런타임 Python 3.13 아키텍처 x86_64 실행 역할 HarnessAuditLambdaRole함수 만들기를 클릭합니다.
함수가 생성되면 코드 탭을 엽니다.
기본 스텁 코드를 삭제하고 다음 Lambda 함수 코드를 입력합니다.
Lambda 함수 코드 (
harness_audit_to_s3.py)#!/usr/bin/env python3 """ Harness.io Audit Logs to S3 Lambda Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion. """ import os import json import time import uuid import logging import urllib.parse from datetime import datetime, timedelta, timezone from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters (NEW) FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") # e.g., "EXCLUDE_LOGIN_EVENTS" MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # AWS clients s3 = boto3.client("s3") # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } # Logging configuration logger = logging.getLogger() logger.setLevel(logging.INFO) # ============================================ # State Management Functions # ============================================ def _read_state(): """Read checkpoint state from S3.""" try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) state = json.loads(obj["Body"].read()) since_ms = state.get("since") page_token = state.get("pageToken") logger.info(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except s3.exceptions.NoSuchKey: logger.info("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None except Exception as e: logger.error(f"Error reading state: {e}") raise def _write_state(since_ms: int, page_token: str = None): """Write checkpoint state to S3.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode(), ContentType="application/json" ) logger.info(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: logger.error(f"Error writing state: {e}") raise # ============================================ # Harness API Functions # ============================================ def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 Documentation: https://apidocs.harness.io/audit/getauditeventlistv2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={urllib.parse.quote(page_token)}" logger.info(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] logger.info(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] logger.info(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER logger.info(f"Applying static filter: {STATIC_FILTER}") logger.debug(f"Request body: {json.dumps(body_data)}") # Make POST request req = Request( url, data=json.dumps(body_data).encode('utf-8'), headers=HDRS, method="POST" ) resp = urlopen(req, timeout=30) resp_text = resp.read().decode('utf-8') resp_data = json.loads(resp_text) if "status" not in resp_data: logger.warning(f"Response missing 'status' field: {resp_text[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: logger.warning("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except HTTPError as e: error_body = e.read().decode() if hasattr(e, 'read') else '' if e.code == 401: logger.error("Authentication failed: Invalid API key") raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.") elif e.code == 403: logger.error("Authorization failed: Insufficient permissions") raise Exception("API key lacks required audit:read permissions") elif e.code == 429: retry_after = int(e.headers.get("Retry-After", "60")) logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: logger.info(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return _fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") elif e.code == 400: logger.error(f"Bad request: {error_body}") raise Exception(f"Invalid request parameters: {error_body}") else: logger.error(f"HTTP {e.code}: {e.reason} - {error_body}") raise Exception(f"Harness API error {e.code}: {e.reason}") except URLError as e: logger.error(f"Network error: {e.reason}") raise Exception(f"Failed to connect to Harness API: {e.reason}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response: {e}") logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}") raise Exception("Harness API returned invalid JSON") except Exception as e: logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True) raise # ============================================ # S3 Upload Functions # ============================================ def _upload_to_s3(events: list) -> str: """ Upload audit events to S3 in JSONL format. Each line is a complete JSON object (one event per line). """ if not events: logger.info("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate S3 key with timestamp and UUID timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl" ) # Upload to S3 s3.put_object( Bucket=BUCKET, Key=key, Body=jsonl_content.encode('utf-8'), ContentType="application/x-ndjson", Metadata={ "event-count": str(len(events)), "source": "harness-audit-lambda", "collection-time": timestamp.isoformat() } ) logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}") return key except Exception as e: logger.error(f"Error uploading to S3: {e}", exc_info=True) raise # ============================================ # Main Orchestration Function # ============================================ def fetch_and_store(): """ Main function to fetch audit logs from Harness and store in S3. Handles pagination and state management. """ logger.info("=== Harness Audit Collection Started ===") logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: logger.info(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: logger.info(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: logger.info(f"Static filter enabled: {STATIC_FILTER}") try: # Step 1: Read checkpoint state since_ms, page_token = _read_state() if page_token: logger.info(f"Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) logger.info(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 # Safety limit has_next = True while has_next and page_count < max_pages: page_count += 1 logger.info(f"--- Fetching page {page_count} ---") # Fetch one page of results result = _fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: logger.info("Pagination complete (hasNext=False)") break if not current_page_token: logger.warning("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: logger.warning(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to S3 if all_events: s3_key = _upload_to_s3(all_events) logger.info(f"Successfully uploaded {len(all_events)} total events") else: logger.info("No new events to upload") s3_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) _write_state(new_since, None) logger.info(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation _write_state(since_ms, current_page_token) logger.info(f"Pagination incomplete, saved pageToken for next run") # Step 5: Return result result = { "statusCode": 200, "message": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "s3Key": s3_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } logger.info(f"Collection completed: {json.dumps(result)}") return result except Exception as e: logger.error(f"Collection failed: {e}", exc_info=True) result = { "statusCode": 500, "message": "Error", "error": str(e), "errorType": type(e).__name__ } return result finally: logger.info("=== Harness Audit Collection Finished ===") # ============================================ # Lambda Handler # ============================================ def lambda_handler(event, context): """AWS Lambda handler function.""" return fetch_and_store() # ============================================ # Local Testing # ============================================ if __name__ == "__main__": # For local testing result = lambda_handler(None, None) print(json.dumps(result, indent=2))
배포를 클릭하여 함수 코드를 저장합니다.
Lambda 환경 변수 구성
- Lambda 함수 페이지에서 구성 탭을 선택합니다.
- 왼쪽 사이드바에서 환경 변수를 클릭합니다.
- 수정을 클릭합니다.
다음 각 항목에 대해 환경 변수 추가를 클릭합니다.
필수 환경 변수:
키 값 설명 HARNESS_ACCOUNT_IDHarness 계정 ID Harness의 계정 식별자 HARNESS_API_KEYAPI 키 토큰 audit:read 권한이 있는 토큰 S3_BUCKETharness-io-logsS3 버킷 이름 S3_PREFIXharness/auditS3 객체의 접두사 STATE_KEYharness/audit/state.jsonS3의 상태 파일 경로 선택적 환경 변수:
키 기본값 설명 HARNESS_API_BASEhttps://app.harness.ioHarness API 기본 URL PAGE_SIZE50페이지당 이벤트 수 (최대 100개) START_MINUTES_BACK60초기 확인 기간(분) FILTER_MODULES없음 쉼표로 구분된 모듈 (예: CD,CI,CE)FILTER_ACTIONS없음 쉼표로 구분된 작업 (예: CREATE,UPDATE,DELETE)STATIC_FILTER없음 사전 정의된 필터: EXCLUDE_LOGIN_EVENTS또는EXCLUDE_SYSTEM_EVENTSMAX_RETRIES3비율 제한의 최대 재시도 횟수 저장을 클릭합니다.
Lambda 제한 시간 및 메모리 구성
- Lambda 함수 페이지에서 구성 탭을 선택합니다.
- 왼쪽 사이드바에서 일반 구성을 클릭합니다.
- 수정을 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 메모리:
256 MB(권장) - 제한 시간:
5 min 0 sec(300초)
- 메모리:
- 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 일정 이름:
harness-audit-hourly를 입력합니다. - 설명: 선택사항인 설명입니다.
- 일정 이름:
- 다음을 클릭합니다.
- 일정 패턴에서 반복 일정을 선택합니다.
- 요금 기반 일정을 선택합니다.
- 다음 구성 세부정보를 제공합니다.
- 요율 표현식:
1 hour를 입력합니다.
- 요율 표현식:
- 다음을 클릭합니다.
- 타겟에서 다음 구성 세부정보를 제공합니다.
- 타겟 API: AWS Lambda 호출을 선택합니다.
- Lambda 함수: 함수
harness-audit-to-s3를 선택합니다.
- 다음을 클릭합니다.
- 일정 구성을 검토합니다.
- 일정 만들기를 클릭합니다.
Google SecOps용 읽기 전용 IAM 사용자 만들기
이 IAM 사용자를 통해 Google SecOps가 S3 버킷에서 로그를 읽을 수 있습니다.
- AWS 콘솔 > IAM > 사용자 > 사용자 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자 이름:
chronicle-s3-reader를 입력합니다.
- 사용자 이름:
- 다음을 클릭합니다.
- 정책 직접 연결을 선택합니다.
- 정책 만들기를 클릭합니다.
- JSON 탭을 선택합니다.
다음 정책을 붙여넣습니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::harness-io-logs", "Condition": { "StringLike": { "s3:prefix": "harness/audit/*" } } } ] }다음을 클릭합니다.
ChronicleHarnessS3ReadPolicy정책의 이름을 지정합니다.정책 만들기를 클릭합니다.
사용자 생성 탭으로 돌아가 정책 목록을 새로고침합니다.
ChronicleHarnessS3ReadPolicy을 검색하여 선택합니다.다음을 클릭합니다.
검토하고 사용자 만들기를 클릭합니다.
리더 사용자의 액세스 키 만들기
- IAM 사용자 페이지에서
chronicle-s3-reader사용자를 선택합니다. - 보안용 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 액세스 키 ID와 보안 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
Harness IO 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새로 추가를 클릭합니다.
- 다음 페이지에서 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Harness Audit Logs). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Harness IO를 선택합니다.
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
- S3 URI: 접두사 경로를 사용하여 S3 버킷 URI를 입력합니다.
s3://harness-io-logs/harness/audit/ 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (처음에는 권장).
- 성공 시: 전송이 성공한 후 모든 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
액세스 키 ID:
chronicle-s3-reader사용자의 액세스 키 ID를 입력합니다.보안 비밀 액세스 키:
chronicle-s3-reader사용자의 보안 비밀 액세스 키를 입력합니다.애셋 네임스페이스: 애셋 네임스페이스입니다.
harness.audit를 입력합니다.수집 라벨: 이 피드의 이벤트에 적용할 선택적 라벨입니다.
- S3 URI: 접두사 경로를 사용하여 S3 버킷 URI를 입력합니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.