收集 Bitwarden Enterprise 事件日志

支持的平台:

本指南介绍了如何使用 Amazon S3 将 Bitwarden Enterprise 事件日志注入到 Google Security Operations。解析器会将原始 JSON 格式的事件日志转换为符合 Chronicle UDM 的结构化格式。它会提取相关字段(例如用户详细信息、IP 地址和事件类型),并将这些字段映射到相应的 UDM 字段,以便进行一致的安全分析。

准备工作

  • Google SecOps 实例。
  • Bitwarden 租户的特权访问权限。
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限。

获取 Bitwarden API 密钥和网址

  1. 在 Bitwarden 管理控制台中。
  2. 依次前往设置 > 组织信息 > 查看 API 密钥
  3. 复制以下详细信息并将其保存在安全的位置:
    • 客户端 ID (Client ID)
    • 客户端密钥 (Client Secret)
  4. 确定您的 Bitwarden 端点(根据区域):
    • IDENTITY_URL = https://identity.bitwarden.com/connect/token (欧盟:https://identity.bitwarden.eu/connect/token
    • API_BASE = https://api.bitwarden.com (欧盟:https://api.bitwarden.eu

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 bitwarden-events)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .csv 文件以保存访问密钥密钥,供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页
  2. 复制并粘贴下方的政策。
  3. 政策 JSON(如果您输入了其他存储桶名称,请替换 bitwarden-events):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutBitwardenObjects",
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::bitwarden-events/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::bitwarden-events/bitwarden/events/state.json"
    }
  ]
}

  1. 依次点击下一步 > 创建政策
  2. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda
  3. 附加新创建的政策。
  4. 将角色命名为 WriteBitwardenToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 bitwarden_events_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 WriteBitwardenToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (bitwarden_events_to_s3.py)。
#!/usr/bin/env python3

import os, json, time, urllib.parse
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
import boto3

IDENTITY_URL = os.environ.get("IDENTITY_URL", "https://identity.bitwarden.com/connect/token")
API_BASE = os.environ.get("API_BASE", "https://api.bitwarden.com").rstrip("/")
CID = os.environ["BW_CLIENT_ID"]          # organization.ClientId
CSECRET = os.environ["BW_CLIENT_SECRET"]  # organization.ClientSecret
BUCKET = os.environ["S3_BUCKET"]
PREFIX = os.environ.get("S3_PREFIX", "bitwarden/events/").strip("/")
STATE_KEY = os.environ.get("STATE_KEY", "bitwarden/events/state.json")
MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))

HEADERS_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
HEADERS_JSON = {"Accept": "application/json"}

s3 = boto3.client("s3")


def _read_state():
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
        j = json.loads(obj["Body"].read())
        return j.get("continuationToken")
    except Exception:
        return None


def _write_state(token):
    body = json.dumps({"continuationToken": token}).encode("utf-8")
    s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")


def _http(req: Request, timeout: int = 60, max_retries: int = 5):
    attempt, backoff = 0, 1.0
    while True:
        try:
            with urlopen(req, timeout=timeout) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Retry on 429 and 5xx
            if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                time.sleep(backoff); attempt += 1; backoff *= 2; continue
            raise
        except URLError:
            if attempt < max_retries:
                time.sleep(backoff); attempt += 1; backoff *= 2; continue
            raise


def _get_token():
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "scope": "api.organization",
        "client_id": CID,
        "client_secret": CSECRET,
    }).encode("utf-8")
    req = Request(IDENTITY_URL, data=body, method="POST", headers=HEADERS_FORM)
    data = _http(req, timeout=30)
    return data["access_token"], int(data.get("expires_in", 3600))


def _fetch_events(bearer: str, cont: str | None):
    params = {}
    if cont:
        params["continuationToken"] = cont
    qs = ("?" + urllib.parse.urlencode(params)) if params else ""
    url = f"{API_BASE}/public/events{qs}"
    req = Request(url, method="GET", headers={"Authorization": f"Bearer {bearer}", **HEADERS_JSON})
    return _http(req, timeout=60)


def _write_events_jsonl(events: list, run_ts_s: int, page_index: int) -> str:
    """
    Write events in JSONL format (one JSON object per line).
    Only writes if there are events to write.
    Returns the S3 key of the written file.
    """
    if not events:
        return None
    
    # Build JSONL content: one event per line
    lines = [json.dumps(event, separators=(",", ":")) for event in events]
    jsonl_content = "\n".join(lines) + "\n"  # JSONL format with trailing newline
    
    # Generate unique filename with page number to avoid conflicts
    key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts_s))}-page{page_index:05d}-bitwarden-events.jsonl"
    
    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=jsonl_content.encode("utf-8"),
        ContentType="application/x-ndjson",  # MIME type for JSONL
    )
    return key


def lambda_handler(event=None, context=None):
    bearer, _ttl = _get_token()
    cont = _read_state()
    run_ts_s = int(time.time())

    pages = 0
    total_events = 0
    written_files = []
    
    while pages < MAX_PAGES:
        data = _fetch_events(bearer, cont)
        
        # Extract events array from API response
        # API returns: {"object":"list", "data":[...], "continuationToken":"..."}
        events = data.get("data", [])
        
        # Only write file if there are events
        if events:
            s3_key = _write_events_jsonl(events, run_ts_s, pages)
            if s3_key:
                written_files.append(s3_key)
                total_events += len(events)
        
        pages += 1
        
        # Check for next page token
        next_cont = data.get("continuationToken")
        if next_cont:
            cont = next_cont
            continue
        else:
            # No more pages
            break
    
    # Save state only if there are more pages to continue in next run
    # If we hit MAX_PAGES and there's still a continuation token, save it
    # Otherwise, clear the state (set to None)
    _write_state(cont if pages >= MAX_PAGES and cont else None)
    
    return {
        "ok": True,
        "pages": pages,
        "total_events": total_events,
        "files_written": len(written_files),
        "nextContinuationToken": cont if pages >= MAX_PAGES else None
    }




