Coletar registros de auditoria do Harness IO
Este documento explica como ingerir registros de auditoria do Harness IO no Google Security Operations usando o Amazon S3.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Acesso privilegiado ao Harness com permissões para:
- Criar chaves de API
- ao máximo.
- Ver configurações da conta
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge).
Coletar credenciais da API Harness
Criar chave de API no Harness
- Faça login na plataforma Harness.
- Clique no seu Perfil de usuário.
- Acesse Minhas chaves de API.
- Clique em + Chave de API.
- Informe os seguintes detalhes de configuração:
- Nome: insira um nome descritivo, por exemplo,
Google SecOps Integration. - Descrição: descrição opcional.
- Nome: insira um nome descritivo, por exemplo,
- Clique em Salvar.
- Clique em + Token para criar um novo token.
- Informe os seguintes detalhes de configuração:
- Nome: insira
Chronicle Feed Token. - Definir validade: selecione um tempo de validade adequado ou Sem validade (para uso em produção).
- Nome: insira
- Clique em Gerar token.
Copie e salve o valor do token com segurança. Esse token será usado como o valor do cabeçalho
x-api-key.
Receber o ID da conta do Harness
- Na Plataforma Harness, anote o ID da conta do URL.
- URL de exemplo:
https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/... - A parte
YOUR_ACCOUNT_IDé o identificador da conta.
- URL de exemplo:
- Ou acesse Configurações da conta > Visão geral para conferir seu Identificador da conta.
Copie e salve o ID da conta para usar na função Lambda.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
harness-io-logs). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do Lambda S3
- No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
Copie e cole a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutHarnessObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json" } ] }- Substitua
harness-io-logsse você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima.
Dê o nome
HarnessToS3Policyà política e clique em Criar política.Acesse IAM > Papéis > Criar papel.
Selecione Serviço da AWS como o tipo de entidade confiável.
Selecione Lambda como o caso de uso.
Clique em Próxima.
Pesquise e selecione as seguintes políticas:
HarnessToS3Policy(a política que você acabou de criar)AWSLambdaBasicExecutionRole(para registros do CloudWatch)
Clique em Próxima.
Nomeie a função como
HarnessAuditLambdaRolee clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome harness-audit-to-s3Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução HarnessAuditLambdaRoleClique em Criar função.
Depois que a função for criada, abra a guia Código.
Exclua o código stub padrão e insira o seguinte código de função do Lambda:
Código da função Lambda (
harness_audit_to_s3.py)#!/usr/bin/env python3 """ Harness.io Audit Logs to S3 Lambda Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion. """ import os import json import time import uuid import logging import urllib.parse from datetime import datetime, timedelta, timezone from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters (NEW) FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") # e.g., "EXCLUDE_LOGIN_EVENTS" MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # AWS clients s3 = boto3.client("s3") # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } # Logging configuration logger = logging.getLogger() logger.setLevel(logging.INFO) # ============================================ # State Management Functions # ============================================ def _read_state(): """Read checkpoint state from S3.""" try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) state = json.loads(obj["Body"].read()) since_ms = state.get("since") page_token = state.get("pageToken") logger.info(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except s3.exceptions.NoSuchKey: logger.info("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None except Exception as e: logger.error(f"Error reading state: {e}") raise def _write_state(since_ms: int, page_token: str = None): """Write checkpoint state to S3.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode(), ContentType="application/json" ) logger.info(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: logger.error(f"Error writing state: {e}") raise # ============================================ # Harness API Functions # ============================================ def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 Documentation: https://apidocs.harness.io/audit/getauditeventlistv2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={urllib.parse.quote(page_token)}" logger.info(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] logger.info(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] logger.info(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER logger.info(f"Applying static filter: {STATIC_FILTER}") logger.debug(f"Request body: {json.dumps(body_data)}") # Make POST request req = Request( url, data=json.dumps(body_data).encode('utf-8'), headers=HDRS, method="POST" ) resp = urlopen(req, timeout=30) resp_text = resp.read().decode('utf-8') resp_data = json.loads(resp_text) if "status" not in resp_data: logger.warning(f"Response missing 'status' field: {resp_text[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: logger.warning("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except HTTPError as e: error_body = e.read().decode() if hasattr(e, 'read') else '' if e.code == 401: logger.error("Authentication failed: Invalid API key") raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.") elif e.code == 403: logger.error("Authorization failed: Insufficient permissions") raise Exception("API key lacks required audit:read permissions") elif e.code == 429: retry_after = int(e.headers.get("Retry-After", "60")) logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: logger.info(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return _fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") elif e.code == 400: logger.error(f"Bad request: {error_body}") raise Exception(f"Invalid request parameters: {error_body}") else: logger.error(f"HTTP {e.code}: {e.reason} - {error_body}") raise Exception(f"Harness API error {e.code}: {e.reason}") except URLError as e: logger.error(f"Network error: {e.reason}") raise Exception(f"Failed to connect to Harness API: {e.reason}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response: {e}") logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}") raise Exception("Harness API returned invalid JSON") except Exception as e: logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True) raise # ============================================ # S3 Upload Functions # ============================================ def _upload_to_s3(events: list) -> str: """ Upload audit events to S3 in JSONL format. Each line is a complete JSON object (one event per line). """ if not events: logger.info("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate S3 key with timestamp and UUID timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl" ) # Upload to S3 s3.put_object( Bucket=BUCKET, Key=key, Body=jsonl_content.encode('utf-8'), ContentType="application/x-ndjson", Metadata={ "event-count": str(len(events)), "source": "harness-audit-lambda", "collection-time": timestamp.isoformat() } ) logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}") return key except Exception as e: logger.error(f"Error uploading to S3: {e}", exc_info=True) raise # ============================================ # Main Orchestration Function # ============================================ def fetch_and_store(): """ Main function to fetch audit logs from Harness and store in S3. Handles pagination and state management. """ logger.info("=== Harness Audit Collection Started ===") logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: logger.info(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: logger.info(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: logger.info(f"Static filter enabled: {STATIC_FILTER}") try: # Step 1: Read checkpoint state since_ms, page_token = _read_state() if page_token: logger.info(f"Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) logger.info(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 # Safety limit has_next = True while has_next and page_count < max_pages: page_count += 1 logger.info(f"--- Fetching page {page_count} ---") # Fetch one page of results result = _fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: logger.info("Pagination complete (hasNext=False)") break if not current_page_token: logger.warning("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: logger.warning(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to S3 if all_events: s3_key = _upload_to_s3(all_events) logger.info(f"Successfully uploaded {len(all_events)} total events") else: logger.info("No new events to upload") s3_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) _write_state(new_since, None) logger.info(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation _write_state(since_ms, current_page_token) logger.info(f"Pagination incomplete, saved pageToken for next run") # Step 5: Return result result = { "statusCode": 200, "message": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "s3Key": s3_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } logger.info(f"Collection completed: {json.dumps(result)}") return result except Exception as e: logger.error(f"Collection failed: {e}", exc_info=True) result = { "statusCode": 500, "message": "Error", "error": str(e), "errorType": type(e).__name__ } return result finally: logger.info("=== Harness Audit Collection Finished ===") # ============================================ # Lambda Handler # ============================================ def lambda_handler(event, context): """AWS Lambda handler function.""" return fetch_and_store() # ============================================ # Local Testing # ============================================ if __name__ == "__main__": # For local testing result = lambda_handler(None, None) print(json.dumps(result, indent=2))
Clique em Implantar para salvar o código da função.
Configurar variáveis de ambiente do Lambda
- Na página da função do Lambda, selecione a guia Configuração.
- Clique em Variáveis de ambiente na barra lateral à esquerda.
- Clique em Editar.
Clique em Adicionar variável de ambiente para cada uma das seguintes opções:
Variáveis de ambiente obrigatórias:
Chave Valor Descrição HARNESS_ACCOUNT_IDSeu ID da conta do Harness Identificador da conta da Harness. HARNESS_API_KEYSeu token de chave de API Token com permissões audit:read S3_BUCKETharness-io-logsNome do bucket do S3 S3_PREFIXharness/auditPrefixo para objetos do S3 STATE_KEYharness/audit/state.jsonCaminho do arquivo de estado no S3 Variáveis de ambiente opcionais:
Chave Valor padrão Descrição HARNESS_API_BASEhttps://app.harness.ioURL base da API do Harness PAGE_SIZE50Eventos por página (máximo de 100) START_MINUTES_BACK60Período inicial de lookback em minutos FILTER_MODULESNenhum Módulos separados por vírgulas (por exemplo, CD,CI,CE)FILTER_ACTIONSNenhum Ações separadas por vírgula (por exemplo, CREATE,UPDATE,DELETE)STATIC_FILTERNenhum Filtro predefinido: EXCLUDE_LOGIN_EVENTSouEXCLUDE_SYSTEM_EVENTSMAX_RETRIES3Número máximo de tentativas de repetição para limitação de taxa Clique em Salvar.
Configurar o tempo limite e a memória do Lambda
- Na página da função do Lambda, selecione a guia Configuração.
- Clique em Configuração geral na barra lateral esquerda.
- Clique em Editar.
- Informe os seguintes detalhes de configuração:
- Memória:
256 MB(recomendado) - Tempo limite:
5 min 0 sec(300 segundos)
- Memória:
- Clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Nome da programação: insira
harness-audit-hourly. - Descrição: descrição opcional.
- Nome da programação: insira
- Clique em Próxima.
- Em Padrão de programação, selecione Programação recorrente.
- Selecione Programação baseada em taxa.
- Informe os seguintes detalhes de configuração:
- Expressão de taxa: insira
1 hour.
- Expressão de taxa: insira
- Clique em Próxima.
- Em Destino, forneça os seguintes detalhes de configuração:
- API de destino: selecione AWS Lambda Invoke.
- Função Lambda: selecione a função
harness-audit-to-s3.
- Clique em Próxima.
- Revise a configuração da programação.
- Clique em Criar programação.
Criar um usuário do IAM com acesso somente leitura para o Google SecOps
Esse usuário do IAM permite que o Google SecOps leia registros do bucket do S3.
- Acesse Console da AWS > IAM > Usuários > Criar usuário.
- Informe os seguintes detalhes de configuração:
- Nome de usuário: insira
chronicle-s3-reader.
- Nome de usuário: insira
- Clique em Próxima.
- Selecione Anexar políticas diretamente.
- Clique em Criar política.
- Selecione a guia JSON.
Cole a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": "arn:aws:s3:::harness-io-logs", "Condition": { "StringLike": { "s3:prefix": "harness/audit/*" } } } ] }Clique em Próxima.
Nomeie a política como
ChronicleHarnessS3ReadPolicy.Clique em Criar política.
Volte para a guia de criação de usuários e atualize a lista de políticas.
Pesquise e selecione
ChronicleHarnessS3ReadPolicy.Clique em Próxima.
Revise e clique em Criar usuário.
Criar chaves de acesso para o usuário leitor
- Na página Usuários do IAM, selecione o usuário
chronicle-s3-reader. - Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso.
- Selecione Serviço de terceiros como o caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar o ID da chave de acesso e a chave de acesso secreta.
- Clique em Concluído.
Configurar um feed no Google SecOps para ingerir registros do Harness IO
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo.
- Na próxima página, clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Harness Audit Logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Harness IO como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
- URI do S3: insira o URI do bucket do S3 com o caminho do prefixo:
s3://harness-io-logs/harness/audit/ Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado inicialmente).
- Em caso de sucesso: exclui todos os arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é 180 dias.
ID da chave de acesso: insira o ID da chave de acesso do usuário
chronicle-s3-reader.Chave de acesso secreta: insira a chave de acesso secreta do usuário
chronicle-s3-reader.Namespace do recurso: o namespace do recurso. Insira
harness.audit.Rótulos de ingestão: rótulos opcionais a serem aplicados aos eventos deste feed.
- URI do S3: insira o URI do bucket do S3 com o caminho do prefixo:
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.