收集 Atlassian Confluence 日志

支持的平台:

本文档介绍了如何将 Atlassian Confluence 日志提取到 Google Security Operations。解析器首先尝试使用专为 Atlassian Confluence 日志设计的正则表达式(Grok 模式)从原始日志消息中提取字段。如果 grok 解析失败或日志采用 JSON 格式,则代码会尝试将消息解析为 JSON。最后,提取的字段会映射到 Google SecOps UDM 架构,并添加更多上下文信息。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 具有审核日志访问权限的 Atlassian Confluence Cloud 账号,或者具有管理权限的 Confluence Data Center/Server 账号
  • 对于基于 GCP 的方法:对 GCP(GCS、IAM、Cloud Run、Pub/Sub、Cloud Scheduler)的特权访问权限
  • 对于 Bindplane 方法:Windows Server 2016 或更高版本,或者具有 systemd 的 Linux 主机

集成选项概览

本指南提供了两种集成途径:

  • 选项 1:通过 Bindplane + Syslog 收集 Confluence Data Center/Server 数据
  • 方式 2:通过 GCP Cloud Run 函数 + GCS(JSON 格式)获取 Confluence Cloud 审核日志

请选择最适合您的 Confluence 部署类型和基础架构的选项。

选项 1:通过 Bindplane + Syslog 收集 Confluence Data Center/Server 日志

此选项用于将 Confluence Data Center 或 Server 配置为通过 syslog 将日志发送到 Bindplane 代理,然后由该代理将日志转发到 Google SecOps。

获取 Google SecOps 注入身份验证文件

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 收集代理
  3. 点击下载以下载内容提交身份验证文件
  4. 将文件安全地保存在将要安装 Bindplane 代理的系统上。

获取 Google SecOps 客户 ID

  1. 登录 Google SecOps 控制台。
  2. 依次前往 SIEM 设置 > 个人资料
  3. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Windows 安装

  1. 以管理员身份打开命令提示符PowerShell
  2. 运行以下命令:

    msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. 等待安装完成。

  4. 运行以下命令来验证安装:

    sc query observiq-otel-collector
    

该服务应显示为 RUNNING

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. 等待安装完成。

  4. 运行以下命令来验证安装:

    sudo systemctl status observiq-otel-collector
    

该服务应显示为有效(正在运行)

其他安装资源

如需了解其他安装选项和问题排查信息,请参阅 Bindplane 代理安装指南

配置 Bindplane 代理以注入 syslog 并将其发送到 Google SecOps

找到配置文件

  • Linux

    sudo nano /etc/bindplane-agent/config.yaml
    
  • Windows

    notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
    

修改配置文件

  1. config.yaml 的全部内容替换为以下配置:

    receivers:
      udplog:
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/confluence_logs:
        compression: gzip
        creds_file_path: '/etc/bindplane-agent/ingestion-auth.json'
        customer_id: 'YOUR_CUSTOMER_ID'
        endpoint: malachiteingestion-pa.googleapis.com
        log_type: ATLASSIAN_CONFLUENCE
        raw_log_field: body
        ingestion_labels:
          service: confluence
    
    service:
      pipelines:
        logs/confluence:
          receivers:
            - udplog
          exporters:
            - chronicle/confluence_logs
    

配置参数

  • 替换以下占位符:

    • listen_address:根据您的基础架构需要替换端口和 IP 地址。使用 0.0.0.0:514 在端口 514 上监听所有接口。
    • creds_file_path:更新为身份验证文件的保存路径:
      • Linux/etc/bindplane-agent/ingestion-auth.json
      • WindowsC:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
    • customer_id:将 YOUR_CUSTOMER_ID 替换为上一步中的实际客户 ID。
    • endpoint:区域端点网址:
      • 美国malachiteingestion-pa.googleapis.com
      • 欧洲europe-malachiteingestion-pa.googleapis.com
      • 亚洲asia-southeast1-malachiteingestion-pa.googleapis.com

保存配置文件

修改后,保存文件:

  • Linux:依次按 Ctrl+OEnterCtrl+X
  • Windows:依次点击文件 > 保存

重启 Bindplane 代理以应用更改

在 Linux 中重启 Bindplane 代理

  1. 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart observiq-otel-collector
    
  2. 验证服务是否正在运行:

    sudo systemctl status observiq-otel-collector
    
  3. 检查日志是否存在错误:

    sudo journalctl -u observiq-otel-collector -f
    

在 Windows 中重启 Bindplane 代理

  1. 如需在 Windows 中重启 Bindplane 代理,请选择以下选项之一:

    • 以管理员身份使用命令提示符或 PowerShell:

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • 使用“服务”控制台:

      1. Win+R,输入 services.msc,然后按 Enter 键。
      2. 找到 observIQ OpenTelemetry 收集器
      3. 右键点击并选择重新启动
      4. 验证服务是否正在运行:

        sc query observiq-otel-collector
        
      5. 检查日志是否存在错误:

        type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
        