if __name__ == "__main__":
    print(lambda_handler())
  1. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量
  2. 输入以下环境变量,并将其替换为您的值。

环境变量

示例
S3_BUCKET bitwarden-events
S3_PREFIX bitwarden/events/
STATE_KEY bitwarden/events/state.json
BW_CLIENT_ID <organization client_id>
BW_CLIENT_SECRET <organization client_secret>
IDENTITY_URL https://identity.bitwarden.com/connect/token (欧盟:https://identity.bitwarden.eu/connect/token
API_BASE https://api.bitwarden.com (欧盟:https://api.bitwarden.eu
MAX_PAGES 10
  1. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your‑function)。
  2. 选择配置标签页。
  3. 常规配置面板中,点击修改
  4. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称bitwarden-events-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台> IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::<your-bucket>/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::<your-bucket>"
    }
  ]
}
  1. 名称 = secops-reader-policy
  2. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限
  3. secops-reader 创建访问密钥:安全凭据 > 访问密钥 > 创建访问密钥 > 下载 .csv(您会将这些值粘贴到 Feed 中)

在 Google SecOps 中配置 Feed 以注入 Bitwarden Enterprise 事件日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Bitwarden Events)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Bitwarden 事件作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://bitwarden-events/bitwarden/events/
    • 源删除选项:根据您的偏好选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
actingUserId target.user.userid 如果 enriched.actingUser.userId 为空或 null,则此字段用于填充 target.user.userid 字段。
collectionID security_result.detection_fields.key security_result 中填充 detection_fields 内的 key 字段。
collectionID security_result.detection_fields.value security_result 中填充 detection_fields 内的 value 字段。
日期 metadata.event_timestamp 解析并转换为时间戳格式,然后映射到 event_timestamp
enriched.actingUser.accessAll security_result.rule_labels.key security_resultrule_labels 内的值设置为“Access_All”。
enriched.actingUser.accessAll security_result.rule_labels.value 使用从 enriched.actingUser.accessAll 转换而来的字符串值填充 security_resultrule_labels 内的 value 字段。
enriched.actingUser.email target.user.email_addresses 填充 target.user 内的 email_addresses 字段。
enriched.actingUser.id metadata.product_log_id 填充 metadata 内的 product_log_id 字段。
enriched.actingUser.id target.labels.key 将值设置为 target.labels 中的“ID”。
enriched.actingUser.id target.labels.value 使用 enriched.actingUser.id 中的值填充 target.labels 中的 value 字段。
enriched.actingUser.name target.user.user_display_name 填充 target.user 内的 user_display_name 字段。
enriched.actingUser.object target.labels.key target.labels 中的值设置为“对象”。
enriched.actingUser.object target.labels.value 使用 enriched.actingUser.object 中的值填充 target.labels 中的 value 字段。
enriched.actingUser.resetPasswordEnrolled target.labels.key target.labels 内的值设置为“ResetPasswordEnrolled”。
enriched.actingUser.resetPasswordEnrolled target.labels.value 使用 enriched.actingUser.resetPasswordEnrolled 中的值(转换为字符串)填充 target.labels 中的 value 字段。
enriched.actingUser.twoFactorEnabled security_result.rule_labels.key security_resultrule_labels 内的值设置为“Two Factor Enabled”。
enriched.actingUser.twoFactorEnabled security_result.rule_labels.value 使用从 enriched.actingUser.twoFactorEnabled 转换而来的字符串值填充 security_resultrule_labels 内的 value 字段。
enriched.actingUser.userId target.user.userid 填充 target.user 内的 userid 字段。
enriched.collection.id additional.fields.key 将值设置为 additional.fields 中的“集合 ID”。
enriched.collection.id additional.fields.value.string_value 使用 enriched.collection.id 中的值填充 additional.fields 中的 string_value 字段。
enriched.collection.object additional.fields.key additional.fields 中的值设置为“集合对象”。
enriched.collection.object additional.fields.value.string_value 使用 enriched.collection.object 中的值填充 additional.fields 中的 string_value 字段。
enriched.type metadata.product_event_type 填充 metadata 内的 product_event_type 字段。
groupId target.user.group_identifiers 将值添加到 target.user 中的 group_identifiers 数组。
ipAddress principal.ip 从字段中提取 IP 地址并映射到 principal.ip
不适用 extensions.auth 解析器会创建一个空对象。
不适用 metadata.event_type 根据 enriched.type 以及是否存在 principaltarget 信息来确定。可能的值:USER_LOGIN、STATUS_UPDATE、GENERIC_EVENT。
不适用 security_result.action 根据 enriched.type 确定。可能的值:ALLOW、BLOCK。
对象 additional.fields.key additional.fields 中的值设置为“对象”。
对象 additional.fields.value 使用 object 中的值填充 additional.fields 中的 value 字段。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。