收集 Atlassian Confluence 記錄

支援的國家/地區:

本文說明如何將 Atlassian Confluence 記錄擷取至 Google Security Operations。剖析器會先嘗試使用專為 Atlassian Confluence 記錄設計的規則運算式 (grok 模式),從原始記錄訊息中擷取欄位。如果 Grok 剖析失敗或記錄檔為 JSON 格式,程式碼就會嘗試將訊息剖析為 JSON。最後,系統會將擷取的欄位對應至 Google SecOps UDM 架構,並加入額外背景資訊。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 具備稽核記錄存取權的 Atlassian Confluence Cloud 帳戶,或具備管理員存取權的 Confluence Data Center/Server
  • 如為 GCP 基礎方法:GCP 的特殊存取權 (GCS、IAM、Cloud Run、Pub/Sub、Cloud Scheduler)
  • Bindplane 方法:Windows Server 2016 以上版本,或搭載 systemd 的 Linux 主機

整合選項總覽

本指南提供兩種整合路徑:

  • 選項 1:透過 Bindplane + Syslog 收集 Confluence Data Center/Server 資料
  • 選項 2:透過 GCP Cloud Run 函式 + GCS 取得 Confluence Cloud 稽核記錄 (JSON 格式)

請選擇最符合 Confluence 部署類型和基礎架構的選項。

選項 1:透過 Bindplane + Syslog 傳送 Confluence Data Center/Server

這個選項會設定 Confluence Data Center 或 Server,透過系統記錄將記錄傳送至 Bindplane 代理程式,然後轉送至 Google SecOps。

取得 Google SecOps 擷取驗證檔案

  1. 登入 Google SecOps 控制台。
  2. 依序前往「SIEM 設定」>「收集代理程式」
  3. 按一下「下載」即可下載擷取驗證檔案
  4. 將檔案安全地儲存在要安裝 Bindplane 代理程式的系統中。

取得 Google SecOps 客戶 ID

  1. 登入 Google SecOps 控制台。
  2. 依序前往「SIEM 設定」>「設定檔」
  3. 複製並儲存「機構詳細資料」專區中的客戶 ID

安裝 Bindplane 代理程式

請按照下列操作說明,在 Windows 或 Linux 作業系統上安裝 Bindplane 代理程式。

Windows 安裝

  1. 以管理員身分開啟「命令提示字元」或「PowerShell」
  2. 執行下列指令:

    msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. 等待安裝完成。

  4. 執行下列指令來驗證安裝:

    sc query observiq-otel-collector
    

服務應顯示為「RUNNING」(執行中)

安裝 Linux

  1. 開啟具備根層級或 sudo 權限的終端機。
  2. 執行下列指令:

    sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. 等待安裝完成。

  4. 執行下列指令來驗證安裝:

    sudo systemctl status observiq-otel-collector
    

服務應顯示為啟用 (執行中)

其他安裝資源

如需其他安裝選項和疑難排解資訊,請參閱 Bindplane 代理程式安裝指南

設定 Bindplane 代理程式,擷取系統記錄檔並傳送至 Google SecOps

找出設定檔

  • Linux:

    sudo nano /etc/bindplane-agent/config.yaml
    
  • Windows:

    notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
    

編輯設定檔

  1. config.yaml 的所有內容替換為下列設定:

    receivers:
      udplog:
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/confluence_logs:
        compression: gzip
        creds_file_path: '/etc/bindplane-agent/ingestion-auth.json'
        customer_id: 'YOUR_CUSTOMER_ID'
        endpoint: malachiteingestion-pa.googleapis.com
        log_type: ATLASSIAN_CONFLUENCE
        raw_log_field: body
        ingestion_labels:
          service: confluence
    
    service:
      pipelines:
        logs/confluence:
          receivers:
            - udplog
          exporters:
            - chronicle/confluence_logs
    

