Raccogliere i log MFA HYPR
Questo documento spiega come importare i log HYPR MFA in Google Security Operations utilizzando webhook o Google Cloud Storage V2.
HYPR MFA è una soluzione di autenticazione a più fattori senza password che fornisce un'autenticazione resistente al phishing utilizzando passkey FIDO2, dati biometrici e accesso avviato da dispositivo mobile. HYPR sostituisce le password tradizionali con la crittografia sicura a chiave pubblica per eliminare gli attacchi basati sulle credenziali, semplificando al contempo l'autenticazione degli utenti su workstation, applicazioni web e servizi cloud.
Prima di iniziare
Assicurati di disporre dei seguenti prerequisiti:
- Un'istanza Google SecOps
- Accesso amministrativo a HYPR Control Center
- Contatta l'assistenza HYPR per attivare gli hook di eventi personalizzati per l'applicazione RP che vuoi monitorare
Differenze nel metodo di raccolta
HYPR MFA supporta due metodi per l'invio dei log a Google Security Operations:
- Webhook (opzione consigliata): HYPR invia gli eventi in tempo reale a Google Security Operations tramite Custom Event Hooks. Questo metodo fornisce la distribuzione immediata degli eventi e non richiede infrastrutture aggiuntive.
- Google Cloud Storage: gli eventi HYPR vengono raccolti tramite API e archiviati in GCS, poi vengono inseriti in Google Security Operations. Questo metodo fornisce l'elaborazione batch e la conservazione dei dati storici.
Scegli il metodo più adatto alle tue esigenze:
| Funzionalità | Webhook | Google Cloud Storage |
|---|---|---|
| Latenza | In tempo reale (secondi) | Batch (da minuti a ore) |
| Infrastruttura | Nessuno richiesto | Progetto Google Cloud con la funzione Cloud Run |
| Dati storici | Limitato al flusso di eventi | Conservazione completa in GCS |
| Complessità della configurazione | Semplice | Moderato |
| Costo | Minimo | Costi di computing e archiviazione di Google Cloud |
Opzione 1: configura l'integrazione webhook
Crea un feed webhook in Google SecOps
Creare il feed
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Nella pagina successiva, fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
HYPR MFA Events). - Seleziona Webhook come Tipo di origine.
- Seleziona HYPR MFA come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- Delimitatore di divisione (facoltativo): lascia vuoto. Ogni richiesta webhook contiene un singolo evento JSON.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Genera e salva la chiave segreta
Dopo aver creato il feed, devi generare una chiave segreta per l'autenticazione:
- Nella pagina dei dettagli del feed, fai clic su Genera chiave segreta.
- Una finestra di dialogo mostra la chiave segreta.
- Copia e salva la chiave segreta in modo sicuro.
Recuperare l'URL dell'endpoint del feed
- Vai alla scheda Dettagli del feed.
- Nella sezione Endpoint Information (Informazioni sull'endpoint), copia l'URL dell'endpoint del feed.
Il formato dell'URL è:
https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreateo
https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreateSalva questo URL per i passaggi successivi.
Fai clic su Fine.
Crea una chiave API Google Cloud
Chronicle richiede una chiave API per l'autenticazione. Crea una chiave API con limitazioni nella Google Cloud Console.
Crea la chiave API
- Vai alla pagina Credenziali della console Google Cloud.
- Seleziona il tuo progetto (quello associato alla tua istanza di Chronicle).
- Fai clic su Crea credenziali > Chiave API.
- Viene creata una chiave API e visualizzata in una finestra di dialogo.
- Fai clic su Modifica chiave API per limitare la chiave.
Limitare la chiave API
- Nella pagina delle impostazioni Chiave API:
- Nome: inserisci un nome descrittivo (ad esempio,
Chronicle Webhook API Key).
- Nome: inserisci un nome descrittivo (ad esempio,
- In Limitazioni API:
- Seleziona Limita chiave.
- Nel menu a discesa Seleziona API, cerca e seleziona API Google SecOps (o API Chronicle).
- Fai clic su Salva.
- Copia il valore della chiave API dal campo Chiave API nella parte superiore della pagina.
- Salva la chiave API in modo sicuro.
Configurare l'hook evento personalizzato HYPR MFA
Crea l'URL webhook con le intestazioni
HYPR supporta le intestazioni personalizzate per l'autenticazione. Utilizza il metodo di autenticazione delle intestazioni per una maggiore sicurezza.
URL dell'endpoint (senza parametri):
<ENDPOINT_URL>Intestazioni:
x-goog-chronicle-auth: <API_KEY> x-chronicle-auth: <SECRET_KEY>- Sostituisci:
<ENDPOINT_URL>: l'URL dell'endpoint del feed del passaggio precedente.<API_KEY>: la chiave API Google Cloud che hai creato.<SECRET_KEY>: La chiave segreta creata dal feed Chronicle.
- Sostituisci:
Prepara la configurazione JSON dell'hook evento personalizzato
Gli hook di eventi personalizzati HYPR vengono configurati utilizzando JSON. Prepara la seguente configurazione JSON, sostituendo i valori segnaposto:
{ "name": "Chronicle SIEM Integration", "eventType": "ALL", "invocationEndpoint": "<ENDPOINT_URL>", "httpMethod": "POST", "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }Sostituisci:
<ENDPOINT_URL>: l'URL dell'endpoint del feed di Chronicle.<API_KEY>: la chiave API Google Cloud.<SECRET_KEY>: La chiave segreta di Chronicle.
Parametri di configurazione:
name: un nome descrittivo per l'hook evento (ad esempio,
Chronicle SIEM Integration).eventType: imposta il valore su
ALLper inviare tutti gli eventi HYPR o specifica tag evento specifici comeAUTHENTICATION,REGISTRATIONoACCESS_TOKEN.invocationEndpoint: l'URL dell'endpoint del feed Chronicle.
httpMethod: impostato su
POST.authType: impostalo su
API_KEYper l'autenticazione con chiave API.apiKeyName: il nome dell'intestazione della chiave API (
x-goog-chronicle-auth).apiKeyValue: il valore della chiave API Google Cloud.
headerParameters: intestazioni aggiuntive, tra cui
Content-Type: application/jsone la chiave segreta di Chronicle nell'intestazionex-chronicle-auth.
Crea l'hook evento personalizzato in HYPR Control Center
- Accedi a HYPR Control Center come amministratore.
- Nel menu di navigazione a sinistra, fai clic su Integrations (Integrazioni).
- Nella pagina Integrazioni, fai clic su Aggiungi nuova integrazione.
- HYPR Control Center mostra le integrazioni disponibili.
- Fai clic sul riquadro in Hook evento per Eventi personalizzati.
- Fai clic su Aggiungi nuovo hook evento.
- Nella finestra di dialogo Aggiungi nuovo hook evento, incolla i contenuti JSON che hai preparato nel campo di testo.
- Fai clic su Aggiungi hook evento.
- HYPR Control Center torna alla pagina Event Hooks.
L'hook evento personalizzato è ora configurato e inizierà a inviare eventi a Google SecOps.
Verificare il funzionamento del webhook
Controllare lo stato dell'hook evento di HYPR Control Center
- Accedi al centro di controllo HYPR.
- Vai a Integrazioni.
- Fai clic sull'integrazione Eventi personalizzati.
- Nella tabella Event Hooks, verifica che l'hook evento sia elencato.
- Fai clic sul nome dell'hook evento per visualizzare i dettagli.
- Verifica che la configurazione corrisponda alle tue impostazioni.
Controllare lo stato del feed di Chronicle
- Vai a Impostazioni SIEM > Feed in Chronicle.
- Individua il feed webhook.
- Controlla la colonna Stato (deve essere Attivo).
- Controlla il conteggio Eventi ricevuti (deve aumentare).
- Controlla il timestamp di Ultima operazione riuscita il giorno (deve essere recente).
Verifica i log in Chronicle
- Vai a Ricerca > Ricerca UDM.
Utilizza la query seguente:
metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"Modifica l'intervallo di tempo sull'ultima ora.
Verifica che gli eventi vengano visualizzati nei risultati.
Riferimento ai metodi di autenticazione
Gli hook di eventi personalizzati HYPR supportano più metodi di autenticazione. Il metodo consigliato per Chronicle è l'autenticazione con chiave API con intestazioni personalizzate.
Autenticazione con chiave API (consigliata per Chronicle)
Configurazione:
{ "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }Vantaggi:
- Chiave API e secret inviati nelle intestazioni (più sicuri dei parametri URL).
- Supporta più intestazioni di autenticazione.
- Intestazioni non registrate nei log di accesso del server web.
Autenticazione di base
Configurazione:
{ "authType": "BASIC", "authParams": { "basicAuthParameters": { "username": "your-username", "password": "your-password" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- Caso d'uso:quando il sistema di destinazione richiede l'autenticazione di base HTTP.
Credenziali client OAuth 2.0
Configurazione:
{ "authType": "OAUTH_CLIENT_CREDENTIALS", "authParams": { "oauthParameters": { "clientParameters": { "clientId": "your-client-id", "clientSecret": "your-client-secret" }, "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token", "httpMethod": "POST", "oauthHttpParameters": { "bodyParameters": [ { "key": "scope", "value": "api://your-api/.default", "isValueSecret": false }, { "key": "grant_type", "value": "client_credentials", "isValueSecret": false } ] } }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- Caso d'uso:quando il sistema di destinazione richiede l'autenticazione OAuth 2.0.
Tipi di eventi e filtri
Gli eventi HYPR vengono raggruppati utilizzando il parametro eventTags. Puoi configurare l'hook evento personalizzato per inviare tutti gli eventi o filtrare in base a tipi di eventi specifici.
Tag evento
- AUTENTICAZIONE: eventi di autenticazione utente (accesso, sblocco).
- REGISTRATION: eventi di registrazione del dispositivo (accoppiamento di dispositivi mobili, token di sicurezza).
- ACCESS_TOKEN: eventi di generazione e utilizzo del token di accesso.
- AUDIT: eventi dei log di controllo (azioni amministrative, modifiche alla configurazione).
Configurare il filtro degli eventi
Per inviare solo tipi di eventi specifici, modifica il parametro eventType nella configurazione JSON:
Invia tutti gli eventi:
{ "eventType": "ALL" }Invia solo eventi di autenticazione:
{ "eventType": "AUTHENTICATION" }Invia solo eventi di registrazione:
{ "eventType": "REGISTRATION" }
Opzione 2: configura l'integrazione di Google Cloud Storage
Prerequisiti aggiuntivi per l'integrazione di GCS
Oltre ai prerequisiti elencati nella sezione "Prima di iniziare", devi disporre di:
- Un progetto GCP con l'API Cloud Storage abilitata
- Autorizzazioni per creare e gestire bucket GCS
- Autorizzazioni per gestire le policy IAM nei bucket GCS
- Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
- Credenziali API HYPR (contatta l'assistenza HYPR per l'accesso API)
Creazione di un bucket Google Cloud Storage
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio hypr-mfa-logs).Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Raccogliere le credenziali API HYPR
Contatta l'assistenza HYPR per ottenere le credenziali API per accedere ai dati degli eventi HYPR. Ti serviranno:
- URL di base dell'API: l'URL dell'istanza HYPR (ad esempio,
https://your-tenant.hypr.com) - Token API: token di autenticazione per l'accesso all'API
- ID app RP: l'ID applicazione Relying Party da monitorare
Crea un account di servizio per la funzione Cloud Run
La funzione Cloud Run richiede un account di servizio con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.
Crea service account
- Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
- Fai clic su Crea account di servizio.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
hypr-logs-collector-sa. - Descrizione service account: inserisci
Service account for Cloud Run function to collect HYPR MFA logs.
- Nome del service account: inserisci
- Fai clic su Crea e continua.
- Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
- Fai clic su Continua.
- Fai clic su Fine.
Questi ruoli sono necessari per:
- Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
- Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
- Cloud Functions Invoker: consente la chiamata di funzioni
Concedi autorizzazioni IAM sul bucket GCS
Concedi al account di servizio (hypr-logs-collector-sa) le autorizzazioni di scrittura sul bucket GCS:
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (ad esempio
hypr-mfa-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del account di servizio (ad es.
hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del account di servizio (ad es.
- Fai clic su Salva.
Crea argomento Pub/Sub
Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.
- Nella console GCP, vai a Pub/Sub > Argomenti.
- Fai clic su Crea argomento.
- Fornisci i seguenti dettagli di configurazione:
- ID argomento: inserisci
hypr-logs-trigger. - Lascia le altre impostazioni sui valori predefiniti.
- ID argomento: inserisci
- Fai clic su Crea.
Crea una funzione Cloud Run per raccogliere i log
La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API HYPR e scriverli in GCS.
- Nella console GCP, vai a Cloud Run.
- Fai clic su Crea servizio.
- Seleziona Funzione (usa un editor in linea per creare una funzione).
Nella sezione Configura, fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome servizio hypr-logs-collectorRegione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio, us-central1)Tempo di esecuzione Seleziona Python 3.12 o versioni successive Nella sezione Trigger (facoltativo):
- Fai clic su + Aggiungi trigger.
- Seleziona Cloud Pub/Sub.
- In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub (
hypr-logs-trigger). - Fai clic su Salva.
Nella sezione Autenticazione:
- Seleziona Richiedi autenticazione.
- Controlla Identity and Access Management (IAM).
Scorri verso il basso ed espandi Container, networking, sicurezza.
Vai alla scheda Sicurezza:
- Service account: seleziona il account di servizio (
hypr-logs-collector-sa).
- Service account: seleziona il account di servizio (
Vai alla scheda Container:
- Fai clic su Variabili e secret.
- Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
Nome variabile Valore di esempio Descrizione GCS_BUCKEThypr-mfa-logsNome bucket GCS GCS_PREFIXhypr-eventsPrefisso per i file di log STATE_KEYhypr-events/state.jsonPercorso file di stato HYPR_API_URLhttps://your-tenant.hypr.comURL di base dell'API HYPR HYPR_API_TOKENyour-api-tokenToken di autenticazione API HYPR HYPR_RP_APP_IDyour-rp-app-idID applicazione RP HYPR MAX_RECORDS1000Numero massimo di record per esecuzione PAGE_SIZE100Record per pagina LOOKBACK_HOURS24Periodo di ricerca iniziale Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:
- Timeout richiesta: inserisci
600secondi (10 minuti).
- Timeout richiesta: inserisci
Vai alla scheda Impostazioni:
- Nella sezione Risorse:
- Memoria: seleziona 512 MiB o un valore superiore.
- CPU: seleziona 1.
- Nella sezione Risorse:
Nella sezione Scalabilità della revisione:
- Numero minimo di istanze: inserisci
0. - Numero massimo di istanze: inserisci
100(o modifica in base al carico previsto).
- Numero minimo di istanze: inserisci
Fai clic su Crea.
Attendi la creazione del servizio (1-2 minuti).
Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.
Aggiungi codice per la funzione
- Inserisci main nel campo Entry point (Punto di ingresso).
Nell'editor di codice incorporato, crea due file:
- Primo file: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events') STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json') HYPR_API_URL = os.environ.get('HYPR_API_URL') HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN') HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Convert to Unix milliseconds for HYPR API start_millis = to_unix_millis(last_time) end_millis = to_unix_millis(now) # Fetch logs records, newest_event_time = fetch_logs( api_url=HYPR_API_URL, api_token=HYPR_API_TOKEN, rp_app_id=HYPR_RP_APP_ID, start_time_ms=start_millis, end_time_ms=end_millis, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int): """ Fetch logs from HYPR API with pagination and rate limiting. Args: api_url: HYPR API base URL api_token: HYPR API authentication token rp_app_id: HYPR RP application ID start_time_ms: Start time in Unix milliseconds end_time_ms: End time in Unix milliseconds page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ # Clean up API URL base_url = api_url.rstrip('/') endpoint = f"{base_url}/rp/api/versioned/events" # Bearer token authentication headers = { 'Authorization': f'Bearer {api_token}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-HYPRCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 # Offset-based pagination start_index = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request parameters params = [] params.append(f"rpAppId={rp_app_id}") params.append(f"startDate={start_time_ms}") params.append(f"endDate={end_time_ms}") params.append(f"start={start_index}") params.append(f"limit={min(page_size, max_records - len(records))}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) # Extract results page_results = data.get('data', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds event_time_ms = event.get('LOGGEDTIMEINUTC') if event_time_ms: event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) event_time = event_dt.isoformat() if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results current_size = data.get('size', 0) if current_size < page_size: print(f"Reached last page (size={current_size} < limit={page_size})") break start_index += current_size except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time- Secondo file: requirements.txt::
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.
Attendi il completamento del deployment (2-3 minuti).
Crea job Cloud Scheduler
Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub (hypr-logs-trigger) a intervalli regolari, attivando la funzione Cloud Run.
- Nella console di GCP, vai a Cloud Scheduler.
- Fai clic su Crea job.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome hypr-logs-collector-hourlyRegione Seleziona la stessa regione della funzione Cloud Run Frequenza 0 * * * *(ogni ora, all'ora)Fuso orario Seleziona il fuso orario (UTC consigliato) Tipo di target Pub/Sub Argomento Seleziona l'argomento Pub/Sub ( hypr-logs-trigger)Corpo del messaggio {}(oggetto JSON vuoto)Fai clic su Crea.
Opzioni di frequenza di pianificazione
Scegli la frequenza in base al volume dei log e ai requisiti di latenza:
| Frequenza | Espressione cron | Caso d'uso |
|---|---|---|
| Ogni 5 minuti | */5 * * * * |
Volume elevato, bassa latenza |
| Ogni 15 minuti | */15 * * * * |
Volume medio |
| Ogni ora | 0 * * * * |
Standard (consigliato) |
| Ogni 6 ore | 0 */6 * * * |
Volume basso, elaborazione batch |
| Ogni giorno | 0 0 * * * |
Raccolta dei dati storici |
Testare l'integrazione
- Nella console Cloud Scheduler, trova il tuo job (
hypr-logs-collector-hourly). - Fai clic su Forza esecuzione per attivare il job manualmente.
- Attendi qualche secondo.
- Vai a Cloud Run > Servizi.
- Fai clic sul nome della funzione (
hypr-logs-collector). - Fai clic sulla scheda Log.
Verifica che la funzione sia stata eseguita correttamente. Cerca:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVai a Cloud Storage > Bucket.
Fai clic sul nome del bucket (ad esempio
hypr-mfa-logs).Vai alla cartella del prefisso (ad esempio,
hypr-events/).Verifica che sia stato creato un nuovo file
.ndjsoncon il timestamp corrente.
Se visualizzi errori nei log:
- HTTP 401: controlla le credenziali API nelle variabili di ambiente
- HTTP 403: verifica che il token API HYPR disponga delle autorizzazioni richieste e che l'ID app RP sia corretto
- HTTP 429: limitazione della frequenza: la funzione riproverà automaticamente con backoff
- Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate
Recuperare il account di servizio Google SecOps
Google SecOps utilizza un account di servizio univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo account di servizio l'accesso al tuo bucket.
Configura un feed in Google SecOps per importare i log HYPR MFA
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
HYPR MFA Logs from GCS). - Seleziona Google Cloud Storage V2 come Tipo di origine.
Seleziona HYPR MFA come Tipo di log.
Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del account di servizio, ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:
gs://hypr-mfa-logs/hypr-events/- Sostituisci:
hypr-mfa-logs: il nome del bucket GCS.hypr-events: (Facoltativo) prefisso/percorso della cartella in cui vengono archiviati i log (lascia vuoto per la radice).
- Sostituisci:
Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Concedi le autorizzazioni IAM al account di servizio Google SecOps
Il account di servizio Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (ad esempio
hypr-mfa-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del account di servizio Google SecOps.
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
- Fai clic su Salva.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| extensions.auth.type | Tipo di autenticazione (ad es. SSO, MFA) | |
| metadata.event_type | Tipo di evento (ad es. USER_LOGIN, NETWORK_CONNECTION) | |
| EVENTNAME | metadata.product_event_type | Tipo di evento specifico per il prodotto |
| ID | metadata.product_log_id | ID log specifico del prodotto |
| USERAGENT | network.http.parsed_user_agent | User agent HTTP analizzato |
| USERAGENT | network.http.user_agent | Stringa dello user agent HTTP |
| SESSIONID | network.session_id | ID sessione |
| DEVICEMODEL | principal.asset.hardware.model | Modello hardware dell'asset |
| COMPANION,MACHINEDOMAIN | principal.asset.hostname | Nome host dell'asset |
| REMOTEIP | principal.asset.ip | Indirizzo IP dell'asset |
| DEVICEID | principal.asset_id | Identificatore univoco della risorsa |
| COMPANION,MACHINEDOMAIN | principal.hostname | Il nome host associato all'entità |
| REMOTEIP | principal.ip | Indirizzo IP associato al principal |
| DEVICEOS | principal.platform | Piattaforma (ad es. WINDOWS, LINUX) |
| DEVICEOSVERSION | principal.platform_version | Versione della piattaforma |
| ISSUCCESSFUL | security_result.action | Azione intrapresa dal sistema di sicurezza (ad es. CONSENTI, BLOCCA) |
| MESSAGE | security_result.description | Descrizione del risultato di sicurezza |
| MACHINEUSERNAME | target.user.user_display_name | Nome visualizzato dell'utente |
| FIDOUSER | target.user.userid | ID utente |
| metadata.product_name | Nome del prodotto | |
| metadata.vendor_name | Nome fornitore/azienda |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.