收集 Bitwarden Enterprise 事件日志
支持的平台:
Google SecOps
SIEM
本指南介绍了如何使用 Amazon S3 将 Bitwarden Enterprise 事件日志注入到 Google Security Operations。解析器会将原始 JSON 格式的事件日志转换为符合 Chronicle UDM 的结构化格式。它会提取相关字段(例如用户详细信息、IP 地址和事件类型),并将这些字段映射到相应的 UDM 字段,以便进行一致的安全分析。
准备工作
- Google SecOps 实例。
- 对 Bitwarden 租户的特权访问权限。
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。
获取 Bitwarden API 密钥和网址
- 在 Bitwarden 管理控制台中。
- 依次前往设置 > 组织信息 > 查看 API 密钥。
- 复制以下详细信息并将其保存在安全的位置:
- 客户端 ID (Client ID)
- 客户端密钥 (Client Secret)
- 确定您的 Bitwarden 端点(根据区域):
- IDENTITY_URL =
https://identity.bitwarden.com/connect/token(欧盟:https://identity.bitwarden.eu/connect/token) - API_BASE =
https://api.bitwarden.com(欧盟:https://api.bitwarden.eu)
- IDENTITY_URL =
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
bitwarden-events)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 .csv 文件以保存访问密钥和密钥,供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页。
- 复制并粘贴下方的政策。
- 政策 JSON(如果您输入了其他存储桶名称,请替换
bitwarden-events):
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowPutBitwardenObjects",
"Effect": "Allow",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::bitwarden-events/*"
},
{
"Sid": "AllowGetStateObject",
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::bitwarden-events/bitwarden/events/state.json"
}
]
}
- 依次点击下一步 > 创建政策。
- 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
- 附加新创建的政策。
- 将角色命名为
WriteBitwardenToS3Role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
| 设置 | 值 |
|---|---|
| 名称 | bitwarden_events_to_s3 |
| 运行时 | Python 3.13 |
| 架构 | x86_64 |
| 执行角色 | WriteBitwardenToS3Role |
- 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
bitwarden_events_to_s3.py)。
#!/usr/bin/env python3
import os, json, time, urllib.parse
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
import boto3
IDENTITY_URL = os.environ.get("IDENTITY_URL", "https://identity.bitwarden.com/connect/token")
API_BASE = os.environ.get("API_BASE", "https://api.bitwarden.com").rstrip("/")
CID = os.environ["BW_CLIENT_ID"] # organization.ClientId
CSECRET = os.environ["BW_CLIENT_SECRET"] # organization.ClientSecret
BUCKET = os.environ["S3_BUCKET"]
PREFIX = os.environ.get("S3_PREFIX", "bitwarden/events/").strip("/")
STATE_KEY = os.environ.get("STATE_KEY", "bitwarden/events/state.json")
MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))
HEADERS_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
HEADERS_JSON = {"Accept": "application/json"}
s3 = boto3.client("s3")
def _read_state():
try:
obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
j = json.loads(obj["Body"].read())
return j.get("continuationToken")
except Exception:
return None
def _write_state(token):
body = json.dumps({"continuationToken": token}).encode("utf-8")
s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
def _http(req: Request, timeout: int = 60, max_retries: int = 5):
attempt, backoff = 0, 1.0
while True:
try:
with urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
except HTTPError as e:
# Retry on 429 and 5xx
if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
time.sleep(backoff); attempt += 1; backoff *= 2; continue
raise
except URLError:
if attempt < max_retries:
time.sleep(backoff); attempt += 1; backoff *= 2; continue
raise
def _get_token():
body = urllib.parse.urlencode({
"grant_type": "client_credentials",
"scope": "api.organization",
"client_id": CID,
"client_secret": CSECRET,
}).encode("utf-8")
req = Request(IDENTITY_URL, data=body, method="POST", headers=HEADERS_FORM)
data = _http(req, timeout=30)
return data["access_token"], int(data.get("expires_in", 3600))
def _fetch_events(bearer: str, cont: str | None):
params = {}
if cont:
params["continuationToken"] = cont
qs = ("?" + urllib.parse.urlencode(params)) if params else ""
url = f"{API_BASE}/public/events{qs}"
req = Request(url, method="GET", headers={"Authorization": f"Bearer {bearer}", **HEADERS_JSON})
return _http(req, timeout=60)
def _write_events_jsonl(events: list, run_ts_s: int, page_index: int) -> str:
"""
Write events in JSONL format (one JSON object per line).
Only writes if there are events to write.
Returns the S3 key of the written file.
"""
if not events:
return None
# Build JSONL content: one event per line
lines = [json.dumps(event, separators=(",", ":")) for event in events]
jsonl_content = "\n".join(lines) + "\n" # JSONL format with trailing newline
# Generate unique filename with page number to avoid conflicts
key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts_s))}-page{page_index:05d}-bitwarden-events.jsonl"
s3.put_object(
Bucket=BUCKET,
Key=key,
Body=jsonl_content.encode("utf-8"),
ContentType="application/x-ndjson", # MIME type for JSONL
)
return key
def lambda_handler(event=None, context=None):
bearer, _ttl = _get_token()
cont = _read_state()
run_ts_s = int(time.time())
pages = 0
total_events = 0
written_files = []
while pages < MAX_PAGES:
data = _fetch_events(bearer, cont)
# Extract events array from API response
# API returns: {"object":"list", "data":[...], "continuationToken":"..."}
events = data.get("data", [])
# Only write file if there are events
if events:
s3_key = _write_events_jsonl(events, run_ts_s, pages)
if s3_key:
written_files.append(s3_key)
total_events += len(events)
pages += 1
# Check for next page token
next_cont = data.get("continuationToken")
if next_cont:
cont = next_cont
continue
else:
# No more pages
break
# Save state only if there are more pages to continue in next run
# If we hit MAX_PAGES and there's still a continuation token, save it
# Otherwise, clear the state (set to None)
_write_state(cont if pages >= MAX_PAGES and cont else None)
return {
"ok": True,
"pages": pages,
"total_events": total_events,
"files_written": len(written_files),
"nextContinuationToken": cont if pages >= MAX_PAGES else None
}
if __name__ == "__main__":
print(lambda_handler())
- 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
- 输入以下环境变量,并将其替换为您的值。
环境变量
| 键 | 示例 |
|---|---|
S3_BUCKET |
bitwarden-events |
S3_PREFIX |
bitwarden/events/ |
STATE_KEY |
bitwarden/events/state.json |
BW_CLIENT_ID |
<organization client_id> |
BW_CLIENT_SECRET |
<organization client_secret> |
IDENTITY_URL |
https://identity.bitwarden.com/connect/token (欧盟:https://identity.bitwarden.eu/connect/token) |
API_BASE |
https://api.bitwarden.com (欧盟:https://api.bitwarden.eu) |
MAX_PAGES |
10 |
- 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your‑function)。
- 选择配置标签页。
- 在常规配置面板中,点击修改。
- 将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour)。 - 目标:您的 Lambda 函数。
- 名称:
bitwarden-events-1h。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台> IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
- JSON:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::<your-bucket>/*"
},
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::<your-bucket>"
}
]
}
- 名称 =
secops-reader-policy。 - 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
- 为
secops-reader创建访问密钥:安全凭据 > 访问密钥 > 创建访问密钥 > 下载.csv(您会将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 Bitwarden Enterprise 事件日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Bitwarden Events)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Bitwarden 事件作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://bitwarden-events/bitwarden/events/ - 源删除选项:根据您的偏好选择删除选项。
- 文件最长保留时间:默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| actingUserId | target.user.userid | 如果 enriched.actingUser.userId 为空或 null,则此字段用于填充 target.user.userid 字段。 |
| collectionID | security_result.detection_fields.key | 在 security_result 中填充 detection_fields 内的 key 字段。 |
| collectionID | security_result.detection_fields.value | 在 security_result 中填充 detection_fields 内的 value 字段。 |
| 日期 | metadata.event_timestamp | 解析并转换为时间戳格式,然后映射到 event_timestamp。 |
| enriched.actingUser.accessAll | security_result.rule_labels.key | 将 security_result 中 rule_labels 内的值设置为“Access_All”。 |
| enriched.actingUser.accessAll | security_result.rule_labels.value | 使用从 enriched.actingUser.accessAll 转换而来的字符串值填充 security_result 中 rule_labels 内的 value 字段。 |
| enriched.actingUser.email | target.user.email_addresses | 填充 target.user 内的 email_addresses 字段。 |
| enriched.actingUser.id | metadata.product_log_id | 填充 metadata 内的 product_log_id 字段。 |
| enriched.actingUser.id | target.labels.key | 将值设置为 target.labels 中的“ID”。 |
| enriched.actingUser.id | target.labels.value | 使用 enriched.actingUser.id 中的值填充 target.labels 中的 value 字段。 |
| enriched.actingUser.name | target.user.user_display_name | 填充 target.user 内的 user_display_name 字段。 |
| enriched.actingUser.object | target.labels.key | 将 target.labels 中的值设置为“对象”。 |
| enriched.actingUser.object | target.labels.value | 使用 enriched.actingUser.object 中的值填充 target.labels 中的 value 字段。 |
| enriched.actingUser.resetPasswordEnrolled | target.labels.key | 将 target.labels 内的值设置为“ResetPasswordEnrolled”。 |
| enriched.actingUser.resetPasswordEnrolled | target.labels.value | 使用 enriched.actingUser.resetPasswordEnrolled 中的值(转换为字符串)填充 target.labels 中的 value 字段。 |
| enriched.actingUser.twoFactorEnabled | security_result.rule_labels.key | 将 security_result 中 rule_labels 内的值设置为“Two Factor Enabled”。 |
| enriched.actingUser.twoFactorEnabled | security_result.rule_labels.value | 使用从 enriched.actingUser.twoFactorEnabled 转换而来的字符串值填充 security_result 中 rule_labels 内的 value 字段。 |
| enriched.actingUser.userId | target.user.userid | 填充 target.user 内的 userid 字段。 |
| enriched.collection.id | additional.fields.key | 将值设置为 additional.fields 中的“集合 ID”。 |
| enriched.collection.id | additional.fields.value.string_value | 使用 enriched.collection.id 中的值填充 additional.fields 中的 string_value 字段。 |
| enriched.collection.object | additional.fields.key | 将 additional.fields 中的值设置为“集合对象”。 |
| enriched.collection.object | additional.fields.value.string_value | 使用 enriched.collection.object 中的值填充 additional.fields 中的 string_value 字段。 |
| enriched.type | metadata.product_event_type | 填充 metadata 内的 product_event_type 字段。 |
| groupId | target.user.group_identifiers | 将值添加到 target.user 中的 group_identifiers 数组。 |
| ipAddress | principal.ip | 从字段中提取 IP 地址并映射到 principal.ip。 |
| 不适用 | extensions.auth | 解析器会创建一个空对象。 |
| 不适用 | metadata.event_type | 根据 enriched.type 以及是否存在 principal 和 target 信息来确定。可能的值:USER_LOGIN、STATUS_UPDATE、GENERIC_EVENT。 |
| 不适用 | security_result.action | 根据 enriched.type 确定。可能的值:ALLOW、BLOCK。 |
| 对象 | additional.fields.key | 将 additional.fields 中的值设置为“对象”。 |
| 对象 | additional.fields.value | 使用 object 中的值填充 additional.fields 中的 value 字段。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。