設定參數

  • 請替換下列預留位置:

    • listen_address:視基礎架構需求替換通訊埠和 IP 位址。使用 0.0.0.0:514 監聽通訊埠 514 上的所有介面。
    • creds_file_path:更新驗證檔案的儲存路徑:
      • Linux/etc/bindplane-agent/ingestion-auth.json
      • WindowsC:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
    • customer_id:將 YOUR_CUSTOMER_ID 替換為上一個步驟中的實際客戶 ID。
    • endpoint:區域端點網址:
      • 美國malachiteingestion-pa.googleapis.com
      • 歐洲europe-malachiteingestion-pa.googleapis.com
      • 亞洲asia-southeast1-malachiteingestion-pa.googleapis.com

儲存設定檔

編輯完成後,請儲存檔案:

  • Linux:依序按下 Ctrl+OEnterCtrl+X
  • Windows:依序點選「檔案」>「儲存」

重新啟動 Bindplane 代理程式,以套用變更

在 Linux 中重新啟動 Bindplane 代理程式

  1. 如要在 Linux 中重新啟動 Bindplane 代理程式,請執行下列指令:

    sudo systemctl restart observiq-otel-collector
    
  2. 確認服務正在執行:

    sudo systemctl status observiq-otel-collector
    
  3. 檢查記錄中是否有錯誤:

    sudo journalctl -u observiq-otel-collector -f
    

在 Windows 中重新啟動 Bindplane 代理程式

  1. 如要在 Windows 中重新啟動 Bindplane 代理程式,請選擇下列任一做法:

    • 以管理員身分使用命令提示字元或 PowerShell:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • 使用服務控制台:

      1. 按下 Win+R 鍵,輸入 services.msc,然後按下 Enter 鍵。
      2. 找出 observIQ OpenTelemetry Collector
      3. 按一下滑鼠右鍵,然後選取「重新啟動」
      4. 確認服務正在執行:

        sc query observiq-otel-collector
        
      5. 檢查記錄中是否有錯誤:

        type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
        

在 Confluence Data Center/Server 上設定 Syslog 轉送

  1. 設定 Confluence 將記錄檔寫入檔案 (預設行為)。
  2. 如果沒有 rsyslog,請安裝:

    sudo apt-get install rsyslog  # Debian/Ubuntu
    sudo yum install rsyslog      # RHEL/CentOS
    
  3. 建立 rsyslog 設定檔 /etc/rsyslog.d/confluence.conf

    # Forward Confluence logs to Bindplane
    $ModLoad imfile
    
    # Application logs
    $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log
    $InputFileTag confluence-app:
    $InputFileStateFile stat-confluence-app
    $InputFileSeverity info
    $InputFileFacility local0
    $InputRunFileMonitor
    
    # Audit logs (JSON format in DC/Server)
    $InputFileName <confluence-home-directory>/log/audit/audit.log
    $InputFileTag confluence-audit:
    $InputFileStateFile stat-confluence-audit
    $InputFileSeverity info
    $InputFileFacility local1
    $InputRunFileMonitor
    
    # Forward to Bindplane agent
    *.* @@BINDPLANE_AGENT_IP:514
    
    • BINDPLANE_AGENT_IP 替換為 Bindplane 代理程式的 IP 位址 (例如 192.168.1.100)。
    • 根據 Confluence 安裝位置調整記錄檔路徑:
      • 應用程式記錄檔通常會:<confluence-install>/logs/<local-home>/logs/
      • 稽核記錄:<confluence-home-directory>/log/audit/ (JSON 格式)
      • 如要尋找 Confluence 主目錄,請依序前往「設定」>「一般設定」>「系統資訊」,然後尋找「Confluence Home」或「Local Home」
  4. 重新啟動 rsyslog:

    sudo systemctl restart rsyslog
    

方法 B:設定 Log4j2 Syslog 轉送

