Raccogliere i log di Atlassian Confluence

Supportato in:

Questo documento spiega come importare i log di Atlassian Confluence in Google Security Operations. Il parser tenta innanzitutto di estrarre i campi dal messaggio di log non elaborato utilizzando espressioni regolari (pattern grok) progettate per i log di Atlassian Confluence. Se l'analisi grok non va a buon fine o il log è in formato JSON, il codice tenta di analizzare il messaggio come JSON. Infine, i campi estratti vengono mappati allo schema UDM di Google SecOps e arricchiti con un contesto aggiuntivo.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un account Atlassian Confluence Cloud con accesso ai log di controllo OPPURE Confluence Data Center/Server con accesso amministrativo
  • Per il metodo basato su Google Cloud: accesso con privilegi a Google Cloud (GCS, IAM, Cloud Run, Pub/Sub, Cloud Scheduler)
  • Per il metodo Bindplane: Windows Server 2016 o versioni successive oppure host Linux con systemd

Panoramica delle opzioni di integrazione

Questa guida fornisce due percorsi di integrazione:

  • Opzione 1: Confluence Data Center/Server tramite Bindplane + Syslog
  • Opzione 2: log di controllo di Confluence Cloud tramite la funzione GCP Cloud Run + GCS (formato JSON)

Scegli l'opzione più adatta al tipo di deployment e all'infrastruttura di Confluence.

Opzione 1: Confluence Data Center/Server tramite Bindplane + Syslog

Questa opzione configura Confluence Data Center o Server per inviare i log tramite syslog a un agente Bindplane, che poi li inoltra a Google SecOps.

Recuperare il file di autenticazione dell'importazione di Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Agenti di raccolta.
  3. Fai clic su Scarica per scaricare il file di autenticazione dell'importazione.
  4. Salva il file in modo sicuro sul sistema in cui verrà installato l'agente Bindplane.

Recuperare l'ID cliente Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Profilo.
  3. Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.

Installa l'agente Bindplane

Installa l'agente Bindplane sul sistema operativo Windows o Linux seguendo le istruzioni riportate di seguito.

Installazione di finestre

  1. Apri Prompt dei comandi o PowerShell come amministratore.
  2. Esegui questo comando:

    msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. Attendi il completamento dell'installazione.

  4. Verifica l'installazione eseguendo il comando:

    sc query observiq-otel-collector
    

Il servizio dovrebbe essere visualizzato come IN ESECUZIONE.

Installazione di Linux

  1. Apri un terminale con privilegi di root o sudo.
  2. Esegui questo comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. Attendi il completamento dell'installazione.

  4. Verifica l'installazione eseguendo il comando:

    sudo systemctl status observiq-otel-collector
    

Il servizio dovrebbe essere visualizzato come attivo (in esecuzione).

Risorse aggiuntive per l'installazione

Per ulteriori opzioni di installazione e risoluzione dei problemi, consulta la Guida all'installazione dell'agente Bindplane.

Configura l'agente Bindplane per importare syslog e inviarli a Google SecOps

Individua il file di configurazione

  • Linux:

    sudo nano /etc/bindplane-agent/config.yaml
    
  • Windows:

    notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
    

Modifica il file di configurazione

  1. Sostituisci l'intero contenuto di config.yaml con la seguente configurazione:

    receivers:
      udplog:
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/confluence_logs:
        compression: gzip
        creds_file_path: '/etc/bindplane-agent/ingestion-auth.json'
        customer_id: 'YOUR_CUSTOMER_ID'
        endpoint: malachiteingestion-pa.googleapis.com
        log_type: ATLASSIAN_CONFLUENCE
        raw_log_field: body
        ingestion_labels:
          service: confluence
    
    service:
      pipelines:
        logs/confluence:
          receivers:
            - udplog
          exporters:
            - chronicle/confluence_logs
    

Parametri di configurazione

  • Sostituisci i seguenti segnaposto:

    • listen_address: sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura. Utilizza 0.0.0.0:514 per ascoltare su tutte le interfacce sulla porta 514.
    • creds_file_path: aggiorna il percorso in cui è stato salvato il file di autenticazione:
      • Linux: /etc/bindplane-agent/ingestion-auth.json
      • Windows: C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
    • customer_id: sostituisci YOUR_CUSTOMER_ID con l'ID cliente effettivo del passaggio precedente.
    • endpoint: URL endpoint regionale:
      • Stati Uniti: malachiteingestion-pa.googleapis.com
      • Europa: europe-malachiteingestion-pa.googleapis.com
      • Asia: asia-southeast1-malachiteingestion-pa.googleapis.com

