Mengumpulkan log Atlassian Confluence

Didukung di:

Dokumen ini menjelaskan cara menyerap log Atlassian Confluence ke Google Security Operations. Parser terlebih dahulu mencoba mengekstrak kolom dari pesan log mentah menggunakan ekspresi reguler (pola grok) yang dirancang untuk log Atlassian Confluence. Jika penguraian grok gagal atau log dalam format JSON, kode akan mencoba mengurai pesan sebagai JSON. Terakhir, kolom yang diekstrak dipetakan ke skema UDM Google SecOps dan diperkaya dengan konteks tambahan.

Sebelum memulai

Pastikan Anda memiliki prasyarat berikut:

  • Instance Google SecOps
  • Akun Atlassian Confluence Cloud dengan akses log audit ATAU Confluence Data Center/Server dengan akses administratif
  • Untuk metode berbasis GCP: Akses istimewa ke GCP (GCS, IAM, Cloud Run, Pub/Sub, Cloud Scheduler)
  • Untuk metode Bindplane: Windows Server 2016 atau yang lebih baru, atau host Linux dengan systemd

Ringkasan opsi integrasi

Panduan ini menyediakan dua jalur integrasi:

  • Opsi 1: Confluence Data Center/Server melalui Bindplane + Syslog
  • Opsi 2: Log Audit Confluence Cloud melalui fungsi GCP Cloud Run + GCS (format JSON)

Pilih opsi yang paling sesuai dengan jenis dan infrastruktur deployment Confluence Anda.

Opsi 1: Confluence Data Center/Server melalui Bindplane + Syslog

Opsi ini mengonfigurasi Confluence Data Center atau Server untuk mengirim log melalui syslog ke agen Bindplane, yang kemudian meneruskannya ke Google SecOps.

Mendapatkan file autentikasi penyerapan Google SecOps

  1. Login ke konsol Google SecOps.
  2. Buka Setelan SIEM > Agen Pengumpulan.
  3. Klik Download untuk mendownload file autentikasi penyerapan.
  4. Simpan file dengan aman di sistem tempat agen BindPlane akan diinstal.

Mendapatkan ID pelanggan Google SecOps

  1. Login ke konsol Google SecOps.
  2. Buka Setelan SIEM > Profil.
  3. Salin dan simpan ID Pelanggan dari bagian Detail Organisasi.

Menginstal agen BindPlane

Instal agen Bindplane di sistem operasi Windows atau Linux Anda sesuai dengan petunjuk berikut.

Penginstalan Windows

  1. Buka Command Prompt atau PowerShell sebagai administrator.
  2. Jalankan perintah berikut:

    msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. Tunggu hingga penginstalan selesai.

  4. Verifikasi penginstalan dengan menjalankan:

    sc query observiq-otel-collector
    

Layanan akan ditampilkan sebagai RUNNING.

Penginstalan Linux

  1. Buka terminal dengan hak istimewa root atau sudo.
  2. Jalankan perintah berikut:

    sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. Tunggu hingga penginstalan selesai.

  4. Verifikasi penginstalan dengan menjalankan:

    sudo systemctl status observiq-otel-collector
    

Layanan akan ditampilkan sebagai aktif (berjalan).

Referensi penginstalan tambahan

Untuk opsi penginstalan dan pemecahan masalah tambahan, lihat Panduan penginstalan agen BindPlane.

Mengonfigurasi agen BindPlane untuk menyerap syslog dan mengirimkannya ke Google SecOps

Cari file konfigurasi

  • Linux:

    sudo nano /etc/bindplane-agent/config.yaml
    
  • Windows:

    notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
    

Edit file konfigurasi

  1. Ganti seluruh konten config.yaml dengan konfigurasi berikut:

    receivers:
      udplog:
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/confluence_logs:
        compression: gzip
        creds_file_path: '/etc/bindplane-agent/ingestion-auth.json'
        customer_id: 'YOUR_CUSTOMER_ID'
        endpoint: malachiteingestion-pa.googleapis.com
        log_type: ATLASSIAN_CONFLUENCE
        raw_log_field: body
        ingestion_labels:
          service: confluence
    
    service:
      pipelines:
        logs/confluence:
          receivers:
            - udplog
          exporters:
            - chronicle/confluence_logs
    

