Raccogliere i log MFA HYPR

Supportato in:

Questo documento spiega come importare i log HYPR MFA in Google Security Operations utilizzando webhook o Google Cloud Storage V2.

HYPR MFA è una soluzione di autenticazione a più fattori senza password che fornisce un'autenticazione resistente al phishing utilizzando passkey FIDO2, dati biometrici e accesso avviato da dispositivo mobile. HYPR sostituisce le password tradizionali con la crittografia sicura a chiave pubblica per eliminare gli attacchi basati sulle credenziali, semplificando al contempo l'autenticazione degli utenti su workstation, applicazioni web e servizi cloud.

Prima di iniziare

Assicurati di disporre dei seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Accesso amministrativo a HYPR Control Center
  • Contatta l'assistenza HYPR per attivare gli hook di eventi personalizzati per l'applicazione RP che vuoi monitorare

Differenze nel metodo di raccolta

HYPR MFA supporta due metodi per l'invio dei log a Google Security Operations:

  • Webhook (opzione consigliata): HYPR invia gli eventi in tempo reale a Google Security Operations tramite Custom Event Hooks. Questo metodo fornisce la distribuzione immediata degli eventi e non richiede infrastrutture aggiuntive.
  • Google Cloud Storage: gli eventi HYPR vengono raccolti tramite API e archiviati in GCS, poi vengono inseriti in Google Security Operations. Questo metodo fornisce l'elaborazione batch e la conservazione dei dati storici.

Scegli il metodo più adatto alle tue esigenze:

Funzionalità Webhook Google Cloud Storage
Latenza In tempo reale (secondi) Batch (da minuti a ore)
Infrastruttura Nessuno richiesto Progetto Google Cloud con la funzione Cloud Run
Dati storici Limitato al flusso di eventi Conservazione completa in GCS
Complessità della configurazione Semplice Moderato
Costo Minimo Costi di computing e archiviazione di Google Cloud

Opzione 1: configura l'integrazione webhook

Crea un feed webhook in Google SecOps

Creare il feed

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nella pagina successiva, fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, HYPR MFA Events).
  5. Seleziona Webhook come Tipo di origine.
  6. Seleziona HYPR MFA come Tipo di log.
  7. Fai clic su Avanti.
  8. Specifica i valori per i seguenti parametri di input:
    • Delimitatore di divisione (facoltativo): lascia vuoto. Ogni richiesta webhook contiene un singolo evento JSON.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
  9. Fai clic su Avanti.
  10. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Genera e salva la chiave segreta

Dopo aver creato il feed, devi generare una chiave segreta per l'autenticazione:

  1. Nella pagina dei dettagli del feed, fai clic su Genera chiave segreta.
  2. Una finestra di dialogo mostra la chiave segreta.
  3. Copia e salva la chiave segreta in modo sicuro.

