Raccogliere i log di Druva Backup

Supportato in:

Questo documento spiega come raccogliere i log di Druva Backup configurando una funzione Google Cloud Run che recupera gli eventi dall'API REST Druva e li scrive in un bucket Google Cloud Storage, quindi configurando un feed Google Security Operations utilizzando Google Cloud Storage V2.

Druva è una piattaforma di gestione e protezione dei dati cloud-native che fornisce servizi di backup, ripristino di emergenza e archiviazione per endpoint, applicazioni SaaS e workload aziendali. La piattaforma genera audit trail completi, eventi di backup, attività di ripristino e avvisi di sicurezza che possono essere integrati con le soluzioni SIEM per il monitoraggio e la conformità.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un progetto Google Cloud con la fatturazione abilitata
  • Sono state abilitate le seguenti API Google Cloud:
    • API Cloud Run Functions
    • API Cloud Scheduler
    • API Cloud Storage
    • API Pub/Sub
    • API IAM
  • Accesso amministratore di Druva Cloud alla console Druva Cloud Platform
  • Accesso al Centro integrazione Druva per la creazione delle credenziali API

Creare un bucket Google Cloud Storage

  1. Vai alla console Google Cloud.
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio druva-backup-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la località più vicina alla tua istanza Google SecOps (ad esempio us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Raccogli le credenziali API di Druva

Per consentire alla funzione Cloud Run di recuperare gli eventi da Druva, devi creare credenziali API con autenticazione OAuth 2.0.

Crea credenziali API

  1. Accedi alla console di Druva Cloud Platform.
  2. Nel menu Navigazione globale, seleziona Centro integrazioni.
  3. Nel riquadro a sinistra, fai clic su Credenziali API.
  4. Fai clic su Nuove credenziali.
  5. Nella finestra Nuove credenziali, fornisci i seguenti dettagli: Nome: inserisci un nome descrittivo (ad esempio, Google SecOps Cloud Storage Integration).
  6. Per applicare le limitazioni di autorizzazione:
    1. Seleziona Amministratore di Druva Cloud per consentire l'accesso completo al recupero e alla modifica dei dati.
    2. In alternativa, seleziona Amministratore prodotto e scegli: Ruolo Amministratore cloud (sola lettura): per limitare l'accesso al recupero dei dati senza diritti di modifica (consigliato per l'integrazione SIEM)
  7. Fai clic su Salva.

Registra le credenziali API

Dopo aver creato le credenziali API, viene visualizzata la finestra Dettagli credenziali:

  1. Fai clic sull'icona di copia accanto a ID client per copiare il valore negli appunti.
  2. Salva l'ID client in modo sicuro (ad esempio, McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx).
  3. Fai clic sull'icona di copia accanto a Chiave segreta per copiare il valore negli appunti.
  4. Salva la chiave segreta in modo sicuro (ad esempio, Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt).

Crea un account di servizio

Crea un account di servizio dedicato per la funzione Cloud Run per accedere a Google Cloud Storage.

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service account.
  2. Fai clic su Crea account di servizio.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome service account: inserisci druva-backup-function (o un nome descrittivo)
    • Descrizione service account: inserisci Service account for Druva Backup Cloud Run function
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo e seleziona Amministratore oggetti di archiviazione.
    2. Fai clic su Aggiungi un altro ruolo e seleziona Cloud Run Invoker.
  6. Fai clic su Continua.
  7. Fai clic su Fine.
  8. Registra l'email del account di servizio (ad esempio, druva-backup-function@PROJECT_ID.iam.gserviceaccount.com).

Crea un argomento Pub/Sub

Crea un argomento Pub/Sub che Cloud Scheduler utilizzerà per attivare la funzione Cloud Run.

  1. Nella console Google Cloud, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci druva-backup-trigger
  4. Deseleziona Aggiungi una sottoscrizione predefinita.
  5. Fai clic su Crea.

Crea la funzione Cloud Run

Prepara il codice della funzione

Crea una funzione Cloud Run che esegue l'autenticazione con l'API Druva utilizzando le credenziali client OAuth 2.0, recupera gli eventi tramite l'endpoint eventi con paginazione e scrive i risultati come NDJSON nel bucket GCS.