Parameter konfigurasi

  • Ganti placeholder berikut:

    • listen_address: Ganti port dan alamat IP sesuai kebutuhan di infrastruktur Anda. Gunakan 0.0.0.0:514 untuk memantau semua antarmuka di port 514.
    • creds_file_path: Perbarui jalur tempat file autentikasi disimpan:
      • Linux: /etc/bindplane-agent/ingestion-auth.json
      • Windows: C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
    • customer_id: Ganti YOUR_CUSTOMER_ID dengan ID pelanggan sebenarnya dari langkah sebelumnya.
    • endpoint: URL endpoint regional:
      • Amerika Serikat: malachiteingestion-pa.googleapis.com
      • Eropa: europe-malachiteingestion-pa.googleapis.com
      • Asia: asia-southeast1-malachiteingestion-pa.googleapis.com

Simpan file konfigurasi

Setelah mengedit, simpan file:

  • Linux: Tekan Ctrl+O, lalu Enter, lalu Ctrl+X
  • Windows: Klik File > Save

Mulai ulang agen Bindplane untuk menerapkan perubahan

Mulai ulang agen Bindplane di Linux

  1. Untuk memulai ulang agen Bindplane di Linux, jalankan perintah berikut:

    sudo systemctl restart observiq-otel-collector
    
  2. Pastikan layanan sedang berjalan:

    sudo systemctl status observiq-otel-collector
    
  3. Periksa log untuk mengetahui error:

    sudo journalctl -u observiq-otel-collector -f
    

Mulai ulang agen Bindplane di Windows

  1. Untuk memulai ulang agen Bindplane di Windows, pilih salah satu opsi berikut:

    • Menggunakan Command Prompt atau PowerShell sebagai administrator:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • Menggunakan konsol Layanan:

      1. Tekan Win+R, ketik services.msc, lalu tekan Enter.
      2. Temukan observIQ OpenTelemetry Collector.
      3. Klik kanan, lalu pilih Mulai Ulang.
      4. Pastikan layanan sedang berjalan:

        sc query observiq-otel-collector
        
      5. Periksa log untuk mengetahui error:

        type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
        

Mengonfigurasi penerusan Syslog di Confluence Data Center/Server

  1. Konfigurasi Confluence untuk menulis log ke file (perilaku default).
  2. Instal rsyslog jika belum ada:

    sudo apt-get install rsyslog  # Debian/Ubuntu
    sudo yum install rsyslog      # RHEL/CentOS
    
  3. Buat file konfigurasi rsyslog /etc/rsyslog.d/confluence.conf:

    # Forward Confluence logs to Bindplane
    $ModLoad imfile
    
    # Application logs
    $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log
    $InputFileTag confluence-app:
    $InputFileStateFile stat-confluence-app
    $InputFileSeverity info
    $InputFileFacility local0
    $InputRunFileMonitor
    
    # Audit logs (JSON format in DC/Server)
    $InputFileName <confluence-home-directory>/log/audit/audit.log
    $InputFileTag confluence-audit:
    $InputFileStateFile stat-confluence-audit
    $InputFileSeverity info
    $InputFileFacility local1
    $InputRunFileMonitor
    
    # Forward to Bindplane agent
    *.* @@BINDPLANE_AGENT_IP:514
    
    • Ganti BINDPLANE_AGENT_IP dengan alamat IP agen BindPlane (misalnya, 192.168.1.100).
    • Sesuaikan jalur file log berdasarkan penginstalan Confluence Anda:
      • Log aplikasi biasanya: <confluence-install>/logs/ atau <local-home>/logs/
      • Log audit: <confluence-home-directory>/log/audit/ (format JSON)
      • Untuk menemukan direktori beranda Confluence, buka Settings > General Configuration > System Information, lalu cari Confluence Home atau Local Home.
  4. Mulai ulang rsyslog:

    sudo systemctl restart rsyslog
    

Opsi B: Mengonfigurasi penerusan Syslog Log4j2