在 Confluence Data Center/Server 上配置 Syslog 转发

  1. 将 Confluence 配置为将日志写入文件(默认行为)。
  2. 如果 rsyslog 不存在,请进行安装:

    sudo apt-get install rsyslog  # Debian/Ubuntu
    sudo yum install rsyslog      # RHEL/CentOS
    
  3. 创建 rsyslog 配置文件 /etc/rsyslog.d/confluence.conf

    # Forward Confluence logs to Bindplane
    $ModLoad imfile
    
    # Application logs
    $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log
    $InputFileTag confluence-app:
    $InputFileStateFile stat-confluence-app
    $InputFileSeverity info
    $InputFileFacility local0
    $InputRunFileMonitor
    
    # Audit logs (JSON format in DC/Server)
    $InputFileName <confluence-home-directory>/log/audit/audit.log
    $InputFileTag confluence-audit:
    $InputFileStateFile stat-confluence-audit
    $InputFileSeverity info
    $InputFileFacility local1
    $InputRunFileMonitor
    
    # Forward to Bindplane agent
    *.* @@BINDPLANE_AGENT_IP:514
    
    • BINDPLANE_AGENT_IP 替换为 Bindplane 代理的 IP 地址(例如 192.168.1.100)。
    • 根据您的 Confluence 安装情况调整日志文件路径:
      • 应用日志通常:<confluence-install>/logs/<local-home>/logs/
      • 审核日志:<confluence-home-directory>/log/audit/(JSON 格式)
      • 如需查找 Confluence 主目录,请依次前往设置 > 常规配置 > 系统信息,然后查找 Confluence 主目录本地主目录
  4. 重启 rsyslog:

    sudo systemctl restart rsyslog
    

选项 B:配置 Log4j2 Syslog 转发

此选项需要修改 Log4j2 配置。建议选择方案 A (rsyslog),因为它比较简单。

  1. 通过 SSH 或 RDP 登录您的 Confluence 服务器。
  2. 找到以下位置的 Log4j2 配置文件:

    <confluence-install>/confluence/WEB-INF/classes/log4j2.xml
    
  3. 修改配置文件以添加 Syslog appender:

    <Configuration>
      <Appenders>
        <!-- Existing appenders -->
        <Syslog name="SyslogAppender" 
                host="BINDPLANE_AGENT_IP" 
                port="514" 
                protocol="UDP"
                format="RFC5424"
                facility="LOCAL0">
          <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <Root level="info">
          <AppenderRef ref="SyslogAppender"/>
          <!-- Other appender refs -->
        </Root>
    
        <!-- Audit logger -->
        <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" 
                level="info" 
                additivity="false">
          <AppenderRef ref="SyslogAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
    
    • BINDPLANE_AGENT_IP 替换为 Bindplane 代理的 IP 地址(例如 192.168.1.100)。
  4. 重启 Confluence 以应用更改:

    sudo systemctl restart confluence
    

方案 2:通过 GCP Cloud Run 函数和 GCS 获取 Confluence Cloud 审核日志

此方法使用 GCP Cloud Run 函数通过 Confluence Audit REST API 定期提取审核日志,并将其存储在 GCS 中以供 Google SecOps 提取。