Esegui il deployment della funzione Cloud Run

  1. Nella console Google Cloud, vai a Cloud Run Functions.
  2. Fai clic su Crea funzione.
  3. Fornisci i seguenti dettagli di configurazione:

    • Ambiente: seleziona 2ª gen.
    • Nome funzione: inserisci druva-backup-to-gcs
    • Regione: seleziona la regione più vicina al tuo bucket GCS (ad esempio, us-central1).
    • Tipo di trigger: seleziona Cloud Pub/Sub
    • Argomento Cloud Pub/Sub: seleziona druva-backup-trigger
    • Service account (Account di servizio): seleziona druva-backup-function@PROJECT_ID.iam.gserviceaccount.com
    • Memoria allocata: 512 MiB
    • Timeout: 540 secondi
    • Numero massimo di istanze: 1
  4. Fai clic su Avanti.

  5. Seleziona Python 3.11 come Runtime.

  6. Imposta il punto di ingresso su main.

  7. Nell'editor Codice sorgente, sostituisci i contenuti di main.py con quanto segue:

    import base64
    import json
    import os
    import time
    from datetime import datetime, timezone, timedelta
    
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup")
    STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json")
    DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com")
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    def get_oauth_token():
        """Obtain OAuth 2.0 access token using client credentials grant."""
        token_url = f"https://{DRUVA_BASE_URL}/token"
        payload = {
            "grant_type": "client_credentials",
            "scope": "read",
        }
        resp = requests.post(
            token_url,
            data=payload,
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def load_state(storage_client):
        """Load the persisted state (last event time and tracker) from GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def save_state(storage_client, state):
        """Persist state to GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def fetch_events(token, state):
        """Fetch events from Druva API with pagination via nextPageToken."""
        events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
    
        params = {"pageSize": PAGE_SIZE}
    
        tracker = state.get("tracker")
        last_event_time = state.get("last_event_time")
    
        if tracker:
            params["tracker"] = tracker
        elif last_event_time:
            params["fromTime"] = last_event_time
        else:
            lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS)
            params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        all_events = []
        total_fetched = 0
    
        while total_fetched < MAX_RECORDS:
            resp = requests.get(
                events_url,
                headers=headers,
                params=params,
                timeout=60,
            )
            resp.raise_for_status()
            data = resp.json()
    
            events = data.get("events", [])
            all_events.extend(events)
            total_fetched += len(events)
    
            new_tracker = data.get("tracker")
            next_page_token = data.get("nextPageToken")
    
            if new_tracker:
                state["tracker"] = new_tracker
    
            if next_page_token:
                params["nextPageToken"] = next_page_token
                params.pop("tracker", None)
                params.pop("fromTime", None)
            else:
                break
    
        if all_events:
            last_ts = all_events[-1].get("eventTime", "")
            if last_ts:
                state["last_event_time"] = last_ts
    
        return all_events, state
    
    def write_events_to_gcs(storage_client, events):
        """Write events as NDJSON to GCS."""
        if not events:
            return
    
        now = datetime.now(timezone.utc)
        filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson"
        blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}"
    
        ndjson_lines = "\n".join(json.dumps(event) for event in events)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(blob_path)
        blob.upload_from_string(
            ndjson_lines,
            content_type="application/x-ndjson",
        )
        print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        storage_client = storage.Client()
    
        token = get_oauth_token()
    
        state = load_state(storage_client)
    
        events, updated_state = fetch_events(token, state)
    
        write_events_to_gcs(storage_client, events)
    
        save_state(storage_client, updated_state)
    
        print(f"Completed: fetched {len(events)} events")
        return f"OK: {len(events)} events"
    
  8. Sostituisci i contenuti di requirements.txt come segue:

    requests>=2.31.0
    google-cloud-storage>=2.14.0
    

Configura le variabili di ambiente

  1. Nella configurazione della funzione Cloud Run, vai alla sezione Impostazioni di runtime, build, connessioni e sicurezza.
  2. In Variabili di ambiente runtime, aggiungi le seguenti variabili:

    • GCS_BUCKET: il nome del bucket GCS (ad esempio, druva-backup-logs)
    • GCS_PREFIX: il percorso del prefisso per i file di log (ad esempio, druva_backup)
    • STATE_KEY: Il nome del file di stato (ad esempio, druva_state.json)
    • DRUVA_BASE_URL: URL di base dell'API Druva:
      • apis.druva.com per Druva Cloud (Standard)
      • govcloudapis.druva.com per Druva GovCloud
    • CLIENT_ID: l'ID client delle credenziali API Druva
    • CLIENT_SECRET: la chiave segreta delle credenziali API Druva
    • MAX_RECORDS: Numero massimo di record da recuperare per chiamata (ad esempio, 10000)
    • PAGE_SIZE: Numero di eventi per pagina API (massimo 500)
    • LOOKBACK_HOURS: Numero di ore da considerare per la prima esecuzione (ad esempio, 24)
  3. Fai clic su Esegui il deployment.

  4. Attendi il completamento del deployment.