Salvare il file di configurazione

Dopo la modifica, salva il file:

  • Linux: premi Ctrl+O, poi Enter e infine Ctrl+X.
  • Windows: fai clic su File > Salva.

Riavvia l'agente Bindplane per applicare le modifiche

Riavvia l'agente Bindplane in Linux

  1. Per riavviare l'agente Bindplane in Linux, esegui questo comando:

    sudo systemctl restart observiq-otel-collector
    
  2. Verifica che il servizio sia in esecuzione:

    sudo systemctl status observiq-otel-collector
    
  3. Controlla i log per individuare eventuali errori:

    sudo journalctl -u observiq-otel-collector -f
    

Riavvia l'agente Bindplane in Windows

  1. Per riavviare l'agente Bindplane in Windows, scegli una delle seguenti opzioni:

    • Utilizzando il prompt dei comandi o PowerShell come amministratore:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • Utilizzo della console Services:

      1. Premi Win+R, digita services.msc e premi Invio.
      2. Individua observIQ OpenTelemetry Collector.
      3. Fai clic con il tasto destro del mouse e seleziona Riavvia.
      4. Verifica che il servizio sia in esecuzione:

        sc query observiq-otel-collector
        
      5. Controlla i log per individuare eventuali errori:

        type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
        

Configurare l'inoltro di Syslog su Confluence Data Center/Server

  1. Configura Confluence per scrivere i log nei file (comportamento predefinito).
  2. Installa rsyslog se non è presente:

    sudo apt-get install rsyslog  # Debian/Ubuntu
    sudo yum install rsyslog      # RHEL/CentOS
    
  3. Crea il file di configurazione rsyslog /etc/rsyslog.d/confluence.conf:

    # Forward Confluence logs to Bindplane
    $ModLoad imfile
    
    # Application logs
    $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log
    $InputFileTag confluence-app:
    $InputFileStateFile stat-confluence-app
    $InputFileSeverity info
    $InputFileFacility local0
    $InputRunFileMonitor
    
    # Audit logs (JSON format in DC/Server)
    $InputFileName <confluence-home-directory>/log/audit/audit.log
    $InputFileTag confluence-audit:
    $InputFileStateFile stat-confluence-audit
    $InputFileSeverity info
    $InputFileFacility local1
    $InputRunFileMonitor
    
    # Forward to Bindplane agent
    *.* @@BINDPLANE_AGENT_IP:514
    
    • Sostituisci BINDPLANE_AGENT_IP con l'indirizzo IP dell'agente Bindplane (ad esempio, 192.168.1.100).
    • Modifica i percorsi dei file di log in base all'installazione di Confluence:
      • I log delle applicazioni in genere: <confluence-install>/logs/ o <local-home>/logs/
      • Log di controllo: <confluence-home-directory>/log/audit/ (formato JSON)
      • Per trovare la directory principale di Confluence, vai a Impostazioni > Configurazione generale > Informazioni di sistema e cerca Confluence Home o Local Home.
  4. Riavvia rsyslog:

    sudo systemctl restart rsyslog
    

Opzione B: configura l'inoltro di Log4j2 Syslog

Questa opzione richiede la modifica della configurazione di Log4j2. L'opzione A (rsyslog) è consigliata per semplicità.

  1. Accedi al tuo server Confluence tramite SSH o RDP.
  2. Individua il file di configurazione Log4j2 in:

    <confluence-install>/confluence/WEB-INF/classes/log4j2.xml
    
  3. Modifica il file di configurazione per aggiungere un appender Syslog:

    <Configuration>
      <Appenders>
        <!-- Existing appenders -->
        <Syslog name="SyslogAppender" 
                host="BINDPLANE_AGENT_IP" 
                port="514" 
                protocol="UDP"
                format="RFC5424"
                facility="LOCAL0">
          <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <Root level="info">
          <AppenderRef ref="SyslogAppender"/>
          <!-- Other appender refs -->
        </Root>
    
        <!-- Audit logger -->
        <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" 
                level="info" 
                additivity="false">
          <AppenderRef ref="SyslogAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
    
    • Sostituisci BINDPLANE_AGENT_IP con l'indirizzo IP dell'agente Bindplane (ad esempio, 192.168.1.100).
  4. Riavvia Confluence per applicare le modifiche:

    sudo systemctl restart confluence
    