如要使用這個選項,必須修改 Log4j2 設定。建議使用選項 A (rsyslog),因為較為簡單。

  1. 透過 SSH 或 RDP 登入 Confluence 伺服器。
  2. 在下列位置找出 Log4j2 設定檔:

    <confluence-install>/confluence/WEB-INF/classes/log4j2.xml
    
  3. 編輯設定檔,新增 Syslog 附加程式:

    <Configuration>
      <Appenders>
        <!-- Existing appenders -->
        <Syslog name="SyslogAppender" 
                host="BINDPLANE_AGENT_IP" 
                port="514" 
                protocol="UDP"
                format="RFC5424"
                facility="LOCAL0">
          <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <Root level="info">
          <AppenderRef ref="SyslogAppender"/>
          <!-- Other appender refs -->
        </Root>
    
        <!-- Audit logger -->
        <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" 
                level="info" 
                additivity="false">
          <AppenderRef ref="SyslogAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
    
    • BINDPLANE_AGENT_IP 替換為 Bindplane 代理程式的 IP 位址 (例如 192.168.1.100)。
  4. 重新啟動 Confluence 以套用變更:

    sudo systemctl restart confluence
    

選項 2:透過 GCP Cloud Run 函式和 GCS 取得 Confluence Cloud 稽核記錄

這個方法會使用 GCP Cloud Run 函式,透過 Confluence Audit REST API 定期擷取稽核記錄,並將這些記錄儲存在 GCS 中,供 Google SecOps 擷取。

收集 Confluence Cloud API 憑證

  1. 登入 Atlassian 帳戶
  2. 前往 https://id.atlassian.com/manage-profile/security/api-tokens
  3. 按一下「Create API Token」(建立 API 憑證)
  4. 輸入權杖的標籤 (例如 Google Security Operations Integration)。
  5. 點選「建立」
  6. 複製並妥善儲存 API 權杖。
  7. 記下 Confluence Cloud 網站網址 (例如 https://yoursite.atlassian.net)。
  8. 記下 Atlassian 帳戶電子郵件地址 (用於驗證)。

驗證權限

如要確認帳戶是否具備必要權限,請按照下列步驟操作:

  1. 登入 Confluence Cloud。
  2. 按一下右上角的「設定」圖示 (⚙️)。
  3. 如果左側導覽面板顯示「監控」>「稽核記錄」,表示您具備必要權限。
  4. 如果沒有看到這個選項,請與管理員聯絡,授予 Confluence 管理員權限。

測試 API 存取權

  • 請先測試憑證,再繼續進行整合:

    # Replace with your actual credentials
    CONFLUENCE_EMAIL="your-email@example.com"
    CONFLUENCE_API_TOKEN="your-api-token"
    CONFLUENCE_URL="https://yoursite.atlassian.net"
    
    # Test API access
    curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \
      -H "Accept: application/json" \
      "${CONFLUENCE_URL}/wiki/rest/api/audit"
    

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 confluence-audit-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取地點 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

為 Cloud Run 函式建立服務帳戶

Cloud Run 函式需要具備 GCS bucket 寫入權限的服務帳戶,並由 Pub/Sub 叫用。

建立服務帳戶

  1. GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)
  2. 按一下 [Create Service Account] (建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 confluence-audit-collector-sa
    • 服務帳戶說明:輸入 Service account for Cloud Run function to collect Confluence Cloud audit logs
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選擇角色」
    2. 搜尋並選取「Storage 物件管理員」
    3. 點選「+ 新增其他角色」
    4. 搜尋並選取「Cloud Run Invoker」
    5. 點選「+ 新增其他角色」
    6. 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)
  6. 按一下「繼續」
  7. 按一下 [完成]。

這些角色適用於:

  • Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
  • Cloud Run 叫用者:允許 Pub/Sub 叫用函式
  • Cloud Functions 叫用者:允許函式叫用

授予 GCS 值區的 IAM 權限

授予服務帳戶 GCS bucket 的寫入權限:

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

建立 Pub/Sub 主題,Cloud Scheduler 會將訊息發布至該主題,而 Cloud Run 函式會訂閱該主題。

  1. GCP Console 中,前往「Pub/Sub」>「Topics」(主題)
  2. 按一下「建立主題」
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 confluence-audit-trigger
    • 其他設定保留預設值。
  4. 點選「建立」

