收集 Atlassian Confluence 日志
本文档介绍了如何将 Atlassian Confluence 日志提取到 Google Security Operations。解析器首先尝试使用专为 Atlassian Confluence 日志设计的正则表达式(Grok 模式)从原始日志消息中提取字段。如果 grok 解析失败或日志采用 JSON 格式,则代码会尝试将消息解析为 JSON。最后,提取的字段会映射到 Google SecOps UDM 架构,并添加更多上下文信息。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 具有审核日志访问权限的 Atlassian Confluence Cloud 账号,或者具有管理权限的 Confluence Data Center/Server 账号
- 对于基于 GCP 的方法:对 GCP(GCS、IAM、Cloud Run、Pub/Sub、Cloud Scheduler)的特权访问权限
- 对于 Bindplane 方法:Windows Server 2016 或更高版本,或者具有
systemd的 Linux 主机
集成选项概览
本指南提供了两种集成途径:
- 选项 1:通过 Bindplane + Syslog 收集 Confluence Data Center/Server 数据
- 方式 2:通过 GCP Cloud Run 函数 + GCS(JSON 格式)获取 Confluence Cloud 审核日志
请选择最适合您的 Confluence 部署类型和基础架构的选项。
选项 1:通过 Bindplane + Syslog 收集 Confluence Data Center/Server 日志
此选项用于将 Confluence Data Center 或 Server 配置为通过 syslog 将日志发送到 Bindplane 代理,然后由该代理将日志转发到 Google SecOps。
获取 Google SecOps 注入身份验证文件
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 收集代理。
- 点击下载以下载内容提交身份验证文件。
将文件安全地保存在将要安装 Bindplane 代理的系统上。
获取 Google SecOps 客户 ID
- 登录 Google SecOps 控制台。
- 依次前往 SIEM 设置 > 个人资料。
复制并保存组织详细信息部分中的客户 ID。
安装 Bindplane 代理
按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。
Windows 安装
- 以管理员身份打开命令提示符或 PowerShell。
运行以下命令:
msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet等待安装完成。
运行以下命令来验证安装:
sc query observiq-otel-collector
该服务应显示为 RUNNING。
Linux 安装
- 打开具有 root 或 sudo 权限的终端。
运行以下命令:
sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh等待安装完成。
运行以下命令来验证安装:
sudo systemctl status observiq-otel-collector
该服务应显示为有效(正在运行)。
其他安装资源
如需了解其他安装选项和问题排查信息,请参阅 Bindplane 代理安装指南。
配置 Bindplane 代理以注入 syslog 并将其发送到 Google SecOps
找到配置文件
Linux:
sudo nano /etc/bindplane-agent/config.yamlWindows:
notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
修改配置文件
将
config.yaml的全部内容替换为以下配置:receivers: udplog: listen_address: "0.0.0.0:514" exporters: chronicle/confluence_logs: compression: gzip creds_file_path: '/etc/bindplane-agent/ingestion-auth.json' customer_id: 'YOUR_CUSTOMER_ID' endpoint: malachiteingestion-pa.googleapis.com log_type: ATLASSIAN_CONFLUENCE raw_log_field: body ingestion_labels: service: confluence service: pipelines: logs/confluence: receivers: - udplog exporters: - chronicle/confluence_logs
配置参数
替换以下占位符:
listen_address:根据您的基础架构需要替换端口和 IP 地址。使用0.0.0.0:514在端口 514 上监听所有接口。creds_file_path:更新为身份验证文件的保存路径:- Linux:
/etc/bindplane-agent/ingestion-auth.json - Windows:
C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
- Linux:
customer_id:将YOUR_CUSTOMER_ID替换为上一步中的实际客户 ID。endpoint:区域端点网址:- 美国:
malachiteingestion-pa.googleapis.com - 欧洲:
europe-malachiteingestion-pa.googleapis.com - 亚洲:
asia-southeast1-malachiteingestion-pa.googleapis.com
- 美国:
保存配置文件
修改后,保存文件:
- Linux:依次按
Ctrl+O、Enter和Ctrl+X - Windows:依次点击文件 > 保存
重启 Bindplane 代理以应用更改
在 Linux 中重启 Bindplane 代理
如需在 Linux 中重启 Bindplane 代理,请运行以下命令:
sudo systemctl restart observiq-otel-collector验证服务是否正在运行:
sudo systemctl status observiq-otel-collector检查日志是否存在错误:
sudo journalctl -u observiq-otel-collector -f
在 Windows 中重启 Bindplane 代理
如需在 Windows 中重启 Bindplane 代理,请选择以下选项之一:
以管理员身份使用命令提示符或 PowerShell:
net stop observiq-otel-collector && net start observiq-otel-collector使用“服务”控制台:
- 按
Win+R,输入services.msc,然后按 Enter 键。 - 找到 observIQ OpenTelemetry 收集器。
- 右键点击并选择重新启动。
验证服务是否正在运行:
sc query observiq-otel-collector检查日志是否存在错误:
type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
- 按
在 Confluence Data Center/Server 上配置 Syslog 转发
方案 A:配置 rsyslog 以转发本地日志文件(推荐)
- 将 Confluence 配置为将日志写入文件(默认行为)。
如果 rsyslog 不存在,请进行安装:
sudo apt-get install rsyslog # Debian/Ubuntu sudo yum install rsyslog # RHEL/CentOS创建 rsyslog 配置文件
/etc/rsyslog.d/confluence.conf:# Forward Confluence logs to Bindplane $ModLoad imfile # Application logs $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log $InputFileTag confluence-app: $InputFileStateFile stat-confluence-app $InputFileSeverity info $InputFileFacility local0 $InputRunFileMonitor # Audit logs (JSON format in DC/Server) $InputFileName <confluence-home-directory>/log/audit/audit.log $InputFileTag confluence-audit: $InputFileStateFile stat-confluence-audit $InputFileSeverity info $InputFileFacility local1 $InputRunFileMonitor # Forward to Bindplane agent *.* @@BINDPLANE_AGENT_IP:514- 将
BINDPLANE_AGENT_IP替换为 Bindplane 代理的 IP 地址(例如192.168.1.100)。 - 根据您的 Confluence 安装情况调整日志文件路径:
- 应用日志通常:
<confluence-install>/logs/或<local-home>/logs/ - 审核日志:
<confluence-home-directory>/log/audit/(JSON 格式) - 如需查找 Confluence 主目录,请依次前往设置 > 常规配置 > 系统信息,然后查找 Confluence 主目录或本地主目录。
- 应用日志通常:
- 将
重启 rsyslog:
sudo systemctl restart rsyslog
选项 B:配置 Log4j2 Syslog 转发
此选项需要修改 Log4j2 配置。建议选择方案 A (rsyslog),因为它比较简单。
- 通过 SSH 或 RDP 登录您的 Confluence 服务器。
找到以下位置的 Log4j2 配置文件:
<confluence-install>/confluence/WEB-INF/classes/log4j2.xml修改配置文件以添加 Syslog appender:
<Configuration> <Appenders> <!-- Existing appenders --> <Syslog name="SyslogAppender" host="BINDPLANE_AGENT_IP" port="514" protocol="UDP" format="RFC5424" facility="LOCAL0"> <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/> </Syslog> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="SyslogAppender"/> <!-- Other appender refs --> </Root> <!-- Audit logger --> <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" level="info" additivity="false"> <AppenderRef ref="SyslogAppender"/> </Logger> </Loggers> </Configuration>- 将
BINDPLANE_AGENT_IP替换为 Bindplane 代理的 IP 地址(例如192.168.1.100)。
- 将
重启 Confluence 以应用更改:
sudo systemctl restart confluence
方案 2:通过 GCP Cloud Run 函数和 GCS 获取 Confluence Cloud 审核日志
此方法使用 GCP Cloud Run 函数通过 Confluence Audit REST API 定期提取审核日志,并将其存储在 GCS 中以供 Google SecOps 提取。
收集 Confluence Cloud API 凭据
- 登录您的 Atlassian 账号。
- 前往 https://id.atlassian.com/manage-profile/security/api-tokens。
- 点击创建 API 令牌。
- 为令牌输入标签(例如,
Google Security Operations Integration)。 - 点击创建。
- 复制并妥善保存 API 令牌。
- 记下您的 Confluence Cloud 网站网址(例如
https://yoursite.atlassian.net)。 记下您的 Atlassian 账号电子邮件地址(用于身份验证)。
验证权限
如需验证账号是否具有所需权限,请执行以下操作:
- 登录 Confluence Cloud。
- 点击右上角的设置图标 (⚙️)。
- 如果您可以在左侧导航栏中看到监控 > 审核日志,则说明您拥有所需的权限。
- 如果您看不到此选项,请与您的管理员联系,以获取 Confluence 管理员权限。
测试 API 访问权限
在继续进行集成之前,请先测试您的凭据:
# Replace with your actual credentials CONFLUENCE_EMAIL="your-email@example.com" CONFLUENCE_API_TOKEN="your-api-token" CONFLUENCE_URL="https://yoursite.atlassian.net" # Test API access curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \ -H "Accept: application/json" \ "${CONFLUENCE_URL}/wiki/rest/api/audit"
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 confluence-audit-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
为 Cloud Run 函数创建服务账号
Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
confluence-audit-collector-sa。 - 服务账号说明:输入
Service account for Cloud Run function to collect Confluence Cloud audit logs。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
必须拥有这些角色,才能:
- Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
- Cloud Run Invoker:允许 Pub/Sub 调用函数
- Cloud Functions Invoker:允许调用函数
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建发布/订阅主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
confluence-audit-trigger。 - 将其他设置保留为默认值。
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集日志
Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 Confluence Cloud Audit API 中提取日志并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 confluence-audit-collector区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题中,选择
confluence-audit-trigger。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全标签页:
- 服务账号:选择
confluence-audit-collector-sa。
- 服务账号:选择
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击 + 添加变量:
变量名称 示例值 说明 GCS_BUCKETconfluence-audit-logsGCS 存储分区名称 GCS_PREFIXconfluence-audit日志文件的前缀 STATE_KEYconfluence-audit/state.json状态文件路径 CONFLUENCE_URLhttps://yoursite.atlassian.netConfluence 网站网址 CONFLUENCE_EMAILyour-email@example.comAtlassian 账号电子邮件地址 CONFLUENCE_API_TOKENyour-api-token-hereAPI 令牌 MAX_RECORDS1000每次运行的记录数上限 在变量和 Secret 部分中,向下滚动到请求:
- 请求超时:输入
600秒(10 分钟)。
- 请求超时:输入
前往设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值。
- CPU:选择 1。
- 在资源部分中:
在修订版本扩缩部分中:
- 实例数下限:输入
0。 - 实例数上限:输入
100(或根据预期负载进行调整)。
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/') STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json') CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL') CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL') CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=24) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Convert to Unix milliseconds start_millis = to_unix_millis(last_time) end_millis = to_unix_millis(now) # Fetch logs records, newest_event_time = fetch_logs( api_base=CONFLUENCE_URL, email=CONFLUENCE_EMAIL, api_token=CONFLUENCE_API_TOKEN, start_time_ms=start_millis, end_time_ms=end_millis, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int): """ Fetch logs from Confluence Cloud Audit API with pagination and rate limiting. Args: api_base: Confluence site URL email: Atlassian account email api_token: API token start_time_ms: Start time in Unix milliseconds end_time_ms: End time in Unix milliseconds max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ # Clean up URL base_url = api_base.rstrip('/') # Build authentication header auth_string = f"{email}:{api_token}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 start_index = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request URL url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('results', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: # creationDate is in Unix milliseconds event_time_ms = event.get('creationDate') if event_time_ms: event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) event_time = event_dt.isoformat() if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results current_size = data.get('size', 0) if current_size < 100: print(f"Reached last page (size={current_size} < limit=100)") break start_index += current_size except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records[:max_records], newest_time- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0点击部署以保存并部署该函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 confluence-audit-collector-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择 confluence-audit-trigger消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据日志量和延迟时间要求选择频次:
频率 Cron 表达式 使用场景 每隔 5 分钟 */5 * * * *高容量、低延迟 每隔 15 分钟 */15 * * * *搜索量中等 每小时 0 * * * *标准(推荐) 每 6 小时 0 */6 * * *量小、批处理 每天 0 0 * * *历史数据收集
测试集成
- 在 Cloud Scheduler 控制台中,找到您的作业。
- 点击强制运行以手动触发作业。
- 等待几秒钟。
- 前往 Cloud Run > 服务。
- 点击
confluence-audit-collector。 - 点击日志标签页。
验证函数是否已成功执行。查找以下项:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X records前往 Cloud Storage > 存储分区。
点击您的存储分区名称。
导航到
confluence-audit/文件夹。验证是否已创建具有当前时间戳的新
.ndjson文件。
如果您在日志中看到错误,请执行以下操作:
- HTTP 401:检查环境变量中的 API 凭据
- HTTP 403:验证账号是否具有 Confluence 管理员权限
- HTTP 429:速率限制 - 函数将自动重试并进行退避
- 缺少环境变量:检查是否已设置所有必需的变量
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Confluence Cloud Audit Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Atlassian Confluence 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以提取 Confluence 日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Confluence Cloud Audit Logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Atlassian Confluence 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://confluence-audit-logs/confluence-audit/将
confluence-audit-logs:您的 GCS 存储分区名称。confluence-audit:存储日志的可选前缀/文件夹路径(留空表示根目录)。
示例:
- 根存储分区:
gs://company-logs/ - 带前缀:
gs://company-logs/confluence-audit/ - 使用子文件夹:
gs://company-logs/confluence/audit/
- 根存储分区:
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资产命名空间:资产命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| 代理 | read_only_udm.network.http.user_agent | 从“agent”字段中获取的值。 |
| app_protocol | read_only_udm.network.application_protocol | 派生自“app_protocol”字段。如果“app_protocol”包含“HTTPS”“HTTP”“SSH”或“RDP”,则使用相应协议。否则,默认值为“UNKNOWN_APPLICATION_PROTOCOL”。 |
| app_protocol | read_only_udm.network.application_protocol_version | 值取自“app_protocol”字段。 |
| auditType.action | read_only_udm.security_result.action | 派生自“auditType.action”字段。如果“auditType.action”包含“successful”,则该值设置为“ALLOW”。如果包含“restricted”,则将值设置为“BLOCK”。 |
| auditType.action | read_only_udm.security_result.summary | 当“auditType”不为空且“auditType_area”为“SECURITY”时,从“auditType.action”字段中获取的值。 |
| auditType.actionI18nKey | read_only_udm.metadata.product_event_type | 当“auditType”不为空时,从“auditType.actionI18nKey”字段中获取的值。 |
| auditType.area | read_only_udm.security_result.detection_fields.value | 从“auditType.area”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType area”。当“auditType”不为空时,系统会执行此映射。 |
| auditType.category | read_only_udm.security_result.category_details | 当“auditType”不为空时,从“auditType.category”字段中获取的值。 |
| auditType.categoryI18nKey | read_only_udm.security_result.detection_fields.value | 从“auditType.categoryI18nKey”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType categoryI18nKey”。当“auditType”不为空时,系统会执行此映射。 |
| auditType.level | read_only_udm.security_result.detection_fields.value | 从“auditType.level”字段中获取值,并将其分配给检测字段的“value”字段,同时将“key”字段设置为“auditType level”。当“auditType”不为空时,系统会执行此映射。 |
| author.displayName | read_only_udm.principal.user.user_display_name | 取自“author.displayName”字段的值。 |
| author.externalCollaborator | read_only_udm.security_result.about.resource.attribute.labels.value | 从“author.externalCollaborator”字段中提取值,并将其分配给“key”字段设置为“externalCollaborator”的标签的“value”字段。 |
| author.id | read_only_udm.principal.user.userid | 当“author.type”为“user”且“principal_user_present”为“false”时,从“author.id”字段中获取的值。 |
| author.isExternalCollaborator | read_only_udm.security_result.about.resource.attribute.labels.value | 从“author.isExternalCollaborator”字段中提取值,并将其分配给“key”字段设置为“isExternalCollaborator”的标签的“value”字段。 |
| author.name | read_only_udm.principal.user.user_display_name | 当“author.type”为“user”且“principal_user_present”为“false”时,从“author.name”字段中获取的值。 |
| bytes_in | read_only_udm.network.received_bytes | 如果“bytes_in”字段包含数字,则取自该字段的值。否则,默认值为 0。 |
| 类别 | read_only_udm.security_result.category_details | 值取自“category”字段。 |
| changedValues | read_only_udm.principal.resource.attribute.labels | 遍历“changedValues”中的每个元素,并创建键类似于“changedValue [index] [key]”的标签,以及来自“changedValues”数组中相应的值。 |
| 创建日期 | read_only_udm.metadata.event_timestamp | 从“creationDate”字段中获取的值,解析为 UNIX 或 UNIX_MS 时间戳。 |
| extraAttributes | read_only_udm.principal.resource.attribute.labels | 遍历“extraAttributes”中的每个元素,并根据“name”和“nameI18nKey”字段创建键,根据相应的“value”字段创建值。 |
| http_verb | read_only_udm.network.http.method | 值取自“http_verb”字段。 |
| ip | read_only_udm.target.ip | 从“ip”字段中获取的值。 |
| principal_host | read_only_udm.principal.hostname | 从“principal_host”字段中获取的值。 |
| referral_url | read_only_udm.network.http.referral_url | 从“referral_url”字段中获取的值。 |
| remoteAddress | read_only_udm.principal.ip | 从“remoteAddress”字段中获取的值,解析为 IP 地址。 |
| response_code | read_only_udm.network.http.response_code | 从“response_code”字段中获取的值。 |
| session_duration | read_only_udm.additional.fields.value.string_value | 从“session_duration”字段中获取值,并将其分配给“key”字段设置为“Session Duration”的标签的“string_value”字段。 |
| 来源 | read_only_udm.principal.ip | 从“来源”字段中获取的值,解析为 IP 地址。 |
| src_ip | read_only_udm.principal.ip | 如果“remoteAddress”为空,则取自“src_ip”字段的值。 |
| 摘要 | read_only_udm.security_result.summary | 值取自“摘要”字段。 |
| sysAdmin | read_only_udm.security_result.about.resource.attribute.labels.value | 从“sysAdmin”字段中提取的值,并分配给“key”字段设置为“sysAdmin”的标签的“value”字段。 |
| superAdmin | read_only_udm.security_result.about.resource.attribute.labels.value | 从“superAdmin”字段中获取值,并将其分配给“key”字段设置为“superAdmin”的标签的“value”字段。 |
| target_url | read_only_udm.target.url | 从“target_url”字段中获取的值。 |
| 时间戳 | read_only_udm.metadata.event_timestamp | 从“时间戳”字段中获取的值,解析为日期和时间字符串。 |
| user_id | read_only_udm.principal.user.userid | 值取自“user_id”字段。 |
| read_only_udm.metadata.event_type | 此字段的值由一系列检查确定,默认值为“GENERIC_EVENT”。系统会根据“principal_host”“user_id”“has_principal”和“author.type”等其他字段的存在情况和内容,将其设置为“NETWORK_HTTP”“USER_UNCATEGORIZED”或“STATUS_UPDATE”等特定值。 | |
| read_only_udm.metadata.vendor_name | 设置为“ATLASSIAN”。 | |
| read_only_udm.metadata.product_name | 设置为“CONFLUENCE”。 | |
| read_only_udm.metadata.log_type | 设置为“ATLASSIAN_CONFLUENCE”。 | |
| read_only_udm.principal.user.user_display_name | 此字段的值可能来自“author.displayName”或“affectedObject.name”,具体取决于上下文。 | |
| read_only_udm.target.process.pid | 此字段的值可以来自“principal_host”或“pid”,具体取决于上下文。 | |
| read_only_udm.principal.resource.attribute.labels | 此字段填充了从“affectedObjects”“changedValues”和“extraAttributes”等字段派生的各种标签。这些标签的键和值会根据这些字段的具体内容动态生成。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。