Raccogliere i log di CloudM
Questo documento spiega come importare i log di CloudM in Google Security Operations utilizzando Google Cloud Storage V2.
CloudM è una piattaforma SaaS per Google Workspace e Microsoft 365 che fornisce l'automazione del workflow per l'onboarding e l'offboarding degli utenti, il backup, l'archiviazione e la migrazione dei dati. CloudM Automate genera un log di controllo completo di tutte le azioni eseguite nel tuo dominio, inclusi eventi di Gestione utenti, passaggi del workflow di offboarding, modifiche alla configurazione e operazioni relative alla sicurezza. I dati del log di controllo dell'ultimo anno vengono conservati.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un progetto Google Cloud con l'API Storage di Cloud abilitata
- Autorizzazioni per creare e gestire bucket Cloud Storage
- Autorizzazioni per gestire i criteri IAM (Identity and Access Management) nei bucket Cloud Storage
- Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
- Accesso amministratore all'istanza CloudM Automate con l'autorizzazione Modifica impostazioni globali
- L'URL dell'istanza CloudM Automate (ad esempio,
yourcompany.cloudm.io) - ID dominio CloudM
Raccogliere le credenziali di CloudM Automate
Creare un ruolo personalizzato per l'accesso ai log API
- Accedi alla tua istanza di CloudM Automate.
- Vai a Impostazioni > Ruoli.
- Fai clic su Aggiungi ruolo per creare un nuovo ruolo.
- Nel campo Nome ruolo, inserisci un nome descrittivo (ad esempio
Google SecOps Log Reader). Nell'elenco delle autorizzazioni, attiva la seguente autorizzazione:
- Visualizza log: concede la possibilità di visualizzare tutti i log delle applicazioni.
Salva il ruolo.
Crea un account di servizio e assegna il ruolo
- In CloudM Automate, vai a Impostazioni > Ruoli.
- Crea o identifica un account di servizio da utilizzare per l'accesso API.
- Assegna il ruolo Lettore log Google SecOps al account di servizio.
- Assicurati che il ruolo sia assegnato con ambito globale in modo che il account di servizio possa accedere ai log nell'intero dominio.
Recupera il token di accesso al account di servizio
- Genera un token di accesso per il account di servizio.
- Il token di accesso viene utilizzato come token di connessione nell'intestazione
Authorizationquando si effettuano richieste API all'API CloudM Logs. Registra i seguenti valori:
- URL istanza Automate: l'URL dell'istanza di CloudM Automate (ad esempio,
yourcompany.cloudm.io) - ID dominio: l'identificatore del tuo dominio CloudM
- Token di accesso all'account di servizio: il token Bearer per l'autenticazione API
- URL istanza Automate: l'URL dell'istanza di CloudM Automate (ad esempio,
Verifica le autorizzazioni
Per verificare che l'account disponga delle autorizzazioni richieste:
- Accedi a CloudM Automate.
- Vai a Impostazioni > Ruoli.
- Verifica che al account di servizio sia assegnata l'autorizzazione Visualizza log con ambito globale.
- Se non riesci a visualizzare questa opzione, contatta l'amministratore per concedere le autorizzazioni Modifica impostazioni globali e Visualizza log.
Testare l'accesso API
Verifica le tue credenziali prima di procedere con l'integrazione:
# Replace with your actual credentials CLOUDM_INSTANCE="yourcompany.cloudm.io" DOMAIN_ID="your-domain-id" ACCESS_TOKEN="your-access-token" # Test API access curl -v -H "Authorization: Bearer ${ACCESS_TOKEN}" \ "https://${CLOUDM_INSTANCE}/_ah/api/events/v1/${DOMAIN_ID}?from=$(date -u +%Y-%m-%d)&to=$(date -u +%Y-%m-%d)"Una risposta positiva restituisce un array JSON di eventi del log di controllo.
Autorizzazioni API richieste
Il account di servizio richiede la seguente autorizzazione:
Autorizzazione Livello di accesso Finalità Visualizza i log Globale Recupera tutti gli eventi di audit log da CloudM Automate
Crea Google Cloud bucket di archiviazione
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio cloudm-audit-logs).Tipo di località Scegli in base alle tue esigenze (regione singola, a due regioni, multiregionale) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Crea un account di servizio per la funzione Cloud Run
La funzione Cloud Run richiede un account di servizio con autorizzazioni di scrittura nel bucket Cloud Storage e di invocazione da parte di Pub/Sub.
Crea service account
- Nella Google Cloud console, vai a IAM e amministrazione > Service account.
- Fai clic su Crea account di servizio.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
cloudm-audit-collector-sa - Descrizione service account: inserisci
Service account for Cloud Run function to collect CloudM audit logs
- Nome del service account: inserisci
- Fai clic su Crea e continua.
Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
Fai clic su Continua.
Fai clic su Fine.
Questi ruoli sono necessari per:
- Storage Object Admin: scrive i log nel bucket Cloud Storage e gestisce i file di stato
- Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
- Cloud Functions Invoker: consente la chiamata di funzioni
Concedere le autorizzazioni IAM sul bucket Cloud Storage
Concedi all'account di servizio le autorizzazioni di scrittura sul bucket Cloud Storage:
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (
cloudm-audit-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del account di servizio (
cloudm-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del account di servizio (
Fai clic su Salva.
Crea argomento Pub/Sub
Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.
- Nella Google Cloud console, vai a Pub/Sub > Argomenti.
- Fai clic su Crea argomento.
Fornisci i seguenti dettagli di configurazione:
- ID argomento: inserisci
cloudm-audit-trigger - Lascia invariate le altre impostazioni predefinite
- ID argomento: inserisci
Fai clic su Crea.
Crea una funzione Cloud Run per raccogliere i log
La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API CloudM Automate Logs e scriverli in Cloud Storage.
- Nella Google Cloud console, vai a Cloud Run.
- Fai clic su Crea servizio.
- Seleziona Funzione (usa un editor in linea per creare una funzione).
Nella sezione Configura, fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome servizio cloudm-audit-collectorRegione Seleziona la regione corrispondente al bucket Cloud Storage (ad esempio, us-central1)Tempo di esecuzione Seleziona Python 3.12 o versioni successive Nella sezione Trigger (facoltativo):
- Fai clic su + Aggiungi trigger.
- Seleziona Cloud Pub/Sub.
- In Seleziona un argomento Cloud Pub/Sub, scegli
cloudm-audit-trigger. - Fai clic su Salva.
Nella sezione Autenticazione:
- Seleziona Richiedi autenticazione.
- Controlla Identity and Access Management (IAM).
Scorri verso il basso ed espandi Container, networking, sicurezza.
Vai alla scheda Sicurezza:
- Service account (Account di servizio): seleziona
cloudm-audit-collector-sa
- Service account (Account di servizio): seleziona
Vai alla scheda Container:
- Fai clic su Variabili e secret.
Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
Nome variabile Valore di esempio Descrizione GCS_BUCKETcloudm-audit-logsNome del bucket Cloud Storage GCS_PREFIXcloudm-auditPrefisso per i file di log STATE_KEYcloudm-audit/state.jsonPercorso file di stato CLOUDM_INSTANCE_URLyourcompany.cloudm.ioURL dell'istanza CloudM Automate CLOUDM_DOMAIN_IDyour-domain-idIdentificatore del dominio CloudM CLOUDM_ACCESS_TOKENyour-access-tokenToken di connessione del account di servizio CloudM LOOKBACK_HOURS24Periodo di riferimento iniziale
Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:
- Timeout richiesta: inserisci
600secondi (10 minuti)
- Timeout richiesta: inserisci
Vai alla scheda Impostazioni:
Nella sezione Risorse:
- Memoria: seleziona 512 MiB o superiore
- CPU: seleziona 1
Nella sezione Scalabilità della revisione:
- Numero minimo di istanze: inserisci
0 - Numero massimo di istanze: inserisci
100
- Numero minimo di istanze: inserisci
Fai clic su Crea.
Attendi la creazione del servizio (1-2 minuti).
Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.
Aggiungi codice per la funzione
- Inserisci main nel campo Entry point (Punto di ingresso).
Nell'editor di codice incorporato, crea due file:
main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=10.0, read=60.0), retries=False, ) storage_client = storage.Client() GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cloudm-audit') STATE_KEY = os.environ.get('STATE_KEY', 'cloudm-audit/state.json') CLOUDM_INSTANCE_URL = os.environ.get('CLOUDM_INSTANCE_URL', '').rstrip('/') CLOUDM_DOMAIN_ID = os.environ.get('CLOUDM_DOMAIN_ID') CLOUDM_ACCESS_TOKEN = os.environ.get('CLOUDM_ACCESS_TOKEN') LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) @functions_framework.cloud_event def main(cloud_event): if not all([GCS_BUCKET, CLOUDM_INSTANCE_URL, CLOUDM_DOMAIN_ID, CLOUDM_ACCESS_TOKEN]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) state = load_state(bucket) now = datetime.now(timezone.utc) if isinstance(state, dict) and state.get('last_event_date'): try: last_date = state['last_event_date'] last_time = datetime.strptime(last_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) except Exception as e: print(f"Warning: Could not parse last_event_date: {e}") last_time = now - timedelta(hours=LOOKBACK_HOURS) else: last_time = now - timedelta(hours=LOOKBACK_HOURS) from_date = last_time.strftime('%Y-%m-%d') to_date = now.strftime('%Y-%m-%d') print(f"Fetching logs from {from_date} to {to_date}") records = fetch_logs(from_date, to_date) if not records: print("No new log records found.") save_state(bucket, to_date) return timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/cloudm_audit_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join( [json.dumps(record, ensure_ascii=False, default=str) for record in records] ) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") save_state(bucket, to_date) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def fetch_logs(from_date, to_date): instance = CLOUDM_INSTANCE_URL if not instance.startswith('https://'): instance = f"https://{instance}" endpoint = f"{instance}/_ah/api/events/v1/{CLOUDM_DOMAIN_ID}" headers = { 'Authorization': f'Bearer {CLOUDM_ACCESS_TOKEN}', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-CloudMCollector/1.0' } url = f"{endpoint}?from={from_date}&to={to_date}" try: response = http.request('GET', url, headers=headers) if response.status == 429: retry_after = int(response.headers.get('Retry-After', '60')) print(f"Rate limited (429). Retry after {retry_after}s.") return [] if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [] data = json.loads(response.data.decode('utf-8')) if isinstance(data, list): records = data elif isinstance(data, dict): records = data.get('items', data.get('events', [data])) else: records = [] print(f"Retrieved {len(records)} events") return records except Exception as e: print(f"Error fetching logs: {e}") return [] def load_state(bucket): try: blob = bucket.blob(STATE_KEY) if blob.exists(): return json.loads(blob.download_as_text()) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, last_event_date): try: state = { 'last_event_date': last_event_date, 'last_run': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_date={last_event_date}") except Exception as e: print(f"Warning: Could not save state: {e}")requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.
Attendi il completamento del deployment (2-3 minuti).
Crea job Cloud Scheduler
Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.
- Nella Google Cloud console, vai a Cloud Scheduler.
- Fai clic su Crea job.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome cloudm-audit-collector-hourlyRegione Seleziona la stessa regione della funzione Cloud Run Frequenza 0 * * * *(ogni ora, all'ora)Fuso orario Seleziona il fuso orario (UTC consigliato) Tipo di target Pub/Sub Argomento Seleziona cloudm-audit-triggerCorpo del messaggio {}(oggetto JSON vuoto)Fai clic su Crea.
Testare l'integrazione
- Nella console Cloud Scheduler, trova il tuo job (
cloudm-audit-collector-hourly). - Fai clic su Forza esecuzione per attivare il job manualmente.
- Attendi qualche secondo.
- Vai a Cloud Run > Servizi.
- Fai clic su
cloudm-audit-collector. - Fai clic sulla scheda Log.
Verifica che la funzione sia stata eseguita correttamente. Cerca:
Fetching logs from YYYY-MM-DD to YYYY-MM-DD Retrieved X events Wrote X records to gs://cloudm-audit-logs/cloudm-audit/cloudm_audit_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVai a Cloud Storage > Bucket.
Fai clic su
cloudm-audit-logs.Vai alla cartella
cloudm-audit/.Verifica che sia stato creato un nuovo file
.ndjsoncon il timestamp corrente.
Se visualizzi errori nei log:
- HTTP 401: verifica che la variabile di ambiente
CLOUDM_ACCESS_TOKENsia corretta. - HTTP 403: verifica che l'account di servizio disponga dell'autorizzazione Visualizza log con ambito globale.
- HTTP 429: limitazione di frequenza: la funzione si interromperà e riprenderà alla successiva esecuzione pianificata.
- Variabili di ambiente mancanti: verifica che tutte le variabili richieste siano impostate nella configurazione della funzione Cloud Run
Recuperare il account di servizio Google SecOps
Google SecOps utilizza un account di servizio univoco per leggere i dati dal tuo bucket Cloud Storage. Devi concedere a questo account di servizio l'accesso al tuo bucket.
Recupera l'email del account di servizio
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
CloudM Audit Logs). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona CloudM come tipo di log.
Fai clic su Ottieni service account.
Verrà visualizzata un'email univoca del account di servizio, ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL del bucket di archiviazione: inserisci l'URI del bucket Cloud Storage con il percorso del prefisso:
gs://cloudm-audit-logs/cloudm-audit/Opzione di eliminazione della fonte: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni).
Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Concedi le autorizzazioni IAM al account di servizio Google SecOps
Il account di servizio Google SecOps deve disporre del ruolo Visualizzatore oggetti Storage sul bucket Cloud Storage.
- Vai a Cloud Storage > Bucket.
- Fai clic su
cloudm-audit-logs. - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del account di servizio Google SecOps
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
Fai clic su Salva.
Parametri dell'API CloudM Logs
L'API CloudM Logs supporta i seguenti parametri di ricerca per filtrare gli eventi di log:
| Parametro | Formato | Descrizione |
|---|---|---|
| byUser | Indirizzo email | Filtra gli eventi in base all'utente che ha eseguito l'azione (analogamente a Utente nell'interfaccia utente di CloudM) |
| da | yyyy-MM-dd |
Data di inizio del filtro dell'intervallo di date |
| a | yyyy-MM-dd |
Data di fine del filtro dell'intervallo di date |
| contextType | Stringa | Filtra per tipo di contesto (ad esempio profilo, gruppo, OU) |
| contextName | Stringa | Filtra in base al target di un'azione (ad esempio, un utente specifico che viene ritirato) |
| operazione | Stringa | Filtra per tipo di operazione (ad esempio, assegna alias, sospendi utente) |
| country | Codice paese | Filtrare in base al codice paese di geolocalizzazione |
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| about | about | Valore copiato direttamente |
| Context_Name | about.labels | Unite come coppie chiave-valore da about_Context_Name, about_Context_Type, labels0 |
| Context_Type | about.labels | |
| Login_Type | about.labels | |
| Emittente | additional.fields | Unito da additional_field0, additional_field1, additional_field2 |
| SAML_code | additional.fields | |
| SAML_ACS_Url | additional.fields | |
| Operazione | extensions.auth.type | Impostato su SSO se Operation corrisponde a SSORequest, AUTHTYPE_UNSPECIFIED se Context_Type è LoginUser |
| Context_Type | extensions.auth.type | |
| Timestamp | metadata.event_timestamp | Data e ora e fuso orario estratti dal timestamp, fuso orario convertito in offset, concatenati e analizzati come timestamp |
| Operazione | metadata.event_type | Impostato su USER_UNCATEGORIZED se Operation corrisponde a Update/Delete/SuspendUser/UnsuspendUser/Create, USER_LOGIN se Operation corrisponde a SSORequest/SSORequestFail o Context_Type è LoginUser, STATUS_UPDATE se IP non è vuoto, altrimenti GENERIC_EVENT |
| Context_Type | metadata.event_type | |
| User_Agent | network.http.user_agent | Valore copiato direttamente |
| entità | entità | Ridenominato dall'entità se Context_Type != LoginUser, altrimenti dalla destinazione |
| target | entità | |
| Organization_Unit | principal.administrative_domain | Valore copiato direttamente |
| IP | principal.ip | Valore copiato direttamente |
| Città | principal.location.city | Valore copiato direttamente |
| Paese | principal.location.country_or_region | Valore copiato direttamente |
| Geolocalizzazione | principal.location.region_latitude | Latitudine estratta dalla geolocalizzazione utilizzando grok |
| Geolocalizzazione | principal.location.region_longitude | Longitudine estratta dalla geolocalizzazione utilizzando grok |
| Regione | principal.location.state | Valore copiato direttamente |
| Attore | principal.user.attribute.roles | Impostato su role.name se Actor non è un indirizzo email e non è vuoto, quindi unito |
| Attore | principal.user.email_addresses | Valore copiato direttamente se l'attore corrisponde all'espressione regolare dell'email |
| Messaggio | principal.user.userid | Nome utente estratto dal messaggio utilizzando grok |
| security_result | security_result | Unito l'oggetto security_result |
| SAML_code | security_result.action | Imposta su ALLOW se SAML_code corrisponde a Success, BLOCK se RequestDenied |
| Messaggio | security_result.description | Valore copiato direttamente |
| Gravità | security_result.severity | Imposta su maiuscolo se Errore/Critico, su INFORMATIONAL se Info, su MEDIUM se Avviso, altrimenti su UNKNOWN_SEVERITY |
| Operazione | security_result.summary | Valore copiato direttamente |
| target | target | Ridenominato da target se Context_Type != LoginUser, altrimenti da principal |
| entità | target | |
| metadata.product_name | metadata.product_name | Impostato su "CLOUDM" |
| metadata.vendor_name | metadata.vendor_name | Impostato su "CLOUDM" |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.