建立 Cloud Run 函式來收集記錄

Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 Confluence Cloud Audit API 擷取記錄,並將記錄寫入 GCS。

  1. 前往 GCP Console 的「Cloud Run」
  2. 按一下「Create service」(建立服務)
  3. 選取「函式」 (使用內嵌編輯器建立函式)。
  4. 在「設定」部分,提供下列設定詳細資料:

    設定
    服務名稱 confluence-audit-collector
    區域 選取與 GCS bucket 相符的區域 (例如 us-central1)
    執行階段 選取「Python 3.12」以上版本
  5. 在「Trigger (optional)」(觸發條件 (選用)) 專區:

    1. 按一下「+ 新增觸發條件」
    2. 選取「Cloud Pub/Sub」
    3. 在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 中,選擇 confluence-audit-trigger
    4. 按一下 [儲存]
  6. 在「Authentication」(驗證) 部分:

    1. 選取「需要驗證」
    2. 檢查 Identity and Access Management (IAM)
  7. 向下捲動並展開「Containers, Networking, Security」

  8. 前往「安全性」分頁:

    • 服務帳戶:選取 confluence-audit-collector-sa
  9. 前往「容器」分頁:

    1. 按一下「變數與密鑰」
    2. 針對每個環境變數,按一下「+ 新增變數」
    變數名稱 範例值 說明
    GCS_BUCKET confluence-audit-logs GCS bucket 名稱
    GCS_PREFIX confluence-audit 記錄檔的前置字串
    STATE_KEY confluence-audit/state.json 狀態檔案路徑
    CONFLUENCE_URL https://yoursite.atlassian.net Confluence 網站網址
    CONFLUENCE_EMAIL your-email@example.com Atlassian 帳戶電子郵件地址
    CONFLUENCE_API_TOKEN your-api-token-here API 權杖
    MAX_RECORDS 1000 每次執行的記錄數量上限
  10. 在「變數與密鑰」部分,向下捲動至「要求」

    • 要求逾時:輸入 600 秒 (10 分鐘)。
  11. 前往「設定」分頁:

    • 在「資源」部分:
      • 記憶體:選取 512 MiB 以上。
      • CPU:選取 1
  12. 在「修訂版本資源調度」部分:

    • 執行個體數量下限:輸入 0
    • 「Maximum number of instances」(執行個體數量上限):輸入 100 (或根據預期負載調整)。
  13. 點選「建立」

  14. 等待服務建立完成 (1 到 2 分鐘)。

  15. 服務建立完成後,系統會自動開啟內嵌程式碼編輯器

