Raccogliere gli audit log di Harness IO
Questo documento spiega come importare i log di controllo di Harness IO in Google Security Operations utilizzando Amazon S3.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Accesso privilegiato a Harness con autorizzazioni per:
- Creare chiavi API
- Accedere ai log di controllo
- Visualizzare le impostazioni dell'account
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge).
Raccogli le credenziali API di Harness
Crea la chiave API in Harness
- Accedi alla piattaforma Harness.
- Fai clic sul tuo profilo utente.
- Vai a Le mie chiavi API.
- Fai clic su + Chiave API.
- Fornisci i seguenti dettagli di configurazione:
- Nome: inserisci un nome descrittivo (ad esempio,
Google SecOps Integration). - Descrizione: descrizione facoltativa.
- Nome: inserisci un nome descrittivo (ad esempio,
- Fai clic su Salva.
- Fai clic su + Token per creare un nuovo token.
- Fornisci i seguenti dettagli di configurazione:
- Nome: inserisci
Chronicle Feed Token. - Imposta scadenza: seleziona un periodo di scadenza appropriato o Nessuna scadenza (per l'utilizzo in produzione).
- Nome: inserisci
- Fai clic su Genera token.
Copia e salva il valore del token in modo sicuro. Questo token verrà utilizzato come valore dell'intestazione
x-api-key.
Ottieni l'ID account Harness
- In Harness Platform, annota l'ID account dall'URL.
- URL di esempio:
https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/... - La parte
YOUR_ACCOUNT_IDè l'identificatore dell'account.
- URL di esempio:
- In alternativa, vai a Impostazioni account > Panoramica per visualizzare l'identificatore account.
Copia e salva l'ID account da utilizzare nella funzione Lambda.
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
harness-io-logs). - Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca il criterio AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3 di Lambda
- Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
Copia e incolla il seguente criterio:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutHarnessObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json" } ] }- Sostituisci
harness-io-logsse hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti.
Assegna un nome alla policy
HarnessToS3Policye fai clic su Crea policy.Vai a IAM > Ruoli > Crea ruolo.
Seleziona Servizio AWS come tipo di entità attendibile.
Seleziona Lambda come caso d'uso.
Fai clic su Avanti.
Cerca e seleziona i seguenti criteri:
HarnessToS3Policy(il criterio che hai appena creato)AWSLambdaBasicExecutionRole(per CloudWatch Logs)
Fai clic su Avanti.
Assegna al ruolo il nome
HarnessAuditLambdaRolee fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome harness-audit-to-s3Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione HarnessAuditLambdaRoleFai clic su Crea funzione.
Dopo aver creato la funzione, apri la scheda Codice.
Elimina il codice stub predefinito e inserisci il seguente codice della funzione Lambda:
Codice della funzione Lambda (
harness_audit_to_s3.py)#!/usr/bin/env python3 """ Harness.io Audit Logs to S3 Lambda Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion. """ import os import json import time import uuid import logging import urllib.parse from datetime import datetime, timedelta, timezone from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters (NEW) FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") # e.g., "EXCLUDE_LOGIN_EVENTS" MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # AWS clients s3 = boto3.client("s3") # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } # Logging configuration logger = logging.getLogger() logger.setLevel(logging.INFO) # ============================================ # State Management Functions # ============================================ def _read_state(): """Read checkpoint state from S3.""" try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) state = json.loads(obj["Body"].read()) since_ms = state.get("since") page_token = state.get("pageToken") logger.info(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except s3.exceptions.NoSuchKey: logger.info("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None except Exception as e: logger.error(f"Error reading state: {e}") raise def _write_state(since_ms: int, page_token: str = None): """Write checkpoint state to S3.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode(), ContentType="application/json" ) logger.info(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: logger.error(f"Error writing state: {e}") raise # ============================================ # Harness API Functions # ============================================ def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 Documentation: https://apidocs.harness.io/audit/getauditeventlistv2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={urllib.parse.quote(page_token)}" logger.info(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] logger.info(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] logger.info(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER logger.info(f"Applying static filter: {STATIC_FILTER}") logger.debug(f"Request body: {json.dumps(body_data)}") # Make POST request req = Request( url, data=json.dumps(body_data).encode('utf-8'), headers=HDRS, method="POST" ) resp = urlopen(req, timeout=30) resp_text = resp.read().decode('utf-8') resp_data = json.loads(resp_text) if "status" not in resp_data: logger.warning(f"Response missing 'status' field: {resp_text[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: logger.warning("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except HTTPError as e: error_body = e.read().decode() if hasattr(e, 'read') else '' if e.code == 401: logger.error("Authentication failed: Invalid API key") raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.") elif e.code == 403: logger.error("Authorization failed: Insufficient permissions") raise Exception("API key lacks required audit:read permissions") elif e.code == 429: retry_after = int(e.headers.get("Retry-After", "60")) logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: logger.info(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return _fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") elif e.code == 400: logger.error(f"Bad request: {error_body}") raise Exception(f"Invalid request parameters: {error_body}") else: logger.error(f"HTTP {e.code}: {e.reason} - {error_body}") raise Exception(f"Harness API error {e.code}: {e.reason}") except URLError as e: logger.error(f"Network error: {e.reason}") raise Exception(f"Failed to connect to Harness API: {e.reason}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response: {e}") logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}") raise Exception("Harness API returned invalid JSON") except Exception as e: logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True) raise # ============================================ # S3 Upload Functions # ============================================ def _upload_to_s3(events: list) -> str: """ Upload audit events to S3 in JSONL format. Each line is a complete JSON object (one event per line). """ if not events: logger.info("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate S3 key with timestamp and UUID timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl" ) # Upload to S3 s3.put_object( Bucket=BUCKET, Key=key, Body=jsonl_content.encode('utf-8'), ContentType="application/x-ndjson", Metadata={ "event-count": str(len(events)), "source": "harness-audit-lambda", "collection-time": timestamp.isoformat() } ) logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}") return key except Exception as e: logger.error(f"Error uploading to S3: {e}", exc_info=True) raise # ============================================ # Main Orchestration Function # ============================================ def fetch_and_store(): """ Main function to fetch audit logs from Harness and store in S3. Handles pagination and state management. """ logger.info("=== Harness Audit Collection Started ===") logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: logger.info(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: logger.info(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: logger.info(f"Static filter enabled: {STATIC_FILTER}") try: # Step 1: Read checkpoint state since_ms, page_token = _read_state() if page_token: logger.info(f"Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) logger.info(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 # Safety limit has_next = True while has_next and page_count < max_pages: page_count += 1 logger.info(f"--- Fetching page {page_count} ---") # Fetch one page of results result = _fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: logger.info("Pagination complete (hasNext=False)") break if not current_page_token: logger.warning("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: logger.warning(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to S3 if all_events: s3_key = _upload_to_s3(all_events) logger.info(f"Successfully uploaded {len(all_events)} total events") else: logger.info("No new events to upload") s3_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) _write_state(new_since, None) logger.info(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation _write_state(since_ms, current_page_token) logger.info(f"Pagination incomplete, saved pageToken for next run") # Step 5: Return result result = { "statusCode": 200, "message": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "s3Key": s3_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } logger.info(f"Collection completed: {json.dumps(result)}") return result except Exception as e: logger.error(f"Collection failed: {e}", exc_info=True) result = { "statusCode": 500, "message": "Error", "error": str(e), "errorType": type(e).__name__ } return result finally: logger.info("=== Harness Audit Collection Finished ===") # ============================================ # Lambda Handler # ============================================ def lambda_handler(event, context): """AWS Lambda handler function.""" return fetch_and_store() # ============================================ # Local Testing # ============================================ if __name__ == "__main__": # For local testing result = lambda_handler(None, None) print(json.dumps(result, indent=2))
Fai clic su Esegui il deployment per salvare il codice della funzione.
Configura le variabili di ambiente Lambda
- Nella pagina della funzione Lambda, seleziona la scheda Configurazione.
- Fai clic su Variabili di ambiente nella barra laterale sinistra.
- Fai clic su Modifica.
Fai clic su Aggiungi variabile di ambiente per ciascuno dei seguenti elementi:
Variabili di ambiente obbligatorie:
Chiave Valore Descrizione HARNESS_ACCOUNT_IDIl tuo ID account Harness Identificatore account di Harness HARNESS_API_KEYIl token della chiave API Token con autorizzazioni audit:read S3_BUCKETharness-io-logsNome del bucket S3 S3_PREFIXharness/auditPrefisso per gli oggetti S3 STATE_KEYharness/audit/state.jsonPercorso del file di stato in S3 Variabili di ambiente facoltative:
Chiave Valore predefinito Descrizione HARNESS_API_BASEhttps://app.harness.ioURL di base dell'API Harness PAGE_SIZE50Eventi per pagina (max 100) START_MINUTES_BACK60Periodo di ricerca iniziale in minuti FILTER_MODULESNessuno Moduli separati da virgole (ad es. CD,CI,CEFILTER_ACTIONSNessuno Azioni separate da virgole (ad es. CREATE,UPDATE,DELETESTATIC_FILTERNessuno Filtro predefinito: EXCLUDE_LOGIN_EVENTSoEXCLUDE_SYSTEM_EVENTSMAX_RETRIES3Numero massimo di tentativi per limitazione di frequenza Fai clic su Salva.
Configurare il timeout e la memoria di Lambda
- Nella pagina della funzione Lambda, seleziona la scheda Configurazione.
- Fai clic su Configurazione generale nella barra laterale a sinistra.
- Fai clic su Modifica.
- Fornisci i seguenti dettagli di configurazione:
- Memoria:
256 MB(consigliato) - Timeout:
5 min 0 sec(300 secondi)
- Memoria:
- Fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Nome pianificazione: inserisci
harness-audit-hourly. - Descrizione: descrizione facoltativa.
- Nome pianificazione: inserisci
- Fai clic su Avanti.
- Nella sezione Modello di programmazione, seleziona Programmazione ricorrente.
- Seleziona Programmazione basata sulla tariffa.
- Fornisci i seguenti dettagli di configurazione:
- Espressione del tasso: inserisci
1 hour.
- Espressione del tasso: inserisci
- Fai clic su Avanti.
- Nella sezione Target, fornisci i seguenti dettagli di configurazione:
- API di destinazione: seleziona AWS Lambda Invoke.
- Funzione Lambda: seleziona la funzione
harness-audit-to-s3.
- Fai clic su Avanti.
- Rivedi la configurazione della pianificazione.
- Fai clic su Crea pianificazione.
Creare un utente IAM di sola lettura per Google SecOps
Questo utente IAM consente a Google SecOps di leggere i log dal bucket S3.
- Vai a AWS Console > IAM > Users > Create user.
- Fornisci i seguenti dettagli di configurazione:
- Nome utente: inserisci
chronicle-s3-reader.
- Nome utente: inserisci
- Fai clic su Avanti.
- Seleziona Allega direttamente i criteri.
- Fai clic su Crea policy.
- Seleziona la scheda JSON.
Incolla il seguente criterio:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::harness-io-logs", "Condition": { "StringLike": { "s3:prefix": "harness/audit/*" } } } ] }Fai clic su Avanti.
Assegna al criterio il nome
ChronicleHarnessS3ReadPolicy.Fai clic su Crea policy.
Torna alla scheda di creazione dell'utente e aggiorna l'elenco dei criteri.
Cerca e seleziona
ChronicleHarnessS3ReadPolicy.Fai clic su Avanti.
Rivedi e fai clic su Crea utente.
Creare chiavi di accesso per l'utente lettore
- Nella pagina Utenti IAM, seleziona l'utente
chronicle-s3-reader. - Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso.
- Seleziona Servizio di terze parti come caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi il tag della descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare l'ID chiave di accesso e la chiave di accesso segreta.
- Fai clic su Fine.
Configura un feed in Google SecOps per importare i log di Harness IO
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo.
- Nella pagina successiva, fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Harness Audit Logs). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Harness IO come Tipo di log.
- Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
- URI S3: inserisci l'URI del bucket S3 con il percorso del prefisso:
s3://harness-io-logs/harness/audit/ Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata inizialmente).
- In caso di esito positivo: elimina tutti i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
ID chiave di accesso: inserisci l'ID chiave di accesso dell'utente
chronicle-s3-reader.Chiave di accesso segreta: inserisci la chiave di accesso segreta dell'utente
chronicle-s3-reader.Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset. Inserisci
harness.audit.Etichette di importazione: etichette facoltative da applicare agli eventi di questo feed.
- URI S3: inserisci l'URI del bucket S3 con il percorso del prefisso:
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.