Coletar registros de auditoria do Harness IO
Este documento explica como ingerir registros de auditoria do Harness IO no Google Security Operations usando o Google Cloud Storage. O Harness é uma plataforma de entrega contínua e DevOps que oferece ferramentas para entrega de software, flags de recursos, gerenciamento de custos na nuvem e testes de segurança.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
- Acesso privilegiado ao Harness com permissões para:
- Criar chaves de API
- ao máximo.
- Ver as configurações da conta
Coletar credenciais da API Harness
Criar chave de API no Harness
- Faça login na plataforma Harness.
- Clique no seu Perfil de usuário.
- Acesse Minhas chaves de API.
- Clique em + Chave de API.
- Informe os seguintes detalhes de configuração:
- Nome: insira um nome descritivo, por exemplo,
Google SecOps Integration. - Descrição: descrição opcional.
- Nome: insira um nome descritivo, por exemplo,
- Clique em Salvar.
- Clique em + Token para criar um novo token.
- Informe os seguintes detalhes de configuração:
- Nome: insira
Chronicle Feed Token. - Definir validade: selecione um tempo de validade adequado ou Sem validade (para uso em produção).
- Nome: insira
- Clique em Gerar token.
Copie e salve o valor do token com segurança. Esse token será usado como o valor do cabeçalho
x-api-key.
Receber o ID da conta do Harness
- Na Plataforma Harness, anote o ID da conta do URL.
URL de exemplo: https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/.... A parte YOUR_ACCOUNT_ID é o identificador da conta.
Outra opção é acessar Configurações da conta > Visão geral para conferir seu Identificador da conta.
Copie e salve o ID da conta para usar na função do Cloud Run.
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, harness-io-logs.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
harness-audit-collector-sa. - Descrição da conta de serviço: insira
Service account for Cloud Run function to collect Harness IO audit logs.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
harness-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
harness-audit-trigger. - Não altere as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar registros
A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API Harness e gravá-los no GCS.
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço harness-audit-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (
harness-audit-trigger). - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e abra Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione a conta de serviço (
harness-audit-collector-sa).
- Conta de serviço: selecione a conta de serviço (
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
- Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo Descrição HARNESS_ACCOUNT_IDSeu ID da conta do Harness Identificador da conta da Harness. HARNESS_API_KEYSeu token de chave de API Token com permissões audit:read GCS_BUCKETharness-io-logsNome do bucket do GCS GCS_PREFIXharness/auditPrefixo para objetos do GCS STATE_KEYharness/audit/state.jsonCaminho do arquivo de estado no GCS - Variáveis de ambiente opcionais:
Nome da variável Valor padrão Descrição HARNESS_API_BASEhttps://app.harness.ioURL de base da API Harness (substituição para instâncias autohospedadas) PAGE_SIZE50Eventos por página (máximo de 100) START_MINUTES_BACK60Período inicial de lookback em minutos FILTER_MODULESNenhum Módulos separados por vírgula (por exemplo, CD,CI,CE)FILTER_ACTIONSNenhum Ações separadas por vírgula (por exemplo, CREATE,UPDATE,DELETE)STATIC_FILTERNenhum Filtro predefinido: EXCLUDE_LOGIN_EVENTSouEXCLUDE_SYSTEM_EVENTSMAX_RETRIES3Número máximo de tentativas de repetição para limitação de taxa Role a tela para baixo na guia Variáveis e secrets até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos).
- Tempo limite da solicitação: insira
Acesse a guia Configurações em Contêineres:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1.
- Clique em Concluído.
- Na seção Recursos:
Role até Ambiente de execução:
- Selecione Padrão (recomendado).
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0. - Número máximo de instâncias: insira
100ou ajuste com base na carga esperada.
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço é criado, o editor de código inline é aberto automaticamente.
Adicionar código da função
- Insira main em Ponto de entrada da função.
No editor de código em linha, crie dois arquivos:
- Primeiro arquivo: main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timedelta, timezone import time # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() # Configuration from Environment Variables API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/") ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"] API_KEY = os.environ["HARNESS_API_KEY"] BUCKET = os.environ["GCS_BUCKET"] PREFIX = os.environ.get("GCS_PREFIX", "harness/audit").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json") PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100) START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60")) # Optional filters FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None STATIC_FILTER = os.environ.get("STATIC_FILTER") MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # HTTP headers for Harness API HDRS = { "x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json", } def read_state(bucket): """Read checkpoint state from GCS.""" try: blob = bucket.blob(STATE_KEY) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) since_ms = state.get("since") page_token = state.get("pageToken") print(f"State loaded: since={since_ms}, pageToken={page_token}") return since_ms, page_token except Exception as e: print(f"Warning: Could not load state: {e}") print("No state file found, starting fresh collection") start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK) since_ms = int(start_time.timestamp() * 1000) print(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})") return since_ms, None def write_state(bucket, since_ms, page_token=None): """Write checkpoint state to GCS.""" state = { "since": since_ms, "pageToken": page_token, "lastRun": int(time.time() * 1000), "lastRunISO": datetime.now(timezone.utc).isoformat() } try: blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type="application/json" ) print(f"State saved: since={since_ms}, pageToken={page_token}") except Exception as e: print(f"Error writing state: {e}") raise def fetch_harness_audits(since_ms, page_token=None, retry_count=0): """ Fetch audit logs from Harness API with retry logic. API Endpoint: POST /audit/api/audits/listV2 """ try: # Build URL with query parameters url = ( f"{API_BASE}/audit/api/audits/listV2" f"?accountIdentifier={ACCOUNT_ID}" f"&pageSize={PAGE_SIZE}" ) if page_token: url += f"&pageToken={page_token}" print(f"Fetching from: {url[:100]}...") # Build request body with time filter and optional filters body_data = { "startTime": since_ms, "endTime": int(time.time() * 1000), "filterType": "Audit" } if FILTER_MODULES: body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()] print(f"Applying module filter: {body_data['modules']}") if FILTER_ACTIONS: body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()] print(f"Applying action filter: {body_data['actions']}") if STATIC_FILTER: body_data["staticFilter"] = STATIC_FILTER print(f"Applying static filter: {STATIC_FILTER}") # Make POST request response = http.request( 'POST', url, body=json.dumps(body_data).encode('utf-8'), headers=HDRS, timeout=30.0 ) resp_data = json.loads(response.data.decode('utf-8')) if "status" not in resp_data: print(f"Response missing 'status' field: {response.data[:200]}") # Check response status if resp_data.get("status") != "SUCCESS": error_msg = resp_data.get("message", "Unknown error") raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}") # Extract data from response structure data_obj = resp_data.get("data", {}) if not data_obj: print("Response 'data' object is empty or missing") events = data_obj.get("content", []) has_next = data_obj.get("hasNext", False) next_token = data_obj.get("pageToken") print(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}") if not events and data_obj: print(f"Empty events but data present. Data keys: {list(data_obj.keys())}") return { "events": events, "hasNext": has_next, "pageToken": next_token } except Exception as e: if hasattr(e, 'status') and e.status == 429: retry_after = 60 print(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})") if retry_count < MAX_RETRIES: print(f"Waiting {retry_after} seconds before retry...") time.sleep(retry_after) print(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})") return fetch_harness_audits(since_ms, page_token, retry_count + 1) else: raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting") print(f"Error in fetch_harness_audits: {e}") raise def upload_to_gcs(bucket, events): """Upload audit events to GCS in JSONL format.""" if not events: print("No events to upload") return None try: # Create JSONL content (one JSON object per line) jsonl_lines = [json.dumps(event) for event in events] jsonl_content = "\n".join(jsonl_lines) # Generate GCS key with timestamp timestamp = datetime.now(timezone.utc) key = ( f"{PREFIX}/" f"{timestamp:%Y/%m/%d}/" f"harness-audit-{timestamp:%Y%m%d-%H%M%S}.jsonl" ) # Upload to GCS blob = bucket.blob(key) blob.upload_from_string( jsonl_content, content_type="application/x-ndjson" ) blob.metadata = { "event-count": str(len(events)), "source": "harness-audit-function", "collection-time": timestamp.isoformat() } blob.patch() print(f"Uploaded {len(events)} events to gs://{BUCKET}/{key}") return key except Exception as e: print(f"Error uploading to GCS: {e}") raise @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Harness audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ print("=== Harness Audit Collection Started ===") print(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}") if FILTER_MODULES: print(f"Module filter enabled: {FILTER_MODULES}") if FILTER_ACTIONS: print(f"Action filter enabled: {FILTER_ACTIONS}") if STATIC_FILTER: print(f"Static filter enabled: {STATIC_FILTER}") try: # Get GCS bucket bucket = storage_client.bucket(BUCKET) # Step 1: Read checkpoint state since_ms, page_token = read_state(bucket) if page_token: print("Resuming pagination from saved pageToken") else: since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc) print(f"Starting new collection from: {since_dt.isoformat()}") # Step 2: Collect all events with pagination all_events = [] current_page_token = page_token page_count = 0 max_pages = 100 has_next = True while has_next and page_count < max_pages: page_count += 1 print(f"--- Fetching page {page_count} ---") # Fetch one page of results result = fetch_harness_audits(since_ms, current_page_token) # Extract events events = result.get("events", []) all_events.extend(events) print(f"Page {page_count}: {len(events)} events (total: {len(all_events)})") # Check pagination status has_next = result.get("hasNext", False) current_page_token = result.get("pageToken") if not has_next: print("Pagination complete (hasNext=False)") break if not current_page_token: print("hasNext=True but no pageToken, stopping pagination") break # Small delay between pages to avoid rate limiting time.sleep(0.5) if page_count >= max_pages: print(f"Reached max pages limit ({max_pages}), stopping") # Step 3: Upload collected events to GCS if all_events: gcs_key = upload_to_gcs(bucket, all_events) print(f"Successfully uploaded {len(all_events)} total events") else: print("No new events to upload") gcs_key = None # Step 4: Update checkpoint state if not has_next: # Pagination complete - update since to current time for next run new_since = int(time.time() * 1000) write_state(bucket, new_since, None) print(f"Pagination complete, state updated with new since={new_since}") else: # Pagination incomplete - save pageToken for continuation write_state(bucket, since_ms, current_page_token) print("Pagination incomplete, saved pageToken for next run") # Step 5: Log result result = { "status": "Success", "eventsCollected": len(all_events), "pagesProcessed": page_count, "paginationComplete": not has_next, "gcsKey": gcs_key, "filters": { "modules": FILTER_MODULES, "actions": FILTER_ACTIONS, "staticFilter": STATIC_FILTER } } print(f"Collection completed: {json.dumps(result)}") except Exception as e: print(f"Collection failed: {e}") raise finally: print("=== Harness Audit Collection Finished ===")- Segundo arquivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome harness-audit-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecione o tópico do Pub/Sub ( harness-audit-trigger).Corpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Opções de frequência de programação
Escolha a frequência com base no volume de registros e nos requisitos de latência:
Frequência Expressão Cron Caso de uso A cada 5 minutos */5 * * * *Alto volume e baixa latência A cada 15 minutos */15 * * * *Volume médio A cada hora 0 * * * *Padrão (recomendado) A cada 6 horas 0 */6 * * *Baixo volume, processamento em lote Diário 0 0 * * *Coleta de dados históricos
Testar a integração
- No console do Cloud Scheduler, encontre seu job.
- Clique em Executar à força para acionar o job manualmente.
- Aguarde alguns segundos.
- Acesse Cloud Run > Serviços.
- Clique no nome da função (
harness-audit-collector). - Clique na guia Registros.
Verifique se a função foi executada com sucesso. Procure o seguinte:
=== Harness Audit Collection Started === State loaded: since=... or No state file found, starting fresh collection --- Fetching page 1 --- API response: X events, hasNext=... Uploaded X events to gs://harness-io-logs/harness/audit/... Successfully processed X records === Harness Audit Collection Finished ===Acesse Cloud Storage > Buckets.
Clique no nome do bucket.
Navegue até a pasta de prefixo (
harness/audit/).Verifique se um novo arquivo
.jsonlfoi criado com o carimbo de data/hora atual.
Se você encontrar erros nos registros:
- HTTP 401: verifique as credenciais da API nas variáveis de ambiente
- HTTP 403: verifique se a conta tem as permissões necessárias
- HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Harness Audit Logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Harness IO como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir registros do Harness IO
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Harness Audit Logs). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Harness IO como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://harness-io-logs/harness/audit/Substitua:
harness-io-logs: o nome do bucket do GCS.harness/audit: prefixo/caminho da pasta onde os registros são armazenados.
Exemplos:
- Bucket raiz:
gs://company-logs/ - Com prefixo:
gs://company-logs/harness-logs/ - Com subpasta:
gs://company-logs/harness/audit/
- Bucket raiz:
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso: o namespace do recurso. Insira
harness.audit.Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.