Crea un job Cloud Scheduler

Crea un job Cloud Scheduler per attivare la funzione Cloud Run a intervalli regolari.

  1. Nella console Google Cloud, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    • Nome: inserisci druva-backup-scheduler
    • Regione: seleziona la stessa regione della tua funzione Cloud Run (ad esempio, us-central1).
    • Descrizione: inserisci Triggers Druva Backup log collection every 30 minutes
    • Frequenza: inserisci */30 * * * * (ogni 30 minuti)
    • Fuso orario: seleziona il fuso orario che preferisci (ad esempio, UTC).
  4. Fai clic su Continua.

  5. Configura il target:

    • Tipo target: seleziona Pub/Sub.
    • Argomento Cloud Pub/Sub: seleziona druva-backup-trigger
    • Corpo del messaggio: inserisci {"trigger": "scheduled"}
  6. Fai clic su Crea.

Testa il job Cloud Scheduler

  1. Nell'elenco Cloud Scheduler, individua druva-backup-scheduler.
  2. Fai clic su Forza esecuzione per attivare immediatamente la funzione.
  3. Verifica l'esecuzione controllando:
    • I log della funzione Cloud Run si trovano in Cloud Run Functions > druva-backup-to-gcs > Log
    • Il bucket GCS per i nuovi file NDJSON in Cloud Storage > druva-backup-logs

Recupera l'account di servizio Google SecOps e configura il feed

Recupera l'email del account di servizio

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Druva Backup Events).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Druva Backup come Tipo di log.
  7. Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del account di servizio, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email per utilizzarlo nel passaggio successivo.

Configurare il feed

  1. Fai clic su Avanti.
  2. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://druva-backup-logs/druva_backup/
      
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test)
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed

  3. Fai clic su Avanti.

  4. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Concedi le autorizzazioni IAM al account di servizio Google SecOps

Il account di servizio Google SecOps richiede il ruolo Visualizzatore oggetti Storage sul tuo bucket GCS per leggere i file di log scritti dalla funzione Cloud Run.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (ad esempio druva-backup-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email dell'account di servizio Google SecOps (ad esempio, chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage
  6. Fai clic su Salva.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
inSyncUserID, eventsGroupId, FilesMissed, FilesBackedup, TotalBackupSize, TotalBytesTransferred, facility, inSyncDataSourceID, initiator, event_type additional.fields Unite alle etichette create da ogni campo, se non vuoto
iniziatore extensions.auth.type Impostato su "AUTHTYPE_UNSPECIFIED" se l'iniziatore corrisponde all'espressione regolare dell'email
metadata.event_type Impostato su "USER_LOGIN" se has_target_user è true e has_principal è true; "STATUS_UPDATE" se has_principal è true e has_target è false; altrimenti "GENERIC_EVENT"
eventID metadata.product_log_id Convertito in stringa
metadata.product_name Imposta su "DRUVA_BACKUP"
clientVersion metadata.product_version Valore copiato direttamente
inSyncDataSourceName principal.asset.hostname Valore copiato direttamente
ip principal.asset.ip Unito da IP
inSyncDataSourceName principal.hostname Valore copiato direttamente
ip principal.ip Unito da IP
clientOS principal.platform Impostato su "LINUX" se corrisponde a (?i)Linux; "WINDOWS" se corrisponde a (?i)windows; "MAC" se corrisponde a (?i)mac
profileName principal.resource.name Valore copiato direttamente
profileID principal.resource.product_object_id Convertito in stringa
eventState security_result.action Imposta su "ALLOW" se corrisponde a (?i)Success, altrimenti "BLOCK"
eventState security_result.action_details Valore copiato direttamente
gravità security_result.severity Impostato su "LOW" se in [0,1,2,3,LOW]; "MEDIUM" se in [4,5,6,MEDIUM,SUBSTANTIAL,INFO]; "HIGH" se in [7,8,HIGH,SEVERE]; "CRITICAL" se in [9,10,VERY-HIGH,CRITICAL]
inSyncUserEmail, iniziatore target.user.email_addresses Unito da inSyncUserEmail; anche dall'iniziatore se corrisponde all'espressione regolare dell'email
inSyncUserName target.user.userid Valore copiato direttamente
metadata.vendor_name Imposta su "DRUVA_BACKUP"

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.