收集 Confluence Cloud API 凭据

  1. 登录您的 Atlassian 账号
  2. 前往 https://id.atlassian.com/manage-profile/security/api-tokens
  3. 点击创建 API 令牌
  4. 为令牌输入标签(例如,Google Security Operations Integration)。
  5. 点击创建
  6. 复制并妥善保存 API 令牌。
  7. 记下您的 Confluence Cloud 网站网址(例如 https://yoursite.atlassian.net)。
  8. 记下您的 Atlassian 账号电子邮件地址(用于身份验证)。

验证权限

如需验证账号是否具有所需权限,请执行以下操作:

  1. 登录 Confluence Cloud。
  2. 点击右上角的设置图标 (⚙️)。
  3. 如果您可以在左侧导航栏中看到监控 > 审核日志,则说明您拥有所需的权限。
  4. 如果您看不到此选项,请与您的管理员联系,以获取 Confluence 管理员权限。

测试 API 访问权限

  • 在继续进行集成之前,请先测试您的凭据:

    # Replace with your actual credentials
    CONFLUENCE_EMAIL="your-email@example.com"
    CONFLUENCE_API_TOKEN="your-api-token"
    CONFLUENCE_URL="https://yoursite.atlassian.net"
    
    # Test API access
    curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \
      -H "Accept: application/json" \
      "${CONFLUENCE_URL}/wiki/rest/api/audit"
    

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 confluence-audit-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 confluence-audit-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect Confluence Cloud audit logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 confluence-audit-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 Confluence Cloud Audit API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 confluence-audit-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 confluence-audit-trigger
    4. 点击保存
  6. 身份验证部分中:

    1. 选择需要进行身份验证
    2. 检查 Identity and Access Management (IAM)
  7. 向下滚动并展开容器、网络、安全性

  8. 前往安全标签页:

    • 服务账号:选择 confluence-audit-collector-sa
  9. 前往容器标签页:

    1. 点击变量和密钥
    2. 为每个环境变量点击 + 添加变量
    变量名称 示例值 说明
    GCS_BUCKET confluence-audit-logs GCS 存储分区名称
    GCS_PREFIX confluence-audit 日志文件的前缀
    STATE_KEY confluence-audit/state.json 状态文件路径
    CONFLUENCE_URL https://yoursite.atlassian.net Confluence 网站网址
    CONFLUENCE_EMAIL your-email@example.com Atlassian 账号电子邮件地址
    CONFLUENCE_API_TOKEN your-api-token-here API 令牌
    MAX_RECORDS 1000 每次运行的记录数上限
  10. 变量和 Secret 部分中,向下滚动到请求

    • 请求超时:输入 600 秒(10 分钟)。
  11. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  12. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  13. 点击创建

  14. 等待服务创建完成(1-2 分钟)。

  15. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json')
    CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL')
    CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL')
    CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=24)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=CONFLUENCE_URL,
                email=CONFLUENCE_EMAIL,
                api_token=CONFLUENCE_API_TOKEN,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int):
        """
        Fetch logs from Confluence Cloud Audit API with pagination and rate limiting.
    
        Args:
            api_base: Confluence site URL
            email: Atlassian account email
            api_token: API token
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up URL
        base_url = api_base.rstrip('/')
    
        # Build authentication header
        auth_string = f"{email}:{api_token}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
        headers = {
            'Authorization': f'Basic {auth_b64}',
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request URL
            url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('results', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # creationDate is in Unix milliseconds
                        event_time_ms = event.get('creationDate')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < 100:
                    print(f"Reached last page (size={current_size} < limit=100)")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records[:max_records], newest_time
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 confluence-audit-collector-hourly
    区域 选择与 Cloud Run 函数相同的区域
    频率 0 * * * *(每小时一次,在整点时)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择confluence-audit-trigger
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 搜索量中等
    每小时 0 * * * * 标准(推荐)
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击 confluence-audit-collector
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称。

  10. 导航到 confluence-audit/ 文件夹。

  11. 验证是否已创建具有当前时间戳的新 .ndjson 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 API 凭据
  • HTTP 403:验证账号是否具有 Confluence 管理员权限
  • HTTP 429:速率限制 - 函数将自动重试并进行退避
  • 缺少环境变量:检查是否已设置所有必需的变量

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Confluence Cloud Audit Logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Atlassian Confluence 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 Confluence 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Confluence Cloud Audit Logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Atlassian Confluence 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://confluence-audit-logs/confluence-audit/
      
        • confluence-audit-logs:您的 GCS 存储分区名称。
        • confluence-audit:存储日志的可选前缀/文件夹路径(留空表示根目录)。
      • 示例

        • 根存储分区:gs://company-logs/
        • 带前缀:gs://company-logs/confluence-audit/
        • 使用子文件夹:gs://company-logs/confluence/audit/
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
代理 read_only_udm.network.http.user_agent 从“agent”字段中获取的值。
app_protocol read_only_udm.network.application_protocol 派生自“app_protocol”字段。如果“app_protocol”包含“HTTPS”“HTTP”“SSH”或“RDP”,则使用相应协议。否则,默认值为“UNKNOWN_APPLICATION_PROTOCOL”。
app_protocol read_only_udm.network.application_protocol_version 值取自“app_protocol”字段。
auditType.action read_only_udm.security_result.action 派生自“auditType.action”字段。如果“auditType.action”包含“successful”,则该值设置为“ALLOW”。如果包含“restricted”,则将值设置为“BLOCK”。
auditType.action read_only_udm.security_result.summary 当“auditType”不为空且“auditType_area”为“SECURITY”时,从“auditType.action”字段中获取的值。
auditType.actionI18nKey read_only_udm.metadata.product_event_type 当“auditType”不为空时,从“auditType.actionI18nKey”字段中获取的值。
auditType.area read_only_udm.security_result.detection_fields.value 从“auditType.area”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType area”。当“auditType”不为空时,系统会执行此映射。
auditType.category read_only_udm.security_result.category_details 当“auditType”不为空时,从“auditType.category”字段中获取的值。
auditType.categoryI18nKey read_only_udm.security_result.detection_fields.value 从“auditType.categoryI18nKey”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType categoryI18nKey”。当“auditType”不为空时,系统会执行此映射。
auditType.level read_only_udm.security_result.detection_fields.value 从“auditType.level”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType level”。当“auditType”不为空时,系统会执行此映射。
author.displayName read_only_udm.principal.user.user_display_name 取自“author.displayName”字段的值。
author.externalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 从“author.externalCollaborator”字段中提取值,并将其分配给“key”字段设置为“externalCollaborator”的标签的“value”字段。
author.id read_only_udm.principal.user.userid 当“author.type”为“user”且“principal_user_present”为“false”时,从“author.id”字段中获取的值。
author.isExternalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 从“author.isExternalCollaborator”字段中提取值,并将其分配给“key”字段设置为“isExternalCollaborator”的标签的“value”字段。
author.name read_only_udm.principal.user.user_display_name 当“author.type”为“user”且“principal_user_present”为“false”时,从“author.name”字段中获取的值。
bytes_in read_only_udm.network.received_bytes 如果“bytes_in”字段包含数字,则取自该字段的值。否则,默认值为 0。
类别 read_only_udm.security_result.category_details 值取自“category”字段。
changedValues read_only_udm.principal.resource.attribute.labels 遍历“changedValues”中的每个元素,并创建键类似于“changedValue [index] [key]”的标签,以及来自“changedValues”数组中相应的值。
创建日期 read_only_udm.metadata.event_timestamp 从“creationDate”字段中获取的值,解析为 UNIX 或 UNIX_MS 时间戳。
extraAttributes read_only_udm.principal.resource.attribute.labels 遍历“extraAttributes”中的每个元素,并根据“name”和“nameI18nKey”字段创建键,根据相应的“value”字段创建值。
http_verb read_only_udm.network.http.method 值取自“http_verb”字段。
ip read_only_udm.target.ip 从“ip”字段中获取的值。
principal_host read_only_udm.principal.hostname 从“principal_host”字段中获取的值。
referral_url read_only_udm.network.http.referral_url 从“referral_url”字段中获取的值。
remoteAddress read_only_udm.principal.ip 从“remoteAddress”字段中获取的值,解析为 IP 地址。
response_code read_only_udm.network.http.response_code 从“response_code”字段中获取的值。
session_duration read_only_udm.additional.fields.value.string_value 从“session_duration”字段中获取值,并将其分配给“key”字段设置为“Session Duration”的标签的“string_value”字段。
来源 read_only_udm.principal.ip 从“来源”字段中获取的值,解析为 IP 地址。
src_ip read_only_udm.principal.ip 如果“remoteAddress”为空,则取自“src_ip”字段的值。
摘要 read_only_udm.security_result.summary 值取自“摘要”字段。
sysAdmin read_only_udm.security_result.about.resource.attribute.labels.value 从“sysAdmin”字段中提取的值,并分配给“key”字段设置为“sysAdmin”的标签的“value”字段。
superAdmin read_only_udm.security_result.about.resource.attribute.labels.value 从“superAdmin”字段中获取值,并将其分配给“key”字段设置为“superAdmin”的标签的“value”字段。
target_url read_only_udm.target.url 从“target_url”字段中获取的值。
时间戳 read_only_udm.metadata.event_timestamp 从“时间戳”字段中获取的值,解析为日期和时间字符串。
user_id read_only_udm.principal.user.userid 值取自“user_id”字段。
read_only_udm.metadata.event_type 此字段的值由一系列检查确定,默认值为“GENERIC_EVENT”。系统会根据“principal_host”“user_id”“has_principal”和“author.type”等其他字段的存在情况和内容,将其设置为“NETWORK_HTTP”“USER_UNCATEGORIZED”或“STATUS_UPDATE”等特定值。
read_only_udm.metadata.vendor_name 设置为“ATLASSIAN”。
read_only_udm.metadata.product_name 设置为“CONFLUENCE”。
read_only_udm.metadata.log_type 设置为“ATLASSIAN_CONFLUENCE”。
read_only_udm.principal.user.user_display_name 此字段的值可能来自“author.displayName”或“affectedObject.name”,具体取决于上下文。
read_only_udm.target.process.pid 此字段的值可以来自“principal_host”或“pid”,具体取决于上下文。
read_only_udm.principal.resource.attribute.labels 此字段填充了从“affectedObjects”“changedValues”和“extraAttributes”等字段派生的各种标签。这些标签的键和值会根据这些字段的具体内容动态生成。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。