Raccogliere i log di Atlassian Confluence
Questo documento spiega come importare i log di Atlassian Confluence in Google Security Operations. Il parser tenta innanzitutto di estrarre i campi dal messaggio di log non elaborato utilizzando espressioni regolari (pattern grok) progettate per i log di Atlassian Confluence. Se l'analisi grok non va a buon fine o il log è in formato JSON, il codice tenta di analizzare il messaggio come JSON. Infine, i campi estratti vengono mappati allo schema UDM di Google SecOps e arricchiti con un contesto aggiuntivo.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un account Atlassian Confluence Cloud con accesso ai log di controllo OPPURE Confluence Data Center/Server con accesso amministrativo
- Per il metodo basato su Google Cloud: accesso con privilegi a Google Cloud (GCS, IAM, Cloud Run, Pub/Sub, Cloud Scheduler)
- Per il metodo Bindplane: Windows Server 2016 o versioni successive oppure host Linux con
systemd
Panoramica delle opzioni di integrazione
Questa guida fornisce due percorsi di integrazione:
- Opzione 1: Confluence Data Center/Server tramite Bindplane + Syslog
- Opzione 2: log di controllo di Confluence Cloud tramite la funzione GCP Cloud Run + GCS (formato JSON)
Scegli l'opzione più adatta al tipo di deployment e all'infrastruttura di Confluence.
Opzione 1: Confluence Data Center/Server tramite Bindplane + Syslog
Questa opzione configura Confluence Data Center o Server per inviare i log tramite syslog a un agente Bindplane, che poi li inoltra a Google SecOps.
Recuperare il file di autenticazione dell'importazione di Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Agenti di raccolta.
- Fai clic su Scarica per scaricare il file di autenticazione dell'importazione.
Salva il file in modo sicuro sul sistema in cui verrà installato l'agente Bindplane.
Recuperare l'ID cliente Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Profilo.
Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.
Installa l'agente Bindplane
Installa l'agente Bindplane sul sistema operativo Windows o Linux seguendo le istruzioni riportate di seguito.
Installazione di finestre
- Apri Prompt dei comandi o PowerShell come amministratore.
Esegui questo comando:
msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quietAttendi il completamento dell'installazione.
Verifica l'installazione eseguendo il comando:
sc query observiq-otel-collector
Il servizio dovrebbe essere visualizzato come IN ESECUZIONE.
Installazione di Linux
- Apri un terminale con privilegi di root o sudo.
Esegui questo comando:
sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.shAttendi il completamento dell'installazione.
Verifica l'installazione eseguendo il comando:
sudo systemctl status observiq-otel-collector
Il servizio dovrebbe essere visualizzato come attivo (in esecuzione).
Risorse aggiuntive per l'installazione
Per ulteriori opzioni di installazione e risoluzione dei problemi, consulta la Guida all'installazione dell'agente Bindplane.
Configura l'agente Bindplane per importare syslog e inviarli a Google SecOps
Individua il file di configurazione
Linux:
sudo nano /etc/bindplane-agent/config.yamlWindows:
notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
Modifica il file di configurazione
Sostituisci l'intero contenuto di
config.yamlcon la seguente configurazione:receivers: udplog: listen_address: "0.0.0.0:514" exporters: chronicle/confluence_logs: compression: gzip creds_file_path: '/etc/bindplane-agent/ingestion-auth.json' customer_id: 'YOUR_CUSTOMER_ID' endpoint: malachiteingestion-pa.googleapis.com log_type: ATLASSIAN_CONFLUENCE raw_log_field: body ingestion_labels: service: confluence service: pipelines: logs/confluence: receivers: - udplog exporters: - chronicle/confluence_logs
Parametri di configurazione
Sostituisci i seguenti segnaposto:
listen_address: sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura. Utilizza0.0.0.0:514per ascoltare su tutte le interfacce sulla porta 514.creds_file_path: aggiorna il percorso in cui è stato salvato il file di autenticazione:- Linux:
/etc/bindplane-agent/ingestion-auth.json - Windows:
C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
- Linux:
customer_id: sostituisciYOUR_CUSTOMER_IDcon l'ID cliente effettivo del passaggio precedente.endpoint: URL endpoint regionale:- Stati Uniti:
malachiteingestion-pa.googleapis.com - Europa:
europe-malachiteingestion-pa.googleapis.com - Asia:
asia-southeast1-malachiteingestion-pa.googleapis.com
- Stati Uniti:
Salvare il file di configurazione
Dopo la modifica, salva il file:
- Linux: premi
Ctrl+O, poiEntere infineCtrl+X. - Windows: fai clic su File > Salva.
Riavvia l'agente Bindplane per applicare le modifiche
Riavvia l'agente Bindplane in Linux
Per riavviare l'agente Bindplane in Linux, esegui questo comando:
sudo systemctl restart observiq-otel-collectorVerifica che il servizio sia in esecuzione:
sudo systemctl status observiq-otel-collectorControlla i log per individuare eventuali errori:
sudo journalctl -u observiq-otel-collector -f
Riavvia l'agente Bindplane in Windows
Per riavviare l'agente Bindplane in Windows, scegli una delle seguenti opzioni:
Utilizzando il prompt dei comandi o PowerShell come amministratore:
net stop observiq-otel-collector && net start observiq-otel-collectorUtilizzo della console Services:
- Premi
Win+R, digitaservices.msce premi Invio. - Individua observIQ OpenTelemetry Collector.
- Fai clic con il tasto destro del mouse e seleziona Riavvia.
Verifica che il servizio sia in esecuzione:
sc query observiq-otel-collectorControlla i log per individuare eventuali errori:
type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
- Premi
Configurare l'inoltro di Syslog su Confluence Data Center/Server
Opzione A: configura rsyslog per inoltrare i file di log locali (consigliata)
- Configura Confluence per scrivere i log nei file (comportamento predefinito).
Installa rsyslog se non è presente:
sudo apt-get install rsyslog # Debian/Ubuntu sudo yum install rsyslog # RHEL/CentOSCrea il file di configurazione rsyslog
/etc/rsyslog.d/confluence.conf:# Forward Confluence logs to Bindplane $ModLoad imfile # Application logs $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log $InputFileTag confluence-app: $InputFileStateFile stat-confluence-app $InputFileSeverity info $InputFileFacility local0 $InputRunFileMonitor # Audit logs (JSON format in DC/Server) $InputFileName <confluence-home-directory>/log/audit/audit.log $InputFileTag confluence-audit: $InputFileStateFile stat-confluence-audit $InputFileSeverity info $InputFileFacility local1 $InputRunFileMonitor # Forward to Bindplane agent *.* @@BINDPLANE_AGENT_IP:514- Sostituisci
BINDPLANE_AGENT_IPcon l'indirizzo IP dell'agente Bindplane (ad esempio,192.168.1.100). - Modifica i percorsi dei file di log in base all'installazione di Confluence:
- I log delle applicazioni in genere:
<confluence-install>/logs/o<local-home>/logs/ - Log di controllo:
<confluence-home-directory>/log/audit/(formato JSON) - Per trovare la directory principale di Confluence, vai a Impostazioni > Configurazione generale > Informazioni di sistema e cerca Confluence Home o Local Home.
- I log delle applicazioni in genere:
- Sostituisci
Riavvia rsyslog:
sudo systemctl restart rsyslog
Opzione B: configura l'inoltro di Log4j2 Syslog
Questa opzione richiede la modifica della configurazione di Log4j2. L'opzione A (rsyslog) è consigliata per semplicità.
- Accedi al tuo server Confluence tramite SSH o RDP.
Individua il file di configurazione Log4j2 in:
<confluence-install>/confluence/WEB-INF/classes/log4j2.xmlModifica il file di configurazione per aggiungere un appender Syslog:
<Configuration> <Appenders> <!-- Existing appenders --> <Syslog name="SyslogAppender" host="BINDPLANE_AGENT_IP" port="514" protocol="UDP" format="RFC5424" facility="LOCAL0"> <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/> </Syslog> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="SyslogAppender"/> <!-- Other appender refs --> </Root> <!-- Audit logger --> <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" level="info" additivity="false"> <AppenderRef ref="SyslogAppender"/> </Logger> </Loggers> </Configuration>- Sostituisci
BINDPLANE_AGENT_IPcon l'indirizzo IP dell'agente Bindplane (ad esempio,192.168.1.100).
- Sostituisci
Riavvia Confluence per applicare le modifiche:
sudo systemctl restart confluence
Opzione 2: Confluence Cloud Audit Logs tramite la funzione GCP Cloud Run e GCS
Questo metodo utilizza la funzione GCP Cloud Run per recuperare periodicamente gli audit log tramite l'API REST di Confluence Audit e archiviarli in GCS per l'importazione di Google SecOps.
Raccogli le credenziali API Confluence Cloud
- Accedi al tuo account Atlassian.
- Vai alla pagina https://id.atlassian.com/manage-profile/security/api-tokens.
- Fai clic su Genera token API.
- Inserisci un'etichetta per il token (ad esempio,
Google Security Operations Integration). - Fai clic su Crea.
- Copia e salva il token API in modo sicuro.
- Prendi nota dell'URL del tuo sito Confluence Cloud (ad esempio,
https://yoursite.atlassian.net). Prendi nota dell'indirizzo email del tuo account Atlassian (utilizzato per l'autenticazione).
Verifica le autorizzazioni
Per verificare che l'account disponga delle autorizzazioni richieste:
- Accedi a Confluence Cloud.
- Fai clic sull'icona Impostazioni (⚙️) nell'angolo in alto a destra.
- Se vedi Monitoraggio > Log di controllo nel riquadro di navigazione a sinistra, disponi delle autorizzazioni richieste.
- Se non riesci a visualizzare questa opzione, contatta l'amministratore per concedere l'autorizzazione Amministratore di Confluence.
Testare l'accesso API
Verifica le tue credenziali prima di procedere con l'integrazione:
# Replace with your actual credentials CONFLUENCE_EMAIL="your-email@example.com" CONFLUENCE_API_TOKEN="your-api-token" CONFLUENCE_URL="https://yoursite.atlassian.net" # Test API access curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \ -H "Accept: application/json" \ "${CONFLUENCE_URL}/wiki/rest/api/audit"
Creazione di un bucket Google Cloud Storage
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio confluence-audit-logs).Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Crea un service account per la funzione Cloud Run
La funzione Cloud Run richiede un service account con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.
Crea service account
- Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
- Fai clic su Crea service account.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
confluence-audit-collector-sa. - Descrizione service account: inserisci
Service account for Cloud Run function to collect Confluence Cloud audit logs.
- Nome del service account: inserisci
- Fai clic su Crea e continua.
- Nella sezione Concedi a questo service account l'accesso al progetto, aggiungi i seguenti ruoli:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
- Fai clic su Continua.
- Fai clic su Fine.
Questi ruoli sono necessari per:
- Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
- Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
- Cloud Functions Invoker: consente la chiamata di funzioni
Concedi autorizzazioni IAM sul bucket GCS
Concedi al service account le autorizzazioni di scrittura sul bucket GCS:
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket.
- Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del service account (ad es.
confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del service account (ad es.
- Fai clic su Salva.
Crea argomento Pub/Sub
Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.
- Nella console GCP, vai a Pub/Sub > Argomenti.
- Fai clic su Crea argomento.
- Fornisci i seguenti dettagli di configurazione:
- ID argomento: inserisci
confluence-audit-trigger. - Lascia le altre impostazioni sui valori predefiniti.
- ID argomento: inserisci
- Fai clic su Crea.
Crea una funzione Cloud Run per raccogliere i log
La funzione Cloud Run viene attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API Confluence Cloud Audit e li scrive in GCS.
- Nella console GCP, vai a Cloud Run.
- Fai clic su Crea servizio.
- Seleziona Funzione (usa un editor in linea per creare una funzione).
Nella sezione Configura, fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome servizio confluence-audit-collectorRegione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio us-central1)Runtime Seleziona Python 3.12 o versioni successive Nella sezione Trigger (facoltativo):
- Fai clic su + Aggiungi trigger.
- Seleziona Cloud Pub/Sub.
- In Seleziona un argomento Cloud Pub/Sub, scegli
confluence-audit-trigger. - Fai clic su Salva.
Nella sezione Autenticazione:
- Seleziona Richiedi autenticazione.
- Controlla Identity and Access Management (IAM).
Scorri verso il basso ed espandi Container, networking, sicurezza.
Vai alla scheda Sicurezza:
- Service account (Service account): seleziona
confluence-audit-collector-sa.
- Service account (Service account): seleziona
Vai alla scheda Container:
- Fai clic su Variabili e secret.
- Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
Nome variabile Valore di esempio Descrizione GCS_BUCKETconfluence-audit-logsNome bucket GCS GCS_PREFIXconfluence-auditPrefisso per i file di log STATE_KEYconfluence-audit/state.jsonPercorso file di stato CONFLUENCE_URLhttps://yoursite.atlassian.netURL del sito Confluence CONFLUENCE_EMAILyour-email@example.comEmail dell'account Atlassian CONFLUENCE_API_TOKENyour-api-token-hereToken API MAX_RECORDS1000Numero massimo di record per esecuzione Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:
- Timeout richiesta: inserisci
600secondi (10 minuti).
- Timeout richiesta: inserisci
Vai alla scheda Impostazioni:
- Nella sezione Risorse:
- Memoria: seleziona 512 MiB o un valore superiore.
- CPU: seleziona 1.
- Nella sezione Risorse:
Nella sezione Scalabilità della revisione:
- Numero minimo di istanze: inserisci
0. - Numero massimo di istanze: inserisci
100(o modifica in base al carico previsto).
- Numero minimo di istanze: inserisci
Fai clic su Crea.
Attendi la creazione del servizio (1-2 minuti).
Dopo aver creato il servizio, si apre automaticamente l'editor di codice incorporato.
Aggiungi codice per la funzione
- Inserisci main in Entry point della funzione
Nell'editor di codice incorporato, crea due file:
- Primo file: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/') STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json') CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL') CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL') CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=24) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Convert to Unix milliseconds start_millis = to_unix_millis(last_time) end_millis = to_unix_millis(now) # Fetch logs records, newest_event_time = fetch_logs( api_base=CONFLUENCE_URL, email=CONFLUENCE_EMAIL, api_token=CONFLUENCE_API_TOKEN, start_time_ms=start_millis, end_time_ms=end_millis, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int): """ Fetch logs from Confluence Cloud Audit API with pagination and rate limiting. Args: api_base: Confluence site URL email: Atlassian account email api_token: API token start_time_ms: Start time in Unix milliseconds end_time_ms: End time in Unix milliseconds max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ # Clean up URL base_url = api_base.rstrip('/') # Build authentication header auth_string = f"{email}:{api_token}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 start_index = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request URL url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('results', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: # creationDate is in Unix milliseconds event_time_ms = event.get('creationDate') if event_time_ms: event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) event_time = event_dt.isoformat() if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results current_size = data.get('size', 0) if current_size < 100: print(f"Reached last page (size={current_size} < limit=100)") break start_index += current_size except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records[:max_records], newest_time- Secondo file: requirements.txt::
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.
Attendi il completamento del deployment (2-3 minuti).
Crea job Cloud Scheduler
Cloud Scheduler pubblica messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.
- Nella console di GCP, vai a Cloud Scheduler.
- Fai clic su Crea job.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome confluence-audit-collector-hourlyRegione Seleziona la stessa regione della funzione Cloud Run Frequenza 0 * * * *(ogni ora, all'ora)Fuso orario Seleziona il fuso orario (UTC consigliato) Tipo di target Pub/Sub Argomento Seleziona confluence-audit-triggerCorpo del messaggio {}(oggetto JSON vuoto)Fai clic su Crea.
Opzioni di frequenza di pianificazione
Scegli la frequenza in base al volume dei log e ai requisiti di latenza:
Frequenza Espressione cron Caso d'uso Ogni 5 minuti */5 * * * *Volume elevato, bassa latenza Ogni 15 minuti */15 * * * *Volume medio Ogni ora 0 * * * *Standard (consigliato) Ogni 6 ore 0 */6 * * *Volume basso, elaborazione batch Ogni giorno 0 0 * * *Raccolta dei dati storici
Testare l'integrazione
- Nella console Cloud Scheduler, trova il job.
- Fai clic su Forza esecuzione per attivare il job manualmente.
- Aspetta alcuni secondi.
- Vai a Cloud Run > Servizi.
- Fai clic su
confluence-audit-collector. - Fai clic sulla scheda Log.
Verifica che la funzione sia stata eseguita correttamente. Cerca quanto segue:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVai a Cloud Storage > Bucket.
Fai clic sul nome del bucket.
Vai alla cartella
confluence-audit/.Verifica che sia stato creato un nuovo file
.ndjsoncon il timestamp corrente.
Se visualizzi errori nei log:
- HTTP 401: controlla le credenziali API nelle variabili di ambiente
- HTTP 403: verifica che l'account disponga delle autorizzazioni di amministratore di Confluence
- HTTP 429: limitazione della frequenza: la funzione riproverà automaticamente con backoff
- Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate
Recuperare il service account Google SecOps
Google SecOps utilizza un service account univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo service account l'accesso al tuo bucket.
Recuperare l'email del service account
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Confluence Cloud Audit Logs). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona Atlassian Confluence come Tipo di log.
Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del service account, ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Concedi le autorizzazioni IAM al service account Google SecOps
Il service account Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket.
- Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del service account Google SecOps.
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
Fai clic su Salva.
Configura un feed in Google SecOps per importare i log di Confluence
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Confluence Cloud Audit Logs). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona Atlassian Confluence come Tipo di log.
- Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:
gs://confluence-audit-logs/confluence-audit/Sostituisci:
confluence-audit-logs: il nome del bucket GCS.confluence-audit: (Facoltativo) prefisso/percorso della cartella in cui vengono archiviati i log (lascia vuoto per la radice).
Esempi:
- Bucket radice:
gs://company-logs/ - Con prefisso:
gs://company-logs/confluence-audit/ - Con sottocartella:
gs://company-logs/confluence/audit/
- Bucket radice:
Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Funzione logica |
|---|---|---|
| agente | read_only_udm.network.http.user_agent | Valore tratto dal campo "agente". |
| app_protocol | read_only_udm.network.application_protocol | Derivato dal campo "app_protocol". Se "app_protocol" contiene "HTTPS", "HTTP", "SSH" o "RDP", viene utilizzato il protocollo corrispondente. In caso contrario, il valore predefinito è "UNKNOWN_APPLICATION_PROTOCOL". |
| app_protocol | read_only_udm.network.application_protocol_version | Valore estratto dal campo "app_protocol". |
| auditType.action | read_only_udm.security_result.action | Derivato dal campo "auditType.action". Se "auditType.action" contiene "successful", il valore è impostato su "ALLOW". Se contiene "restricted", il valore è impostato su "BLOCK". |
| auditType.action | read_only_udm.security_result.summary | Valore estratto dal campo "auditType.action" quando "auditType" non è vuoto e "auditType_area" è "SECURITY". |
| auditType.actionI18nKey | read_only_udm.metadata.product_event_type | Valore estratto dal campo "auditType.actionI18nKey" quando "auditType" non è vuoto. |
| auditType.area | read_only_udm.security_result.detection_fields.value | Valore estratto dal campo "auditType.area" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType area". Questo mapping viene eseguito quando "auditType" non è vuoto. |
| auditType.category | read_only_udm.security_result.category_details | Valore estratto dal campo "auditType.category" quando "auditType" non è vuoto. |
| auditType.categoryI18nKey | read_only_udm.security_result.detection_fields.value | Valore estratto dal campo "auditType.categoryI18nKey" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType categoryI18nKey". Questo mapping viene eseguito quando "auditType" non è vuoto. |
| auditType.level | read_only_udm.security_result.detection_fields.value | Valore estratto dal campo "auditType.level" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType level". Questo mapping viene eseguito quando "auditType" non è vuoto. |
| author.displayName | read_only_udm.principal.user.user_display_name | Valore tratto dal campo "author.displayName". |
| author.externalCollaborator | read_only_udm.security_result.about.resource.attribute.labels.value | Valore estratto dal campo "author.externalCollaborator" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "externalCollaborator". |
| author.id | read_only_udm.principal.user.userid | Valore estratto dal campo "author.id" quando "author.type" è "user" e "principal_user_present" è "false". |
| author.isExternalCollaborator | read_only_udm.security_result.about.resource.attribute.labels.value | Valore estratto dal campo "author.isExternalCollaborator" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "isExternalCollaborator". |
| author.name | read_only_udm.principal.user.user_display_name | Valore tratto dal campo "author.name" quando "author.type" è "user" e "principal_user_present" è "false". |
| bytes_in | read_only_udm.network.received_bytes | Valore estratto dal campo "bytes_in" se contiene cifre. In caso contrario, il valore predefinito è 0. |
| category | read_only_udm.security_result.category_details | Valore tratto dal campo "category". |
| changedValues | read_only_udm.principal.resource.attribute.labels | Itera ogni elemento di "changedValues" e crea etichette con chiavi come "changedValue [index] [key]" e valori corrispondenti nell'array "changedValues". |
| creationDate | read_only_udm.metadata.event_timestamp | Valore estratto dal campo "creationDate", analizzato come timestamp UNIX o UNIX_MS. |
| extraAttributes | read_only_udm.principal.resource.attribute.labels | Itera ogni elemento in "extraAttributes" e crea etichette con chiavi basate sui campi "name" e "nameI18nKey" e sui valori del campo "value" corrispondente. |
| http_verb | read_only_udm.network.http.method | Valore estratto dal campo "http_verb". |
| ip | read_only_udm.target.ip | Valore tratto dal campo "ip". |
| principal_host | read_only_udm.principal.hostname | Valore tratto dal campo "principal_host". |
| referral_url | read_only_udm.network.http.referral_url | Valore tratto dal campo "referral_url". |
| remoteAddress | read_only_udm.principal.ip | Valore estratto dal campo "remoteAddress", analizzato come indirizzo IP. |
| response_code | read_only_udm.network.http.response_code | Valore estratto dal campo "response_code". |
| session_duration | read_only_udm.additional.fields.value.string_value | Valore estratto dal campo "session_duration" e assegnato al campo "string_value" di un'etichetta con il campo "key" impostato su "Session Duration" (Durata sessione). |
| origine | read_only_udm.principal.ip | Valore estratto dal campo "origine", analizzato come indirizzo IP. |
| src_ip | read_only_udm.principal.ip | Valore estratto dal campo "src_ip" se "remoteAddress" è vuoto. |
| riepilogo | read_only_udm.security_result.summary | Valore tratto dal campo "Riepilogo". |
| sysAdmin | read_only_udm.security_result.about.resource.attribute.labels.value | Valore estratto dal campo "sysAdmin" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "sysAdmin". |
| superAdmin | read_only_udm.security_result.about.resource.attribute.labels.value | Valore estratto dal campo "superAdmin" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "superAdmin". |
| target_url | read_only_udm.target.url | Valore tratto dal campo "target_url". |
| timestamp | read_only_udm.metadata.event_timestamp | Valore estratto dal campo "timestamp", analizzato come stringa di data e ora. |
| user_id | read_only_udm.principal.user.userid | Valore tratto dal campo "user_id". |
| read_only_udm.metadata.event_type | Il valore di questo campo è determinato da una serie di controlli e il valore predefinito è "GENERIC_EVENT". È impostato su valori specifici come "NETWORK_HTTP", "USER_UNCATEGORIZED" o "STATUS_UPDATE" in base alla presenza e al contenuto di altri campi come "principal_host", "user_id", "has_principal" e "author.type". | |
| read_only_udm.metadata.vendor_name | Imposta il valore su "ATLASSIAN". | |
| read_only_udm.metadata.product_name | Imposta su "CONFLUENCE". | |
| read_only_udm.metadata.log_type | Imposta il valore su "ATLASSIAN_CONFLUENCE". | |
| read_only_udm.principal.user.user_display_name | Il valore di questo campo può provenire da "author.displayName" o "affectedObject.name", a seconda del contesto. | |
| read_only_udm.target.process.pid | Il valore di questo campo può provenire da "principal_host" o "pid" a seconda del contesto. | |
| read_only_udm.principal.resource.attribute.labels | Questo campo viene compilato con varie etichette derivate da campi come "affectedObjects", "changedValues" e "extraAttributes". Le chiavi e i valori di queste etichette vengono generati dinamicamente in base al contenuto specifico di questi campi. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.