Opzione 2: Confluence Cloud Audit Logs tramite la funzione GCP Cloud Run e GCS

Questo metodo utilizza la funzione GCP Cloud Run per recuperare periodicamente gli audit log tramite l'API REST di Confluence Audit e archiviarli in GCS per l'importazione di Google SecOps.

Raccogli le credenziali API Confluence Cloud

  1. Accedi al tuo account Atlassian.
  2. Vai alla pagina https://id.atlassian.com/manage-profile/security/api-tokens.
  3. Fai clic su Genera token API.
  4. Inserisci un'etichetta per il token (ad esempio, Google Security Operations Integration).
  5. Fai clic su Crea.
  6. Copia e salva il token API in modo sicuro.
  7. Prendi nota dell'URL del tuo sito Confluence Cloud (ad esempio, https://yoursite.atlassian.net).
  8. Prendi nota dell'indirizzo email del tuo account Atlassian (utilizzato per l'autenticazione).

Verifica le autorizzazioni

Per verificare che l'account disponga delle autorizzazioni richieste:

  1. Accedi a Confluence Cloud.
  2. Fai clic sull'icona Impostazioni (⚙️) nell'angolo in alto a destra.
  3. Se vedi Monitoraggio > Log di controllo nel riquadro di navigazione a sinistra, disponi delle autorizzazioni richieste.
  4. Se non riesci a visualizzare questa opzione, contatta l'amministratore per concedere l'autorizzazione Amministratore di Confluence.

Testare l'accesso API

  • Verifica le tue credenziali prima di procedere con l'integrazione:

    # Replace with your actual credentials
    CONFLUENCE_EMAIL="your-email@example.com"
    CONFLUENCE_API_TOKEN="your-api-token"
    CONFLUENCE_URL="https://yoursite.atlassian.net"
    
    # Test API access
    curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \
      -H "Accept: application/json" \
      "${CONFLUENCE_URL}/wiki/rest/api/audit"
    