Opsi ini memerlukan modifikasi konfigurasi Log4j2. Opsi A (rsyslog) direkomendasikan untuk kesederhanaan.

  1. Login ke server Confluence Anda melalui SSH atau RDP.
  2. Cari file konfigurasi Log4j2 di:

    <confluence-install>/confluence/WEB-INF/classes/log4j2.xml
    
  3. Edit file konfigurasi untuk menambahkan appender Syslog:

    <Configuration>
      <Appenders>
        <!-- Existing appenders -->
        <Syslog name="SyslogAppender" 
                host="BINDPLANE_AGENT_IP" 
                port="514" 
                protocol="UDP"
                format="RFC5424"
                facility="LOCAL0">
          <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <Root level="info">
          <AppenderRef ref="SyslogAppender"/>
          <!-- Other appender refs -->
        </Root>
    
        <!-- Audit logger -->
        <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" 
                level="info" 
                additivity="false">
          <AppenderRef ref="SyslogAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
    
    • Ganti BINDPLANE_AGENT_IP dengan alamat IP agen BindPlane (misalnya, 192.168.1.100).
  4. Mulai ulang Confluence untuk menerapkan perubahan:

    sudo systemctl restart confluence
    

Opsi 2: Confluence Cloud Audit Logs melalui fungsi GCP Cloud Run dan GCS

Metode ini menggunakan fungsi GCP Cloud Run untuk mengambil log audit secara berkala melalui Confluence Audit REST API dan menyimpannya di GCS untuk penyerapan Google SecOps.