新增函式程式碼

  1. 在「Function entry point」(函式進入點) 中輸入 main
  2. 在內嵌程式碼編輯器中建立兩個檔案:

    • 第一個檔案:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json')
    CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL')
    CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL')
    CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=24)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=CONFLUENCE_URL,
                email=CONFLUENCE_EMAIL,
                api_token=CONFLUENCE_API_TOKEN,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int):
        """
        Fetch logs from Confluence Cloud Audit API with pagination and rate limiting.
    
        Args:
            api_base: Confluence site URL
            email: Atlassian account email
            api_token: API token
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up URL
        base_url = api_base.rstrip('/')
    
        # Build authentication header
        auth_string = f"{email}:{api_token}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
        headers = {
            'Authorization': f'Basic {auth_b64}',
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request URL
            url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('results', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # creationDate is in Unix milliseconds
                        event_time_ms = event.get('creationDate')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < 100:
                    print(f"Reached last page (size={current_size} < limit=100)")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records[:max_records], newest_time
    
    • 第二個檔案:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 點選「部署」來儲存並部署函式。

  4. 等待部署作業完成 (2 到 3 分鐘)。

建立 Cloud Scheduler 工作

Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式。

  1. 前往 GCP 主控台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 confluence-audit-collector-hourly
    區域 選取與 Cloud Run 函式相同的區域
    頻率 0 * * * * (每小時整點)
    時區 選取時區 (建議使用世界標準時間)
    目標類型 Pub/Sub
    主題 選取「confluence-audit-trigger
    郵件內文 {} (空白 JSON 物件)
  4. 點選「建立」

排程頻率選項

  • 根據記錄檔量和延遲時間要求選擇頻率:

    頻率 Cron 運算式 用途
    每 5 分鐘 */5 * * * * 高容量、低延遲
    每 15 分鐘檢查一次 */15 * * * * 普通量
    每小時 0 * * * * 標準 (建議採用)
    每 6 小時 0 */6 * * * 少量、批次處理
    每日 0 0 * * * 歷來資料集合

測試整合項目

  1. Cloud Scheduler 控制台中找出您的工作。
  2. 按一下「強制執行」,手動觸發工作。
  3. 稍等幾秒鐘。
  4. 前往「Cloud Run」>「Services」
  5. 按一下 confluence-audit-collector
  6. 按一下 [Logs] (記錄) 分頁標籤。
  7. 確認函式是否已順利執行。請找出以下項目:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 依序前往「Cloud Storage」>「Buckets」

  9. 按一下 bucket 名稱。

  10. 前往 confluence-audit/ 資料夾:

  11. 確認是否已建立含有目前時間戳記的新 .ndjson 檔案。

如果在記錄中發現錯誤:

  • HTTP 401:檢查環境變數中的 API 憑證
  • HTTP 403:確認帳戶是否具備 Confluence 管理員權限
  • HTTP 429:頻率限制 - 函式會自動重試並延遲
  • 缺少環境變數:檢查是否已設定所有必要變數

擷取 Google SecOps 服務帳戶

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Confluence Cloud Audit Logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Atlassian Confluence」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示不重複的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

在 Google SecOps 中設定動態饋給,擷取 Confluence 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Confluence Cloud Audit Logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Atlassian Confluence」做為「記錄類型」
  7. 點選 [下一步]。
  8. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://confluence-audit-logs/confluence-audit/
      
      • 取代:

        • confluence-audit-logs:您的 GCS bucket 名稱。
        • confluence-audit:儲存記錄的選用前置字元/資料夾路徑 (如為根目錄,請留空)。
      • 範例:

        • 根層級 bucket:gs://company-logs/
        • 前置字串:gs://company-logs/confluence-audit/
        • 有子資料夾:gs://company-logs/confluence/audit/
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  9. 點選 [下一步]。

  10. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

UDM 對應表

記錄欄位 UDM 對應 邏輯
代理程式 read_only_udm.network.http.user_agent 取自「代理人」欄位的值。
app_protocol read_only_udm.network.application_protocol 衍生自「app_protocol」欄位。如果「app_protocol」包含「HTTPS」、「HTTP」、「SSH」或「RDP」,系統會使用對應的通訊協定。否則預設為「UNKNOWN_APPLICATION_PROTOCOL」。
app_protocol read_only_udm.network.application_protocol_version 取自「app_protocol」欄位的值。
auditType.action read_only_udm.security_result.action 衍生自「auditType.action」欄位。如果「auditType.action」包含「successful」,則值會設為「ALLOW」。如果包含「restricted」,則值會設為「BLOCK」。
auditType.action read_only_udm.security_result.summary 如果「auditType」不為空白,且「auditType_area」為「SECURITY」,則值取自「auditType.action」欄位。
auditType.actionI18nKey read_only_udm.metadata.product_event_type 如果「auditType」不為空白,則取自「auditType.actionI18nKey」欄位的值。
auditType.area read_only_udm.security_result.detection_fields.value 從「auditType.area」欄位取得值,並指派給偵測欄位的「value」欄位,且「key」欄位設為「auditType area」。如果「auditType」不為空白,系統就會執行這項對應作業。
auditType.category read_only_udm.security_result.category_details 「auditType」不為空白時,取自「auditType.category」欄位的值。
auditType.categoryI18nKey read_only_udm.security_result.detection_fields.value 從「auditType.categoryI18nKey」欄位取得值,並指派給偵測欄位的「value」欄位,而「key」欄位則設為「auditType categoryI18nKey」。如果「auditType」不為空白,系統就會執行這項對應作業。
auditType.level read_only_udm.security_result.detection_fields.value 從「auditType.level」欄位取得值,並指派給偵測欄位的「value」欄位,且「key」欄位設為「auditType level」。如果「auditType」不為空白,系統就會執行這項對應作業。
author.displayName read_only_udm.principal.user.user_display_name 取自「author.displayName」欄位的值。
author.externalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 從「author.externalCollaborator」欄位取得值,並指派給「key」欄位設為「externalCollaborator」的標籤「value」欄位。
author.id read_only_udm.principal.user.userid 當「author.type」為「user」且「principal_user_present」為「false」時,取自「author.id」欄位的值。
author.isExternalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 從「author.isExternalCollaborator」欄位取得值,並指派給「key」欄位設為「isExternalCollaborator」的標籤「value」欄位。
author.name read_only_udm.principal.user.user_display_name 當「author.type」為「user」且「principal_user_present」為「false」時,取自「author.name」欄位的值。
bytes_in read_only_udm.network.received_bytes 如果「bytes_in」欄位包含數字,則取自該欄位的值。否則預設為 0。
category read_only_udm.security_result.category_details 取自「category」欄位的值。
changedValues read_only_udm.principal.resource.attribute.labels 逐一疊代「changedValues」中的每個元素,並使用「changedValue [index] [key]」等鍵建立標籤,以及「changedValues」陣列中對應的值。
建立日期 read_only_udm.metadata.event_timestamp 取自「creationDate」欄位的值,剖析為 UNIX 或 UNIX_MS 時間戳記。
extraAttributes read_only_udm.principal.resource.attribute.labels 逐一疊代「extraAttributes」中的每個元素,並根據「name」和「nameI18nKey」欄位建立含有鍵的標籤,以及來自對應「value」欄位的值。
http_verb read_only_udm.network.http.method 值取自「http_verb」欄位。
ip read_only_udm.target.ip 取自「ip」欄位的值。
principal_host read_only_udm.principal.hostname 取自「principal_host」欄位的值。
referral_url read_only_udm.network.http.referral_url 值取自「referral_url」欄位。
remoteAddress read_only_udm.principal.ip 取自「remoteAddress」欄位的值,並剖析為 IP 位址。
response_code read_only_udm.network.http.response_code 取自「response_code」欄位的值。
session_duration read_only_udm.additional.fields.value.string_value 從「session_duration」欄位取得值,並指派給「key」欄位設為「Session Duration」的標籤「string_value」欄位。
來源 read_only_udm.principal.ip 從「來源」欄位取得的值,並剖析為 IP 位址。
src_ip read_only_udm.principal.ip 如果「remoteAddress」為空白,則取自「src_ip」欄位的值。
摘要 read_only_udm.security_result.summary 值取自「摘要」欄位。
sysAdmin read_only_udm.security_result.about.resource.attribute.labels.value 從「sysAdmin」欄位取得值,並指派給「key」欄位設為「sysAdmin」的標籤「value」欄位。
superAdmin read_only_udm.security_result.about.resource.attribute.labels.value 從「superAdmin」欄位取得值,並指派給「value」欄位,而標籤的「key」欄位則設為「superAdmin」。
target_url read_only_udm.target.url 值取自「target_url」欄位。
時間戳記 read_only_udm.metadata.event_timestamp 從「時間戳記」欄位取得的值,並剖析為日期和時間字串。
user_id read_only_udm.principal.user.userid 取自「user_id」欄位的值。
read_only_udm.metadata.event_type 這個欄位的值取決於一系列檢查作業,預設為「GENERIC_EVENT」。系統會根據「principal_host」、「user_id」、「has_principal」和「author.type」等其他欄位的存在與內容,將這個欄位設為「NETWORK_HTTP」、「USER_UNCATEGORIZED」或「STATUS_UPDATE」等特定值。
read_only_udm.metadata.vendor_name 設為「ATLASSIAN」。
read_only_udm.metadata.product_name 設為「CONFLUENCE」。
read_only_udm.metadata.log_type 設為「ATLASSIAN_CONFLUENCE」。
read_only_udm.principal.user.user_display_name 這個欄位的值可能來自「author.displayName」或「affectedObject.name」,視情況而定。
read_only_udm.target.process.pid 這個欄位的值可能來自「principal_host」或「pid」,視情況而定。
read_only_udm.principal.resource.attribute.labels 這個欄位會填入從「affectedObjects」、「changedValues」和「extraAttributes」等欄位衍生的各種標籤。這些標籤的鍵和值會根據這些欄位的特定內容動態產生。

需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。