Creazione di un bucket Google Cloud Storage

  1. Vai alla console Google Cloud.
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio confluence-audit-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la posizione (ad esempio, us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Crea un service account per la funzione Cloud Run

La funzione Cloud Run richiede un service account con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.

Crea service account

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
  2. Fai clic su Crea service account.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome del service account: inserisci confluence-audit-collector-sa.
    • Descrizione service account: inserisci Service account for Cloud Run function to collect Confluence Cloud audit logs.
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo service account l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo.
    2. Cerca e seleziona Amministratore oggetti di archiviazione.
    3. Fai clic su + Aggiungi un altro ruolo.
    4. Cerca e seleziona Cloud Run Invoker.
    5. Fai clic su + Aggiungi un altro ruolo.
    6. Cerca e seleziona Invoker di Cloud Functions.
  6. Fai clic su Continua.
  7. Fai clic su Fine.

Questi ruoli sono necessari per:

  • Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
  • Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
  • Cloud Functions Invoker: consente la chiamata di funzioni

Concedi autorizzazioni IAM sul bucket GCS

Concedi al service account le autorizzazioni di scrittura sul bucket GCS:

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket.
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: inserisci l'email del service account (ad es. confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Assegna i ruoli: seleziona Storage Object Admin.
  6. Fai clic su Salva.

Crea argomento Pub/Sub

Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.

  1. Nella console GCP, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci confluence-audit-trigger.
    • Lascia le altre impostazioni sui valori predefiniti.
  4. Fai clic su Crea.

Crea una funzione Cloud Run per raccogliere i log

La funzione Cloud Run viene attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API Confluence Cloud Audit e li scrive in GCS.

  1. Nella console GCP, vai a Cloud Run.
  2. Fai clic su Crea servizio.
  3. Seleziona Funzione (usa un editor in linea per creare una funzione).
  4. Nella sezione Configura, fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome servizio confluence-audit-collector
    Regione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio us-central1)
    Runtime Seleziona Python 3.12 o versioni successive
  5. Nella sezione Trigger (facoltativo):

    1. Fai clic su + Aggiungi trigger.
    2. Seleziona Cloud Pub/Sub.
    3. In Seleziona un argomento Cloud Pub/Sub, scegli confluence-audit-trigger.
    4. Fai clic su Salva.
  6. Nella sezione Autenticazione:

    1. Seleziona Richiedi autenticazione.
    2. Controlla Identity and Access Management (IAM).
  7. Scorri verso il basso ed espandi Container, networking, sicurezza.

  8. Vai alla scheda Sicurezza:

    • Service account (Service account): seleziona confluence-audit-collector-sa.
  9. Vai alla scheda Container:

    1. Fai clic su Variabili e secret.
    2. Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
    Nome variabile Valore di esempio Descrizione
    GCS_BUCKET confluence-audit-logs Nome bucket GCS
    GCS_PREFIX confluence-audit Prefisso per i file di log
    STATE_KEY confluence-audit/state.json Percorso file di stato
    CONFLUENCE_URL https://yoursite.atlassian.net URL del sito Confluence
    CONFLUENCE_EMAIL your-email@example.com Email dell'account Atlassian
    CONFLUENCE_API_TOKEN your-api-token-here Token API
    MAX_RECORDS 1000 Numero massimo di record per esecuzione
  10. Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:

    • Timeout richiesta: inserisci 600 secondi (10 minuti).
  11. Vai alla scheda Impostazioni:

    • Nella sezione Risorse:
      • Memoria: seleziona 512 MiB o un valore superiore.
      • CPU: seleziona 1.
  12. Nella sezione Scalabilità della revisione:

    • Numero minimo di istanze: inserisci 0.
    • Numero massimo di istanze: inserisci 100 (o modifica in base al carico previsto).
  13. Fai clic su Crea.

  14. Attendi la creazione del servizio (1-2 minuti).

  15. Dopo aver creato il servizio, si apre automaticamente l'editor di codice incorporato.