Mengumpulkan kredensial Confluence Cloud API

  1. Login ke akun Atlassian Anda.
  2. Buka https://id.atlassian.com/manage-profile/security/api-tokens.
  3. Klik Create API token.
  4. Masukkan label untuk token (misalnya, Google Security Operations Integration).
  5. Klik Buat.
  6. Salin dan simpan token API secara aman.
  7. Catat URL situs Confluence Cloud Anda (misalnya, https://yoursite.atlassian.net).
  8. Catat alamat email akun Atlassian Anda (digunakan untuk autentikasi).

Verifikasi izin

Untuk memverifikasi bahwa akun memiliki izin yang diperlukan:

  1. Login ke Confluence Cloud.
  2. Klik ikon Setelan (⚙️) di sudut kanan atas.
  3. Jika Anda dapat melihat Monitoring > Audit log di navigasi sebelah kiri, berarti Anda memiliki izin yang diperlukan.
  4. Jika Anda tidak dapat melihat opsi ini, hubungi administrator Anda untuk memberikan izin Administrator Confluence.

Menguji akses API

  • Uji kredensial Anda sebelum melanjutkan integrasi:

    # Replace with your actual credentials
    CONFLUENCE_EMAIL="your-email@example.com"
    CONFLUENCE_API_TOKEN="your-api-token"
    CONFLUENCE_URL="https://yoursite.atlassian.net"
    
    # Test API access
    curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \
      -H "Accept: application/json" \
      "${CONFLUENCE_URL}/wiki/rest/api/audit"
    

Membuat bucket Google Cloud Storage

  1. Buka Google Cloud Console.
  2. Pilih project Anda atau buat project baru.
  3. Di menu navigasi, buka Cloud Storage > Buckets.
  4. Klik Create bucket.
  5. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Beri nama bucket Anda Masukkan nama yang unik secara global (misalnya, confluence-audit-logs)
    Location type Pilih berdasarkan kebutuhan Anda (Region, Dual-region, Multi-region)
    Lokasi Pilih lokasi (misalnya, us-central1)
    Kelas penyimpanan Standar (direkomendasikan untuk log yang sering diakses)
    Access control Seragam (direkomendasikan)
    Alat perlindungan Opsional: Aktifkan pembuatan versi objek atau kebijakan retensi
  6. Klik Buat.

Buat akun layanan untuk Cloud Run Function

Fungsi Cloud Run memerlukan akun layanan dengan izin untuk menulis ke bucket GCS dan dipanggil oleh Pub/Sub.

Membuat akun layanan

  1. Di GCP Console, buka IAM & Admin > Service Accounts.
  2. Klik Create Service Account.
  3. Berikan detail konfigurasi berikut:
    • Nama akun layanan: Masukkan confluence-audit-collector-sa.
    • Deskripsi akun layanan: Masukkan Service account for Cloud Run function to collect Confluence Cloud audit logs.
  4. Klik Create and Continue.
  5. Di bagian Berikan akun layanan ini akses ke project, tambahkan peran berikut:
    1. Klik Pilih peran.
    2. Telusuri dan pilih Storage Object Admin.
    3. Klik + Add another role.
    4. Telusuri dan pilih Cloud Run Invoker.
    5. Klik + Add another role.
    6. Telusuri dan pilih Cloud Functions Invoker.
  6. Klik Lanjutkan.
  7. Klik Selesai.

Peran ini diperlukan untuk:

  • Storage Object Admin: Menulis log ke bucket GCS dan mengelola file status
  • Cloud Run Invoker: Mengizinkan Pub/Sub memanggil fungsi
  • Cloud Functions Invoker: Mengizinkan pemanggilan fungsi

Memberikan izin IAM pada bucket GCS

Beri akun layanan izin tulis di bucket GCS:

  1. Buka Cloud Storage > Buckets.
  2. Klik nama bucket Anda.
  3. Buka tab Izin.
  4. Klik Grant access.
  5. Berikan detail konfigurasi berikut:
    • Tambahkan prinsipal: Masukkan email akun layanan (misalnya, confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Tetapkan peran: Pilih Storage Object Admin.
  6. Klik Simpan.

Membuat topik Pub/Sub

Buat topik Pub/Sub yang akan dipublikasikan oleh Cloud Scheduler dan akan dilanggan oleh fungsi Cloud Run.

  1. Di GCP Console, buka Pub/Sub > Topics.
  2. Klik Create topic.
  3. Berikan detail konfigurasi berikut:
    • ID Topik: Masukkan confluence-audit-trigger.
    • Biarkan setelan lainnya tetap default.
  4. Klik Buat.

Membuat fungsi Cloud Run untuk mengumpulkan log

Fungsi Cloud Run dipicu oleh pesan Pub/Sub dari Cloud Scheduler untuk mengambil log dari Confluence Cloud Audit API dan menuliskannya ke GCS.

  1. Di GCP Console, buka Cloud Run.
  2. Klik Create service.
  3. Pilih Function (gunakan editor inline untuk membuat fungsi).
  4. Di bagian Konfigurasi, berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama layanan confluence-audit-collector
    Wilayah Pilih region yang cocok dengan bucket GCS Anda (misalnya, us-central1)
    Runtime Pilih Python 3.12 atau yang lebih baru
  5. Di bagian Pemicu (opsional):

    1. Klik + Tambahkan pemicu.
    2. Pilih Cloud Pub/Sub.
    3. Di Select a Cloud Pub/Sub topic, pilih confluence-audit-trigger.
    4. Klik Simpan.
  6. Di bagian Authentication:

    1. Pilih Wajibkan autentikasi.
    2. Periksa Identity and Access Management (IAM).
  7. Scroll ke bawah dan luaskan Containers, Networking, Security.

  8. Buka tab Security:

    • Akun layanan: Pilih confluence-audit-collector-sa.
  9. Buka tab Containers:

    1. Klik Variables & Secrets.
    2. Klik + Tambahkan variabel untuk setiap variabel lingkungan:
    Nama Variabel Nilai Contoh Deskripsi
    GCS_BUCKET confluence-audit-logs Nama bucket GCS
    GCS_PREFIX confluence-audit Awalan untuk file log
    STATE_KEY confluence-audit/state.json Jalur file status
    CONFLUENCE_URL https://yoursite.atlassian.net URL situs Confluence
    CONFLUENCE_EMAIL your-email@example.com Email akun Atlassian
    CONFLUENCE_API_TOKEN your-api-token-here Token API
    MAX_RECORDS 1000 Jumlah maksimum data per proses
  10. Di bagian Variables & Secrets, scroll ke bawah ke Requests:

    • Waktu tunggu permintaan: Masukkan 600 detik (10 menit).
  11. Buka tab Setelan:

    • Di bagian Materi:
      • Memori: Pilih 512 MiB atau yang lebih tinggi.
      • CPU: Pilih 1.
  12. Di bagian Penskalaan revisi:

    • Jumlah minimum instance: Masukkan 0.
    • Jumlah maksimum instance: Masukkan 100 (atau sesuaikan berdasarkan perkiraan beban).
  13. Klik Buat.

  14. Tunggu hingga layanan dibuat (1-2 menit).

  15. Setelah layanan dibuat, editor kode inline akan terbuka secara otomatis.

Menambahkan kode fungsi

  1. Masukkan main di Function entry point
  2. Di editor kode inline, buat dua file:

    • File pertama: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json')
    CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL')
    CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL')
    CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=24)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=CONFLUENCE_URL,
                email=CONFLUENCE_EMAIL,
                api_token=CONFLUENCE_API_TOKEN,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int):
        """
        Fetch logs from Confluence Cloud Audit API with pagination and rate limiting.
    
        Args:
            api_base: Confluence site URL
            email: Atlassian account email
            api_token: API token
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up URL
        base_url = api_base.rstrip('/')
    
        # Build authentication header
        auth_string = f"{email}:{api_token}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
        headers = {
            'Authorization': f'Basic {auth_b64}',
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request URL
            url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('results', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # creationDate is in Unix milliseconds
                        event_time_ms = event.get('creationDate')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < 100:
                    print(f"Reached last page (size={current_size} < limit=100)")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records[:max_records], newest_time
    
    • File kedua: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Klik Deploy untuk menyimpan dan men-deploy fungsi.

  4. Tunggu hingga deployment selesai (2-3 menit).

Buat tugas Cloud Scheduler

Cloud Scheduler memublikasikan pesan ke topik Pub/Sub secara berkala, sehingga memicu fungsi Cloud Run.

  1. Di GCP Console, buka Cloud Scheduler.
  2. Klik Create Job.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama confluence-audit-collector-hourly
    Wilayah Pilih region yang sama dengan fungsi Cloud Run
    Frekuensi 0 * * * * (setiap jam, tepat pada waktunya)
    Zona waktu Pilih zona waktu (UTC direkomendasikan)
    Jenis target Pub/Sub
    Topik Pilih confluence-audit-trigger
    Isi pesan {} (objek JSON kosong)
  4. Klik Buat.

Opsi frekuensi jadwal

  • Pilih frekuensi berdasarkan volume log dan persyaratan latensi:

    Frekuensi Ekspresi Cron Kasus Penggunaan
    Setiap 5 menit */5 * * * * Volume tinggi, latensi rendah
    Setiap 15 menit */15 * * * * Volume sedang
    Setiap jam 0 * * * * Standar (direkomendasikan)
    Setiap 6 jam 0 */6 * * * Volume rendah, pemrosesan batch
    Harian 0 0 * * * Pengumpulan data historis

Menguji integrasi

  1. Di konsol Cloud Scheduler, temukan tugas Anda.
  2. Klik Force run untuk memicu tugas secara manual.
  3. Tunggu beberapa detik.
  4. Buka Cloud Run > Services.
  5. Klik confluence-audit-collector.
  6. Klik tab Logs.
  7. Pastikan fungsi berhasil dieksekusi. Cari hal berikut:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Buka Cloud Storage > Buckets.

  9. Klik nama bucket Anda.

  10. Buka folder confluence-audit/.

  11. Pastikan file .ndjson baru dibuat dengan stempel waktu saat ini.

Jika Anda melihat error dalam log:

  • HTTP 401: Periksa kredensial API di variabel lingkungan
  • HTTP 403: Verifikasi bahwa akun memiliki izin Administrator Confluence
  • HTTP 429: Pembatasan kecepatan - fungsi akan otomatis mencoba lagi dengan penundaan
  • Variabel lingkungan tidak ada: Periksa apakah semua variabel yang diperlukan telah ditetapkan

Mengambil akun layanan Google SecOps

Google SecOps menggunakan akun layanan unik untuk membaca data dari bucket GCS Anda. Anda harus memberi akun layanan ini akses ke bucket Anda.

Dapatkan email akun layanan

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Klik Konfigurasi satu feed.
  4. Di kolom Nama feed, masukkan nama untuk feed (misalnya, Confluence Cloud Audit Logs).
  5. Pilih Google Cloud Storage V2 sebagai Source type.
  6. Pilih Atlassian Confluence sebagai Jenis log.
  7. Klik Get Service Account. Email akun layanan yang unik akan ditampilkan, misalnya:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Salin alamat email ini untuk digunakan di langkah berikutnya.

Memberikan izin IAM ke akun layanan Google SecOps

Akun layanan Google SecOps memerlukan peran Storage Object Viewer di bucket GCS Anda.

  1. Buka Cloud Storage > Buckets.
  2. Klik nama bucket Anda.
  3. Buka tab Izin.
  4. Klik Grant access.
  5. Berikan detail konfigurasi berikut:
    • Add principals: Tempel email akun layanan Google SecOps.
    • Tetapkan peran: Pilih Storage Object Viewer.
  6. Klik Simpan.

Mengonfigurasi feed di Google SecOps untuk memproses log Confluence

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Klik Konfigurasi satu feed.
  4. Di kolom Nama feed, masukkan nama untuk feed (misalnya, Confluence Cloud Audit Logs).
  5. Pilih Google Cloud Storage V2 sebagai Source type.
  6. Pilih Atlassian Confluence sebagai Jenis log.
  7. Klik Berikutnya.
  8. Tentukan nilai untuk parameter input berikut:

    • URL bucket penyimpanan: Masukkan URI bucket GCS dengan jalur awalan:

      gs://confluence-audit-logs/confluence-audit/
      
      • Ganti:

        • confluence-audit-logs: Nama bucket GCS Anda.
        • confluence-audit: Awalan/jalur folder opsional tempat log disimpan (biarkan kosong untuk root).
      • Contoh:

        • Bucket root: gs://company-logs/
        • Dengan awalan: gs://company-logs/confluence-audit/
        • Dengan subfolder: gs://company-logs/confluence/audit/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda:

      • Jangan pernah: Tidak pernah menghapus file apa pun setelah transfer (direkomendasikan untuk pengujian).
      • Hapus file yang ditransfer: Menghapus file setelah transfer berhasil.
      • Hapus file yang ditransfer dan direktori kosong: Menghapus file dan direktori kosong setelah transfer berhasil.

    • Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir. Defaultnya adalah 180 hari.

    • Namespace aset: Namespace aset.

    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.

  9. Klik Berikutnya.

  10. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Tabel pemetaan UDM

Kolom Log Pemetaan UDM Logika
agen read_only_udm.network.http.user_agent Nilai diambil dari kolom "agent".
app_protocol read_only_udm.network.application_protocol Diperoleh dari kolom "app_protocol". Jika "app_protocol" berisi "HTTPS", "HTTP", "SSH", atau "RDP", protokol yang sesuai akan digunakan. Jika tidak, defaultnya adalah "UNKNOWN_APPLICATION_PROTOCOL".
app_protocol read_only_udm.network.application_protocol_version Nilai diambil dari kolom "app_protocol".
auditType.action read_only_udm.security_result.action Diperoleh dari kolom "auditType.action". Jika "auditType.action" berisi "successful", nilai ditetapkan ke "ALLOW". Jika berisi "restricted", nilainya ditetapkan ke "BLOCK".
auditType.action read_only_udm.security_result.summary Nilai diambil dari kolom "auditType.action" jika "auditType" tidak kosong dan "auditType_area" adalah "SECURITY".
auditType.actionI18nKey read_only_udm.metadata.product_event_type Nilai diambil dari kolom "auditType.actionI18nKey" jika "auditType" tidak kosong.
auditType.area read_only_udm.security_result.detection_fields.value Nilai diambil dari kolom "auditType.area" dan ditetapkan ke kolom "value" dari kolom deteksi dengan kolom "key" yang ditetapkan ke "auditType area". Pemetaan ini dilakukan saat "auditType" tidak kosong.
auditType.category read_only_udm.security_result.category_details Nilai diambil dari kolom "auditType.category" jika "auditType" tidak kosong.
auditType.categoryI18nKey read_only_udm.security_result.detection_fields.value Nilai diambil dari kolom "auditType.categoryI18nKey" dan ditetapkan ke kolom "value" dari kolom deteksi dengan kolom "key" yang ditetapkan ke "auditType categoryI18nKey". Pemetaan ini dilakukan saat "auditType" tidak kosong.
auditType.level read_only_udm.security_result.detection_fields.value Nilai diambil dari kolom "auditType.level" dan ditetapkan ke kolom "value" dari kolom deteksi dengan kolom "key" yang ditetapkan ke "auditType level". Pemetaan ini dilakukan saat "auditType" tidak kosong.
author.displayName read_only_udm.principal.user.user_display_name Nilai diambil dari kolom "author.displayName".
author.externalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value Nilai diambil dari kolom "author.externalCollaborator" dan ditetapkan ke kolom "value" dari label dengan kolom "key" yang ditetapkan ke "externalCollaborator".
author.id read_only_udm.principal.user.userid Nilai diambil dari kolom "author.id" jika "author.type" adalah "user" dan "principal_user_present" adalah "false".
author.isExternalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value Nilai diambil dari kolom "author.isExternalCollaborator" dan ditetapkan ke kolom "value" dari label dengan kolom "key" yang ditetapkan ke "isExternalCollaborator".
author.name read_only_udm.principal.user.user_display_name Nilai diambil dari kolom "author.name" jika "author.type" adalah "user" dan "principal_user_present" adalah "false".
bytes_in read_only_udm.network.received_bytes Nilai diambil dari kolom "bytes_in" jika berisi digit. Jika tidak, nilai defaultnya adalah 0.
kategori read_only_udm.security_result.category_details Nilai diambil dari kolom "category".
changedValues read_only_udm.principal.resource.attribute.labels Melakukan iterasi melalui setiap elemen dalam "changedValues" dan membuat label dengan kunci seperti "changedValue [index] [key]" dan nilai dari nilai yang sesuai dalam array "changedValues".
creationDate read_only_udm.metadata.event_timestamp Nilai diambil dari kolom "creationDate", diuraikan sebagai stempel waktu UNIX atau UNIX_MS.
extraAttributes read_only_udm.principal.resource.attribute.labels Melakukan iterasi melalui setiap elemen di "extraAttributes" dan membuat label dengan kunci berdasarkan kolom "name" dan "nameI18nKey" serta nilai dari kolom "value" yang sesuai.
http_verb read_only_udm.network.http.method Nilai diambil dari kolom "http_verb".
ip read_only_udm.target.ip Nilai diambil dari kolom "ip".
principal_host read_only_udm.principal.hostname Nilai diambil dari kolom "principal_host".
referral_url read_only_udm.network.http.referral_url Nilai diambil dari kolom "referral_url".
remoteAddress read_only_udm.principal.ip Nilai diambil dari kolom "remoteAddress", diuraikan sebagai alamat IP.
response_code read_only_udm.network.http.response_code Nilai diambil dari kolom "response_code".
Durasi sesi read_only_udm.additional.fields.value.string_value Nilai diambil dari kolom "session_duration" dan ditetapkan ke kolom "string_value" dari label dengan kolom "key" yang ditetapkan ke "Durasi Sesi".
source read_only_udm.principal.ip Nilai diambil dari kolom "source", diuraikan sebagai alamat IP.
src_ip read_only_udm.principal.ip Nilai diambil dari kolom "src_ip" jika "remoteAddress" kosong.
ringkasan read_only_udm.security_result.summary Nilai diambil dari kolom "summary".
sysAdmin read_only_udm.security_result.about.resource.attribute.labels.value Nilai diambil dari kolom "sysAdmin" dan ditetapkan ke kolom "value" dari label dengan kolom "key" yang ditetapkan ke "sysAdmin".
superAdmin read_only_udm.security_result.about.resource.attribute.labels.value Nilai diambil dari kolom "superAdmin" dan ditetapkan ke kolom "value" dari label dengan kolom "key" yang ditetapkan ke "superAdmin".
target_url read_only_udm.target.url Nilai diambil dari kolom "target_url".
timestamp read_only_udm.metadata.event_timestamp Nilai diambil dari kolom "timestamp", diuraikan sebagai string tanggal dan waktu.
user_id read_only_udm.principal.user.userid Nilai diambil dari kolom "user_id".
read_only_udm.metadata.event_type Nilai kolom ini ditentukan oleh serangkaian pemeriksaan dan secara default adalah "GENERIC_EVENT". Nilai ini ditetapkan ke nilai tertentu seperti "NETWORK_HTTP", "USER_UNCATEGORIZED", atau "STATUS_UPDATE" berdasarkan keberadaan dan konten kolom lain seperti "principal_host", "user_id", "has_principal", dan "author.type".
read_only_udm.metadata.vendor_name Tetapkan ke "ATLASSIAN".
read_only_udm.metadata.product_name Tetapkan ke "CONFLUENCE".
read_only_udm.metadata.log_type Tetapkan ke "ATLASSIAN_CONFLUENCE".
read_only_udm.principal.user.user_display_name Nilai kolom ini dapat berasal dari "author.displayName" atau "affectedObject.name", bergantung pada konteksnya.
read_only_udm.target.process.pid Nilai kolom ini dapat berasal dari "principal_host" atau "pid", bergantung pada konteksnya.
read_only_udm.principal.resource.attribute.labels Kolom ini diisi dengan berbagai label yang berasal dari kolom seperti "affectedObjects", "changedValues", dan "extraAttributes". Kunci dan nilai label ini dibuat secara dinamis berdasarkan konten spesifik dari kolom ini.

Perlu bantuan lebih lanjut? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.