Recuperare l'URL dell'endpoint del feed

  1. Vai alla scheda Dettagli del feed.
  2. Nella sezione Endpoint Information (Informazioni sull'endpoint), copia l'URL dell'endpoint del feed.
  3. Il formato dell'URL è:

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    o

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. Salva questo URL per i passaggi successivi.

  5. Fai clic su Fine.

Crea una chiave API Google Cloud

Chronicle richiede una chiave API per l'autenticazione. Crea una chiave API con limitazioni nella Google Cloud Console.

Crea la chiave API

  1. Vai alla pagina Credenziali della console Google Cloud.
  2. Seleziona il tuo progetto (quello associato alla tua istanza di Chronicle).
  3. Fai clic su Crea credenziali > Chiave API.
  4. Viene creata una chiave API e visualizzata in una finestra di dialogo.
  5. Fai clic su Modifica chiave API per limitare la chiave.

Limitare la chiave API

  1. Nella pagina delle impostazioni Chiave API:
    • Nome: inserisci un nome descrittivo (ad esempio, Chronicle Webhook API Key).
  2. In Limitazioni API:
    1. Seleziona Limita chiave.
    2. Nel menu a discesa Seleziona API, cerca e seleziona API Google SecOps (o API Chronicle).
  3. Fai clic su Salva.
  4. Copia il valore della chiave API dal campo Chiave API nella parte superiore della pagina.
  5. Salva la chiave API in modo sicuro.

Configurare l'hook evento personalizzato HYPR MFA

Crea l'URL webhook con le intestazioni

HYPR supporta le intestazioni personalizzate per l'autenticazione. Utilizza il metodo di autenticazione delle intestazioni per una maggiore sicurezza.

  • URL dell'endpoint (senza parametri):

    <ENDPOINT_URL>
    
  • Intestazioni:

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • Sostituisci:
      • <ENDPOINT_URL>: l'URL dell'endpoint del feed del passaggio precedente.
      • <API_KEY>: la chiave API Google Cloud che hai creato.
      • <SECRET_KEY>: La chiave segreta creata dal feed Chronicle.

Prepara la configurazione JSON dell'hook evento personalizzato

  • Gli hook di eventi personalizzati HYPR vengono configurati utilizzando JSON. Prepara la seguente configurazione JSON, sostituendo i valori segnaposto:

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
    • Sostituisci:

      • <ENDPOINT_URL>: l'URL dell'endpoint del feed di Chronicle.
      • <API_KEY>: la chiave API Google Cloud.
      • <SECRET_KEY>: La chiave segreta di Chronicle.
    • Parametri di configurazione:

    • name: un nome descrittivo per l'hook evento (ad esempio, Chronicle SIEM Integration).

    • eventType: imposta il valore su ALL per inviare tutti gli eventi HYPR o specifica tag evento specifici come AUTHENTICATION, REGISTRATION o ACCESS_TOKEN.

    • invocationEndpoint: l'URL dell'endpoint del feed Chronicle.

    • httpMethod: impostato su POST.

    • authType: impostalo su API_KEY per l'autenticazione con chiave API.

    • apiKeyName: il nome dell'intestazione della chiave API (x-goog-chronicle-auth).

    • apiKeyValue: il valore della chiave API Google Cloud.

    • headerParameters: intestazioni aggiuntive, tra cui Content-Type: application/json e la chiave segreta di Chronicle nell'intestazione x-chronicle-auth.

Crea l'hook evento personalizzato in HYPR Control Center

  1. Accedi a HYPR Control Center come amministratore.
  2. Nel menu di navigazione a sinistra, fai clic su Integrations (Integrazioni).
  3. Nella pagina Integrazioni, fai clic su Aggiungi nuova integrazione.
  4. HYPR Control Center mostra le integrazioni disponibili.
  5. Fai clic sul riquadro in Hook evento per Eventi personalizzati.
  6. Fai clic su Aggiungi nuovo hook evento.
  7. Nella finestra di dialogo Aggiungi nuovo hook evento, incolla i contenuti JSON che hai preparato nel campo di testo.
  8. Fai clic su Aggiungi hook evento.
  9. HYPR Control Center torna alla pagina Event Hooks.

L'hook evento personalizzato è ora configurato e inizierà a inviare eventi a Google SecOps.

Verificare il funzionamento del webhook

Controllare lo stato dell'hook evento di HYPR Control Center

  1. Accedi al centro di controllo HYPR.
  2. Vai a Integrazioni.
  3. Fai clic sull'integrazione Eventi personalizzati.
  4. Nella tabella Event Hooks, verifica che l'hook evento sia elencato.
  5. Fai clic sul nome dell'hook evento per visualizzare i dettagli.
  6. Verifica che la configurazione corrisponda alle tue impostazioni.

Controllare lo stato del feed di Chronicle

  1. Vai a Impostazioni SIEM > Feed in Chronicle.
  2. Individua il feed webhook.
  3. Controlla la colonna Stato (deve essere Attivo).
  4. Controlla il conteggio Eventi ricevuti (deve aumentare).
  5. Controlla il timestamp di Ultima operazione riuscita il giorno (deve essere recente).

Verifica i log in Chronicle

  1. Vai a Ricerca > Ricerca UDM.
  2. Utilizza la query seguente:

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. Modifica l'intervallo di tempo sull'ultima ora.

  4. Verifica che gli eventi vengano visualizzati nei risultati.

Riferimento ai metodi di autenticazione

Gli hook di eventi personalizzati HYPR supportano più metodi di autenticazione. Il metodo consigliato per Chronicle è l'autenticazione con chiave API con intestazioni personalizzate.

  • Configurazione:

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • Vantaggi:

    • Chiave API e secret inviati nelle intestazioni (più sicuri dei parametri URL).
    • Supporta più intestazioni di autenticazione.
    • Intestazioni non registrate nei log di accesso del server web.

Autenticazione di base

  • Configurazione:

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Caso d'uso:quando il sistema di destinazione richiede l'autenticazione di base HTTP.

Credenziali client OAuth 2.0

  • Configurazione:

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Caso d'uso:quando il sistema di destinazione richiede l'autenticazione OAuth 2.0.

Tipi di eventi e filtri

Gli eventi HYPR vengono raggruppati utilizzando il parametro eventTags. Puoi configurare l'hook evento personalizzato per inviare tutti gli eventi o filtrare in base a tipi di eventi specifici.

Tag evento

  • AUTENTICAZIONE: eventi di autenticazione utente (accesso, sblocco).
  • REGISTRATION: eventi di registrazione del dispositivo (accoppiamento di dispositivi mobili, token di sicurezza).
  • ACCESS_TOKEN: eventi di generazione e utilizzo del token di accesso.
  • AUDIT: eventi dei log di controllo (azioni amministrative, modifiche alla configurazione).

Configurare il filtro degli eventi

Per inviare solo tipi di eventi specifici, modifica il parametro eventType nella configurazione JSON:

  • Invia tutti gli eventi:

    {
      "eventType": "ALL"
    }
    
  • Invia solo eventi di autenticazione:

    {
      "eventType": "AUTHENTICATION"
    }
    
  • Invia solo eventi di registrazione:

    {
      "eventType": "REGISTRATION"
    }
    

Opzione 2: configura l'integrazione di Google Cloud Storage

Prerequisiti aggiuntivi per l'integrazione di GCS

Oltre ai prerequisiti elencati nella sezione "Prima di iniziare", devi disporre di:

  • Un progetto GCP con l'API Cloud Storage abilitata
  • Autorizzazioni per creare e gestire bucket GCS
  • Autorizzazioni per gestire le policy IAM nei bucket GCS
  • Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
  • Credenziali API HYPR (contatta l'assistenza HYPR per l'accesso API)

Creazione di un bucket Google Cloud Storage

  1. Vai alla console Google Cloud.
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio hypr-mfa-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la posizione (ad esempio, us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Raccogliere le credenziali API HYPR

Contatta l'assistenza HYPR per ottenere le credenziali API per accedere ai dati degli eventi HYPR. Ti serviranno:

  • URL di base dell'API: l'URL dell'istanza HYPR (ad esempio, https://your-tenant.hypr.com)
  • Token API: token di autenticazione per l'accesso all'API
  • ID app RP: l'ID applicazione Relying Party da monitorare

Crea un account di servizio per la funzione Cloud Run

La funzione Cloud Run richiede un account di servizio con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.

Crea service account

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
  2. Fai clic su Crea account di servizio.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome del service account: inserisci hypr-logs-collector-sa.
    • Descrizione service account: inserisci Service account for Cloud Run function to collect HYPR MFA logs.
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo.
    2. Cerca e seleziona Amministratore oggetti di archiviazione.
    3. Fai clic su + Aggiungi un altro ruolo.
    4. Cerca e seleziona Cloud Run Invoker.
    5. Fai clic su + Aggiungi un altro ruolo.
    6. Cerca e seleziona Invoker di Cloud Functions.
  6. Fai clic su Continua.
  7. Fai clic su Fine.

Questi ruoli sono necessari per:

  • Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
  • Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
  • Cloud Functions Invoker: consente la chiamata di funzioni

Concedi autorizzazioni IAM sul bucket GCS

Concedi al account di servizio (hypr-logs-collector-sa) le autorizzazioni di scrittura sul bucket GCS:

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (ad esempio hypr-mfa-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: inserisci l'email del account di servizio (ad es. hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Assegna i ruoli: seleziona Storage Object Admin.
  6. Fai clic su Salva.

Crea argomento Pub/Sub

Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.

  1. Nella console GCP, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci hypr-logs-trigger.
    • Lascia le altre impostazioni sui valori predefiniti.
  4. Fai clic su Crea.

Crea una funzione Cloud Run per raccogliere i log

La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API HYPR e scriverli in GCS.

  1. Nella console GCP, vai a Cloud Run.
  2. Fai clic su Crea servizio.
  3. Seleziona Funzione (usa un editor in linea per creare una funzione).
  4. Nella sezione Configura, fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome servizio hypr-logs-collector
    Regione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio, us-central1)
    Tempo di esecuzione Seleziona Python 3.12 o versioni successive
  5. Nella sezione Trigger (facoltativo):

    1. Fai clic su + Aggiungi trigger.
    2. Seleziona Cloud Pub/Sub.
    3. In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub (hypr-logs-trigger).
    4. Fai clic su Salva.
  6. Nella sezione Autenticazione:

    1. Seleziona Richiedi autenticazione.
    2. Controlla Identity and Access Management (IAM).
  7. Scorri verso il basso ed espandi Container, networking, sicurezza.

  8. Vai alla scheda Sicurezza:

    • Service account: seleziona il account di servizio (hypr-logs-collector-sa).
  9. Vai alla scheda Container:

    1. Fai clic su Variabili e secret.
    2. Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
    Nome variabile Valore di esempio Descrizione
    GCS_BUCKET hypr-mfa-logs Nome bucket GCS
    GCS_PREFIX hypr-events Prefisso per i file di log
    STATE_KEY hypr-events/state.json Percorso file di stato
    HYPR_API_URL https://your-tenant.hypr.com URL di base dell'API HYPR
    HYPR_API_TOKEN your-api-token Token di autenticazione API HYPR
    HYPR_RP_APP_ID your-rp-app-id ID applicazione RP HYPR
    MAX_RECORDS 1000 Numero massimo di record per esecuzione
    PAGE_SIZE 100 Record per pagina
    LOOKBACK_HOURS 24 Periodo di ricerca iniziale
  10. Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:

    • Timeout richiesta: inserisci 600 secondi (10 minuti).
  11. Vai alla scheda Impostazioni:

    • Nella sezione Risorse:
      • Memoria: seleziona 512 MiB o un valore superiore.
      • CPU: seleziona 1.
  12. Nella sezione Scalabilità della revisione:

    • Numero minimo di istanze: inserisci 0.
    • Numero massimo di istanze: inserisci 100 (o modifica in base al carico previsto).
  13. Fai clic su Crea.

  14. Attendi la creazione del servizio (1-2 minuti).

  15. Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.

Aggiungi codice per la funzione

  1. Inserisci main nel campo Entry point (Punto di ingresso).
  2. Nell'editor di codice incorporato, crea due file:

    • Primo file: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • Secondo file: requirements.txt::
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.

  4. Attendi il completamento del deployment (2-3 minuti).

Crea job Cloud Scheduler

Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub (hypr-logs-trigger) a intervalli regolari, attivando la funzione Cloud Run.

  1. Nella console di GCP, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome hypr-logs-collector-hourly
    Regione Seleziona la stessa regione della funzione Cloud Run
    Frequenza 0 * * * * (ogni ora, all'ora)
    Fuso orario Seleziona il fuso orario (UTC consigliato)
    Tipo di target Pub/Sub
    Argomento Seleziona l'argomento Pub/Sub (hypr-logs-trigger)
    Corpo del messaggio {} (oggetto JSON vuoto)
  4. Fai clic su Crea.

Opzioni di frequenza di pianificazione

Scegli la frequenza in base al volume dei log e ai requisiti di latenza:

Frequenza Espressione cron Caso d'uso
Ogni 5 minuti */5 * * * * Volume elevato, bassa latenza
Ogni 15 minuti */15 * * * * Volume medio
Ogni ora 0 * * * * Standard (consigliato)
Ogni 6 ore 0 */6 * * * Volume basso, elaborazione batch
Ogni giorno 0 0 * * * Raccolta dei dati storici

Testare l'integrazione

  1. Nella console Cloud Scheduler, trova il tuo job (hypr-logs-collector-hourly).
  2. Fai clic su Forza esecuzione per attivare il job manualmente.
  3. Attendi qualche secondo.
  4. Vai a Cloud Run > Servizi.
  5. Fai clic sul nome della funzione (hypr-logs-collector).
  6. Fai clic sulla scheda Log.
  7. Verifica che la funzione sia stata eseguita correttamente. Cerca:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Vai a Cloud Storage > Bucket.

  9. Fai clic sul nome del bucket (ad esempio hypr-mfa-logs).

  10. Vai alla cartella del prefisso (ad esempio, hypr-events/).

  11. Verifica che sia stato creato un nuovo file .ndjson con il timestamp corrente.

Se visualizzi errori nei log:

  • HTTP 401: controlla le credenziali API nelle variabili di ambiente
  • HTTP 403: verifica che il token API HYPR disponga delle autorizzazioni richieste e che l'ID app RP sia corretto
  • HTTP 429: limitazione della frequenza: la funzione riproverà automaticamente con backoff
  • Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate

Recuperare il account di servizio Google SecOps

Google SecOps utilizza un account di servizio univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo account di servizio l'accesso al tuo bucket.

Configura un feed in Google SecOps per importare i log HYPR MFA

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, HYPR MFA Logs from GCS).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona HYPR MFA come Tipo di log.

  7. Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del account di servizio, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email per utilizzarlo nel passaggio successivo.

  9. Fai clic su Avanti.

  10. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://hypr-mfa-logs/hypr-events/
      
      • Sostituisci:
        • hypr-mfa-logs: il nome del bucket GCS.
        • hypr-events: (Facoltativo) prefisso/percorso della cartella in cui vengono archiviati i log (lascia vuoto per la radice).
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.

    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.

  11. Fai clic su Avanti.

  12. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Concedi le autorizzazioni IAM al account di servizio Google SecOps

Il account di servizio Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (ad esempio hypr-mfa-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email del account di servizio Google SecOps.
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
  6. Fai clic su Salva.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
extensions.auth.type Tipo di autenticazione (ad es. SSO, MFA)
metadata.event_type Tipo di evento (ad es. USER_LOGIN, NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type Tipo di evento specifico per il prodotto
ID metadata.product_log_id ID log specifico del prodotto
USERAGENT network.http.parsed_user_agent User agent HTTP analizzato
USERAGENT network.http.user_agent Stringa dello user agent HTTP
SESSIONID network.session_id ID sessione
DEVICEMODEL principal.asset.hardware.model Modello hardware dell'asset
COMPANION,MACHINEDOMAIN principal.asset.hostname Nome host dell'asset
REMOTEIP principal.asset.ip Indirizzo IP dell'asset
DEVICEID principal.asset_id Identificatore univoco della risorsa
COMPANION,MACHINEDOMAIN principal.hostname Il nome host associato all'entità
REMOTEIP principal.ip Indirizzo IP associato al principal
DEVICEOS principal.platform Piattaforma (ad es. WINDOWS, LINUX)
DEVICEOSVERSION principal.platform_version Versione della piattaforma
ISSUCCESSFUL security_result.action Azione intrapresa dal sistema di sicurezza (ad es. CONSENTI, BLOCCA)
MESSAGE security_result.description Descrizione del risultato di sicurezza
MACHINEUSERNAME target.user.user_display_name Nome visualizzato dell'utente
FIDOUSER target.user.userid ID utente
metadata.product_name Nome del prodotto
metadata.vendor_name Nome fornitore/azienda

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.