Aggiungi codice per la funzione

  1. Inserisci main in Entry point della funzione
  2. Nell'editor di codice incorporato, crea due file:

    • Primo file: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json')
    CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL')
    CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL')
    CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=24)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=CONFLUENCE_URL,
                email=CONFLUENCE_EMAIL,
                api_token=CONFLUENCE_API_TOKEN,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int):
        """
        Fetch logs from Confluence Cloud Audit API with pagination and rate limiting.
    
        Args:
            api_base: Confluence site URL
            email: Atlassian account email
            api_token: API token
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up URL
        base_url = api_base.rstrip('/')
    
        # Build authentication header
        auth_string = f"{email}:{api_token}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
        headers = {
            'Authorization': f'Basic {auth_b64}',
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request URL
            url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('results', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # creationDate is in Unix milliseconds
                        event_time_ms = event.get('creationDate')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < 100:
                    print(f"Reached last page (size={current_size} < limit=100)")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records[:max_records], newest_time
    
    • Secondo file: requirements.txt::
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.

  4. Attendi il completamento del deployment (2-3 minuti).

Crea job Cloud Scheduler

Cloud Scheduler pubblica messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.

  1. Nella console di GCP, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome confluence-audit-collector-hourly
    Regione Seleziona la stessa regione della funzione Cloud Run
    Frequenza 0 * * * * (ogni ora, all'ora)
    Fuso orario Seleziona il fuso orario (UTC consigliato)
    Tipo di target Pub/Sub
    Argomento Seleziona confluence-audit-trigger
    Corpo del messaggio {} (oggetto JSON vuoto)
  4. Fai clic su Crea.

Opzioni di frequenza di pianificazione

  • Scegli la frequenza in base al volume dei log e ai requisiti di latenza:

    Frequenza Espressione cron Caso d'uso
    Ogni 5 minuti */5 * * * * Volume elevato, bassa latenza
    Ogni 15 minuti */15 * * * * Volume medio
    Ogni ora 0 * * * * Standard (consigliato)
    Ogni 6 ore 0 */6 * * * Volume basso, elaborazione batch
    Ogni giorno 0 0 * * * Raccolta dei dati storici

Testare l'integrazione

  1. Nella console Cloud Scheduler, trova il job.
  2. Fai clic su Forza esecuzione per attivare il job manualmente.
  3. Aspetta alcuni secondi.
  4. Vai a Cloud Run > Servizi.
  5. Fai clic su confluence-audit-collector.
  6. Fai clic sulla scheda Log.
  7. Verifica che la funzione sia stata eseguita correttamente. Cerca quanto segue:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Vai a Cloud Storage > Bucket.

  9. Fai clic sul nome del bucket.

  10. Vai alla cartella confluence-audit/.

  11. Verifica che sia stato creato un nuovo file .ndjson con il timestamp corrente.

Se visualizzi errori nei log:

  • HTTP 401: controlla le credenziali API nelle variabili di ambiente
  • HTTP 403: verifica che l'account disponga delle autorizzazioni di amministratore di Confluence
  • HTTP 429: limitazione della frequenza: la funzione riproverà automaticamente con backoff
  • Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate

Recuperare il service account Google SecOps

Google SecOps utilizza un service account univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo service account l'accesso al tuo bucket.

Recuperare l'email del service account

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Confluence Cloud Audit Logs).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Atlassian Confluence come Tipo di log.
  7. Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del service account, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email per utilizzarlo nel passaggio successivo.

Concedi le autorizzazioni IAM al service account Google SecOps

Il service account Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket.
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email del service account Google SecOps.
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
  6. Fai clic su Salva.

Configura un feed in Google SecOps per importare i log di Confluence

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Confluence Cloud Audit Logs).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Atlassian Confluence come Tipo di log.
  7. Fai clic su Avanti.
  8. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://confluence-audit-logs/confluence-audit/
      
      • Sostituisci:

        • confluence-audit-logs: il nome del bucket GCS.
        • confluence-audit: (Facoltativo) prefisso/percorso della cartella in cui vengono archiviati i log (lascia vuoto per la radice).
      • Esempi:

        • Bucket radice: gs://company-logs/
        • Con prefisso: gs://company-logs/confluence-audit/
        • Con sottocartella: gs://company-logs/confluence/audit/
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.

    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.

  9. Fai clic su Avanti.

  10. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Funzione logica
agente read_only_udm.network.http.user_agent Valore tratto dal campo "agente".
app_protocol read_only_udm.network.application_protocol Derivato dal campo "app_protocol". Se "app_protocol" contiene "HTTPS", "HTTP", "SSH" o "RDP", viene utilizzato il protocollo corrispondente. In caso contrario, il valore predefinito è "UNKNOWN_APPLICATION_PROTOCOL".
app_protocol read_only_udm.network.application_protocol_version Valore estratto dal campo "app_protocol".
auditType.action read_only_udm.security_result.action Derivato dal campo "auditType.action". Se "auditType.action" contiene "successful", il valore è impostato su "ALLOW". Se contiene "restricted", il valore è impostato su "BLOCK".
auditType.action read_only_udm.security_result.summary Valore estratto dal campo "auditType.action" quando "auditType" non è vuoto e "auditType_area" è "SECURITY".
auditType.actionI18nKey read_only_udm.metadata.product_event_type Valore estratto dal campo "auditType.actionI18nKey" quando "auditType" non è vuoto.
auditType.area read_only_udm.security_result.detection_fields.value Valore estratto dal campo "auditType.area" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType area". Questo mapping viene eseguito quando "auditType" non è vuoto.
auditType.category read_only_udm.security_result.category_details Valore estratto dal campo "auditType.category" quando "auditType" non è vuoto.
auditType.categoryI18nKey read_only_udm.security_result.detection_fields.value Valore estratto dal campo "auditType.categoryI18nKey" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType categoryI18nKey". Questo mapping viene eseguito quando "auditType" non è vuoto.
auditType.level read_only_udm.security_result.detection_fields.value Valore estratto dal campo "auditType.level" e assegnato al campo "value" di un campo di rilevamento con il campo "key" impostato su "auditType level". Questo mapping viene eseguito quando "auditType" non è vuoto.
author.displayName read_only_udm.principal.user.user_display_name Valore tratto dal campo "author.displayName".
author.externalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value Valore estratto dal campo "author.externalCollaborator" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "externalCollaborator".
author.id read_only_udm.principal.user.userid Valore estratto dal campo "author.id" quando "author.type" è "user" e "principal_user_present" è "false".
author.isExternalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value Valore estratto dal campo "author.isExternalCollaborator" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "isExternalCollaborator".
author.name read_only_udm.principal.user.user_display_name Valore tratto dal campo "author.name" quando "author.type" è "user" e "principal_user_present" è "false".
bytes_in read_only_udm.network.received_bytes Valore estratto dal campo "bytes_in" se contiene cifre. In caso contrario, il valore predefinito è 0.
category read_only_udm.security_result.category_details Valore tratto dal campo "category".
changedValues read_only_udm.principal.resource.attribute.labels Itera ogni elemento di "changedValues" e crea etichette con chiavi come "changedValue [index] [key]" e valori corrispondenti nell'array "changedValues".
creationDate read_only_udm.metadata.event_timestamp Valore estratto dal campo "creationDate", analizzato come timestamp UNIX o UNIX_MS.
extraAttributes read_only_udm.principal.resource.attribute.labels Itera ogni elemento in "extraAttributes" e crea etichette con chiavi basate sui campi "name" e "nameI18nKey" e sui valori del campo "value" corrispondente.
http_verb read_only_udm.network.http.method Valore estratto dal campo "http_verb".
ip read_only_udm.target.ip Valore tratto dal campo "ip".
principal_host read_only_udm.principal.hostname Valore tratto dal campo "principal_host".
referral_url read_only_udm.network.http.referral_url Valore tratto dal campo "referral_url".
remoteAddress read_only_udm.principal.ip Valore estratto dal campo "remoteAddress", analizzato come indirizzo IP.
response_code read_only_udm.network.http.response_code Valore estratto dal campo "response_code".
session_duration read_only_udm.additional.fields.value.string_value Valore estratto dal campo "session_duration" e assegnato al campo "string_value" di un'etichetta con il campo "key" impostato su "Session Duration" (Durata sessione).
origine read_only_udm.principal.ip Valore estratto dal campo "origine", analizzato come indirizzo IP.
src_ip read_only_udm.principal.ip Valore estratto dal campo "src_ip" se "remoteAddress" è vuoto.
riepilogo read_only_udm.security_result.summary Valore tratto dal campo "Riepilogo".
sysAdmin read_only_udm.security_result.about.resource.attribute.labels.value Valore estratto dal campo "sysAdmin" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "sysAdmin".
superAdmin read_only_udm.security_result.about.resource.attribute.labels.value Valore estratto dal campo "superAdmin" e assegnato al campo "value" di un'etichetta con il campo "key" impostato su "superAdmin".
target_url read_only_udm.target.url Valore tratto dal campo "target_url".
timestamp read_only_udm.metadata.event_timestamp Valore estratto dal campo "timestamp", analizzato come stringa di data e ora.
user_id read_only_udm.principal.user.userid Valore tratto dal campo "user_id".
read_only_udm.metadata.event_type Il valore di questo campo è determinato da una serie di controlli e il valore predefinito è "GENERIC_EVENT". È impostato su valori specifici come "NETWORK_HTTP", "USER_UNCATEGORIZED" o "STATUS_UPDATE" in base alla presenza e al contenuto di altri campi come "principal_host", "user_id", "has_principal" e "author.type".
read_only_udm.metadata.vendor_name Imposta il valore su "ATLASSIAN".
read_only_udm.metadata.product_name Imposta su "CONFLUENCE".
read_only_udm.metadata.log_type Imposta il valore su "ATLASSIAN_CONFLUENCE".
read_only_udm.principal.user.user_display_name Il valore di questo campo può provenire da "author.displayName" o "affectedObject.name", a seconda del contesto.
read_only_udm.target.process.pid Il valore di questo campo può provenire da "principal_host" o "pid" a seconda del contesto.
read_only_udm.principal.resource.attribute.labels Questo campo viene compilato con varie etichette derivate da campi come "affectedObjects", "changedValues" e "extraAttributes". Le chiavi e i valori di queste etichette vengono generati dinamicamente in base al contenuto specifico di questi campi.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.