Collecter les journaux Druva Backup
Ce document explique comment collecter les journaux de sauvegarde Druva en configurant une fonction Google Cloud Run qui récupère les événements de l'API REST Druva et les écrit dans un bucket Google Cloud Storage, puis en configurant un flux Google Security Operations à l'aide de Google Cloud Storage V2.
Druva est une plate-forme cloud native de protection et de gestion des données qui fournit des services de sauvegarde, de reprise après sinistre et d'archivage pour les points de terminaison, les applications SaaS et les charges de travail d'entreprise. La plate-forme génère des journaux d'audit complets, des événements de sauvegarde, des activités de restauration et des alertes de sécurité qui peuvent être intégrés aux solutions SIEM pour la surveillance et la conformité.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps
- Un projet Google Cloud avec facturation activée
- Les API Google Cloud suivantes sont activées :
- API Cloud Run functions
- API Cloud Scheduler
- API Cloud Storage
- API Pub/Sub
- API IAM
- Accès administrateur Druva Cloud à la console Druva Cloud Platform
- Accès au centre d'intégration Druva pour la création d'identifiants API
créer un bucket Google Cloud Storage ;
- Accédez à Google Cloud Console.
- Sélectionnez votre projet ou créez-en un.
- Dans le menu de navigation, accédez à Cloud Storage> Buckets.
- Cliquez sur Créer un bucket.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nommer votre bucket Saisissez un nom unique (par exemple, druva-backup-logs).Type d'emplacement Choisissez en fonction de vos besoins (région, birégion ou multirégion). Emplacement Sélectionnez l'emplacement le plus proche de votre instance Google SecOps (par exemple, us-central1).Classe de stockage Standard (recommandé pour les journaux auxquels vous accédez fréquemment) Access control (Contrôle des accès) Uniforme (recommandé) Outils de protection Facultatif : Activer la gestion des versions des objets ou la règle de conservation Cliquez sur Créer.
Collecter les identifiants de l'API Druva
Pour permettre à la fonction Cloud Run de récupérer les événements depuis Druva, vous devez créer des identifiants API avec l'authentification OAuth 2.0.
Créer des identifiants d'API
- Connectez-vous à la console Druva Cloud Platform.
- Dans le menu de navigation globale, sélectionnez Centre d'intégration.
- Dans le panneau de gauche, cliquez sur Identifiants d'API.
- Cliquez sur Nouveaux identifiants.
- Dans la fenêtre Nouveaux identifiants, fournissez les informations suivantes :
Nom : saisissez un nom descriptif (par exemple,
Google SecOps Cloud Storage Integration). - Pour appliquer des restrictions d'autorisation :
- Sélectionnez Administrateur Druva Cloud pour autoriser l'accès complet à la récupération et à la modification des données.
- Vous pouvez également sélectionner Administrateur de produit, puis choisir : Rôle d'administrateur Cloud (lecture seule) : pour limiter l'accès à la récupération des données uniquement, sans droit de modification (recommandé pour l'intégration SIEM)
- Cliquez sur Enregistrer.
Enregistrer les identifiants de l'API
Une fois les identifiants de l'API créés, la fenêtre Détails des identifiants s'affiche :
- Cliquez sur l'icône de copie à côté de ID client pour copier la valeur dans le presse-papiers.
- Enregistrez l'ID client de manière sécurisée (par exemple,
McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx). - Cliquez sur l'icône de copie à côté de Clé secrète pour copier la valeur dans le presse-papiers.
Enregistrez la clé secrète de manière sécurisée (par exemple,
Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt).
Créer un compte de service
Créez un compte de service dédié pour que la fonction Cloud Run puisse accéder à Google Cloud Storage.
- Dans la console Google Cloud, accédez à IAM et administration > Comptes de service.
- Cliquez sur Créer un compte de service.
- Fournissez les informations de configuration suivantes :
- Nom du compte de service : saisissez
druva-backup-function(ou un nom descriptif). - Description du compte de service : saisissez
Service account for Druva Backup Cloud Run function.
- Nom du compte de service : saisissez
- Cliquez sur Créer et continuer.
- Dans la section Autoriser ce compte de service à accéder au projet, ajoutez les rôles suivants :
- Cliquez sur Sélectionner un rôle, puis sélectionnez Administrateur des objets Storage.
- Cliquez sur Ajouter un autre rôle, puis sélectionnez Demandeur Cloud Run.
- Cliquez sur Continuer.
- Cliquez sur OK.
Notez l'adresse e-mail du compte de service (par exemple,
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com).
Créer un sujet Pub/Sub
Créez un sujet Pub/Sub que Cloud Scheduler utilisera pour déclencher la fonction Cloud Run.
- Dans la console Google Cloud, accédez à Pub/Sub > Sujets.
- Cliquez sur Créer un sujet.
- Fournissez les informations de configuration suivantes :
- ID du sujet : saisissez
druva-backup-trigger.
- ID du sujet : saisissez
- Décochez Ajouter un abonnement par défaut.
Cliquez sur Créer.
Créer la fonction Cloud Run
Préparer le code de la fonction
Créez une fonction Cloud Run qui s'authentifie auprès de l'API Druva à l'aide d'identifiants client OAuth 2.0, récupère les événements via le point de terminaison des événements avec pagination et écrit les résultats au format NDJSON dans le bucket GCS.
Déployer la fonction Cloud Run
- Dans la console Google Cloud, accédez à Cloud Run Functions.
- Cliquez sur Créer une fonction.
Fournissez les informations de configuration suivantes :
- Environnement : sélectionnez 2e génération.
- Nom de la fonction : saisissez
druva-backup-to-gcs. - Région : sélectionnez la région la plus proche de votre bucket GCS (par exemple,
us-central1). - Type de déclencheur : sélectionnez Cloud Pub/Sub.
- Sujet Cloud Pub/Sub : sélectionnez
druva-backup-trigger. - Compte de service : sélectionnez
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com. - Mémoire allouée :
512 MiB - Délai avant expiration :
540secondes - Nombre maximal d'instances :
1
Cliquez sur Suivant.
Sélectionnez Python 3.11 comme environnement d'exécution.
Définissez le champ Point d'entrée sur
main.Dans l'éditeur Code source, remplacez le contenu de
main.pypar ce qui suit :import base64 import json import os import time from datetime import datetime, timezone, timedelta import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup") STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json") DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com") CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) def get_oauth_token(): """Obtain OAuth 2.0 access token using client credentials grant.""" token_url = f"https://{DRUVA_BASE_URL}/token" payload = { "grant_type": "client_credentials", "scope": "read", } resp = requests.post( token_url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def load_state(storage_client): """Load the persisted state (last event time and tracker) from GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def save_state(storage_client, state): """Persist state to GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def fetch_events(token, state): """Fetch events from Druva API with pagination via nextPageToken.""" events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } params = {"pageSize": PAGE_SIZE} tracker = state.get("tracker") last_event_time = state.get("last_event_time") if tracker: params["tracker"] = tracker elif last_event_time: params["fromTime"] = last_event_time else: lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS) params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ") all_events = [] total_fetched = 0 while total_fetched < MAX_RECORDS: resp = requests.get( events_url, headers=headers, params=params, timeout=60, ) resp.raise_for_status() data = resp.json() events = data.get("events", []) all_events.extend(events) total_fetched += len(events) new_tracker = data.get("tracker") next_page_token = data.get("nextPageToken") if new_tracker: state["tracker"] = new_tracker if next_page_token: params["nextPageToken"] = next_page_token params.pop("tracker", None) params.pop("fromTime", None) else: break if all_events: last_ts = all_events[-1].get("eventTime", "") if last_ts: state["last_event_time"] = last_ts return all_events, state def write_events_to_gcs(storage_client, events): """Write events as NDJSON to GCS.""" if not events: return now = datetime.now(timezone.utc) filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson" blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}" ndjson_lines = "\n".join(json.dumps(event) for event in events) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_lines, content_type="application/x-ndjson", ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" storage_client = storage.Client() token = get_oauth_token() state = load_state(storage_client) events, updated_state = fetch_events(token, state) write_events_to_gcs(storage_client, events) save_state(storage_client, updated_state) print(f"Completed: fetched {len(events)} events") return f"OK: {len(events)} events"Remplacez le contenu de
requirements.txtpar le code suivant :requests>=2.31.0 google-cloud-storage>=2.14.0
Configurer les variables d'environnement
- Dans la configuration de la fonction Cloud Run, accédez à la section Paramètres d'exécution, de compilation, de connexion et de sécurité.
Sous Variables d'environnement d'exécution, ajoutez les variables suivantes :
GCS_BUCKET: nom de votre bucket GCS (par exemple,druva-backup-logs)GCS_PREFIX: préfixe du chemin d'accès aux fichiers journaux (par exemple,druva_backup)STATE_KEY: nom du fichier d'état (par exemple,druva_state.json)DRUVA_BASE_URL: URL de base de l'API Druva :apis.druva.compour Druva Cloud (Standard)govcloudapis.druva.compour Druva GovCloud
CLIENT_ID: ID client des identifiants de l'API DruvaCLIENT_SECRET: clé secrète des identifiants de l'API DruvaMAX_RECORDS: nombre maximal d'enregistrements à récupérer par appel (par exemple,10000)PAGE_SIZE: nombre d'événements par page d'API (maximum500)LOOKBACK_HOURS: nombre d'heures à analyser lors de la première exécution (par exemple,24)
Cliquez sur Déployer.
Attendez la fin du déploiement.
Créer une tâche Cloud Scheduler
Créez un job Cloud Scheduler pour déclencher la fonction Cloud Run à intervalles réguliers.
- Dans la console Google Cloud, accédez à Cloud Scheduler.
- Cliquez sur Créer une tâche.
Fournissez les informations de configuration suivantes :
- Nom : saisissez
druva-backup-scheduler. - Région : sélectionnez la même région que votre fonction Cloud Run (par exemple,
us-central1). - Description : saisissez
Triggers Druva Backup log collection every 30 minutes. - Fréquence : saisissez
*/30 * * * *(toutes les 30 minutes). - Fuseau horaire : sélectionnez le fuseau horaire de votre choix (par exemple,
UTC).
- Nom : saisissez
Cliquez sur Continuer.
Configurez la cible :
- Type de cible : sélectionnez Pub/Sub.
- Sujet Cloud Pub/Sub : sélectionnez
druva-backup-trigger. - Corps du message : saisissez
{"trigger": "scheduled"}.
Cliquez sur Créer.
Tester la tâche Cloud Scheduler
- Dans la liste Cloud Scheduler, recherchez
druva-backup-scheduler. - Cliquez sur Exécuter de force pour déclencher la fonction immédiatement.
- Vérifiez l'exécution en consultant les éléments suivants :
- Journaux des fonctions Cloud Run dans Cloud Run Functions> druva-backup-to-gcs> Journaux
- Bucket GCS pour les nouveaux fichiers NDJSON dans Cloud Storage > druva-backup-logs
Récupérer le compte de service Google SecOps et configurer le flux
Obtenir l'adresse e-mail du compte de service
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Cliquez sur Configurer un flux unique.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Druva Backup Events). - Sélectionnez Google Cloud Storage V2 comme Type de source.
- Sélectionnez Druva Backup comme type de journal.
Cliquez sur Obtenir un compte de service. Une adresse e-mail unique pour le compte de service s'affiche, par exemple :
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopiez cette adresse e-mail pour l'utiliser à l'étape suivante.
Configurer le flux
- Cliquez sur Suivant.
Spécifiez les valeurs des paramètres d'entrée suivants :
URL du bucket de stockage : saisissez l'URI du bucket GCS avec le chemin d'accès au préfixe :
gs://druva-backup-logs/druva_backup/Option de suppression de la source : sélectionnez l'option de suppression de votre choix :
- Jamais : ne supprime jamais aucun fichier après les transferts (recommandé pour les tests)
- Supprimer les fichiers transférés : supprime les fichiers une fois le transfert réussi.
- Supprimer les fichiers transférés et les répertoires vides : supprime les fichiers et les répertoires vides après un transfert réussi.
Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours (180 jours par défaut).
Espace de noms de l'élément : espace de noms de l'élément
Libellés d'ingestion : libellé à appliquer aux événements de ce flux
Cliquez sur Suivant.
Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Accorder des autorisations IAM au compte de service Google SecOps
Le compte de service Google SecOps doit disposer du rôle Lecteur d'objets Storage sur votre bucket GCS pour lire les fichiers journaux écrits par la fonction Cloud Run.
- Accédez à Cloud Storage > Buckets.
- Cliquez sur le nom de votre bucket (par exemple,
druva-backup-logs). - Accédez à l'onglet Autorisations.
- Cliquez sur Accorder l'accès.
- Fournissez les informations de configuration suivantes :
- Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps (par exemple,
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com). - Attribuer des rôles : sélectionnez Lecteur des objets de l'espace de stockage.
- Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps (par exemple,
Cliquez sur Enregistrer.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
| inSyncUserID, eventsGroupId, FilesMissed, FilesBackedup, TotalBackupSize, TotalBytesTransferred, facility, inSyncDataSourceID, initiator, event_type | additional.fields | Fusionné avec les libellés créés à partir de chaque champ s'il n'est pas vide |
| initiateur | extensions.auth.type | Définissez sur "AUTHTYPE_UNSPECIFIED" si l'initiateur correspond à l'expression régulière d'adresse e-mail. |
| metadata.event_type | Défini sur "USER_LOGIN" si has_target_user est défini sur "true" et has_principal est défini sur "true", sur "STATUS_UPDATE" si has_principal est défini sur "true" et has_target est défini sur "false", ou sur "GENERIC_EVENT" dans le cas contraire. | |
| eventID | metadata.product_log_id | Converti en chaîne |
| metadata.product_name | Défini sur "DRUVA_BACKUP" | |
| clientVersion | metadata.product_version | Valeur copiée directement |
| inSyncDataSourceName | principal.asset.hostname | Valeur copiée directement |
| ip | principal.asset.ip | Fusionné à partir de l'adresse IP |
| inSyncDataSourceName | principal.hostname | Valeur copiée directement |
| ip | principal.ip | Fusionné à partir de l'adresse IP |
| clientOS | principal.platform | Définissez sur "LINUX" si la valeur correspond à (?i)Linux, sur "WINDOWS" si elle correspond à (?i)windows, et sur "MAC" si elle correspond à (?i)mac. |
| profileName | principal.resource.name | Valeur copiée directement |
| profileID | principal.resource.product_object_id | Converti en chaîne |
| eventState | security_result.action | Définissez sur "ALLOW" si la valeur correspond à (?i)Success, sinon sur "BLOCK". |
| eventState | security_result.action_details | Valeur copiée directement |
| de gravité, | security_result.severity | Définissez la valeur sur "LOW" (FAIBLE) si elle est comprise dans [0,1,2,3,LOW]; sur "MEDIUM" (MOYENNE) si elle est comprise dans [4,5,6,MEDIUM,SUBSTANTIAL,INFO]; sur "HIGH" (ÉLEVÉE) si elle est comprise dans [7,8,HIGH,SEVERE]; sur "CRITICAL" (CRITIQUE) si elle est comprise dans [9,10,VERY-HIGH,CRITICAL]. |
| inSyncUserEmail, initiateur | target.user.email_addresses | Fusionné à partir de inSyncUserEmail ; également à partir de l'initiateur si l'expression régulière d'adresse e-mail correspond |
| inSyncUserName | target.user.userid | Valeur copiée directement |
| metadata.vendor_name | Défini sur "DRUVA_BACKUP" |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.