收集 Bitwarden Enterprise 事件記錄
支援的國家/地區:
Google SecOps
SIEM
本指南說明如何使用 Amazon S3,將 Bitwarden Enterprise 事件記錄檔擷取至 Google Security Operations。剖析器會將原始 JSON 格式的事件記錄轉換為符合 Chronicle UDM 的結構化格式。這項工具會擷取相關欄位 (例如使用者詳細資料、IP 位址和事件類型),並將這些欄位對應至相應的 UDM 欄位,以進行一致的安全性分析。
事前準備
- Google SecOps 執行個體。
- Bitwarden 租戶的特殊存取權。
- AWS (S3、IAM、Lambda、EventBridge) 的特殊存取權。
取得 Bitwarden API 金鑰和網址
- 在 Bitwarden 管理控制台中。
- 依序前往「設定」>「機構資訊」>「查看 API 金鑰」。
- 複製並將下列詳細資料儲存在安全位置:
- 用戶端 ID
- 用戶端密碼
- 判斷 Bitwarden 端點 (依據區域):
- IDENTITY_URL =
https://identity.bitwarden.com/connect/token(歐盟:https://identity.bitwarden.eu/connect/token) - API_BASE =
https://api.bitwarden.com(EU:https://api.bitwarden.eu)
- IDENTITY_URL =
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如
bitwarden-events)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「Download .csv file」(下載 .csv 檔案),儲存「Access Key」(存取金鑰) 和「Secret Access Key」(私密存取金鑰),以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 前往 AWS 主控台 > IAM > 政策 > 建立政策 > JSON 分頁。
- 複製並貼上下列政策。
- 政策 JSON (如果您輸入的 bucket 名稱不同,請替換
bitwarden-events):
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowPutBitwardenObjects",
"Effect": "Allow",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::bitwarden-events/*"
},
{
"Sid": "AllowGetStateObject",
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::bitwarden-events/bitwarden/events/state.json"
}
]
}
- 依序點選「下一步」>「建立政策」。
- 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
- 附加新建立的政策。
- 為角色命名
WriteBitwardenToS3Role,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 請提供下列設定詳細資料:
| 設定 | 值 |
|---|---|
| 名稱 | bitwarden_events_to_s3 |
| 執行階段 | Python 3.13 |
| 架構 | x86_64 |
| 執行角色 | WriteBitwardenToS3Role |
- 建立函式後,開啟「程式碼」分頁,刪除存根並貼上下列程式碼 (
bitwarden_events_to_s3.py)。
#!/usr/bin/env python3
import os, json, time, urllib.parse
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
import boto3
IDENTITY_URL = os.environ.get("IDENTITY_URL", "https://identity.bitwarden.com/connect/token")
API_BASE = os.environ.get("API_BASE", "https://api.bitwarden.com").rstrip("/")
CID = os.environ["BW_CLIENT_ID"] # organization.ClientId
CSECRET = os.environ["BW_CLIENT_SECRET"] # organization.ClientSecret
BUCKET = os.environ["S3_BUCKET"]
PREFIX = os.environ.get("S3_PREFIX", "bitwarden/events/").strip("/")
STATE_KEY = os.environ.get("STATE_KEY", "bitwarden/events/state.json")
MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))
HEADERS_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
HEADERS_JSON = {"Accept": "application/json"}
s3 = boto3.client("s3")
def _read_state():
try:
obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
j = json.loads(obj["Body"].read())
return j.get("continuationToken")
except Exception:
return None
def _write_state(token):
body = json.dumps({"continuationToken": token}).encode("utf-8")
s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
def _http(req: Request, timeout: int = 60, max_retries: int = 5):
attempt, backoff = 0, 1.0
while True:
try:
with urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
except HTTPError as e:
# Retry on 429 and 5xx
if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
time.sleep(backoff); attempt += 1; backoff *= 2; continue
raise
except URLError:
if attempt < max_retries:
time.sleep(backoff); attempt += 1; backoff *= 2; continue
raise
def _get_token():
body = urllib.parse.urlencode({
"grant_type": "client_credentials",
"scope": "api.organization",
"client_id": CID,
"client_secret": CSECRET,
}).encode("utf-8")
req = Request(IDENTITY_URL, data=body, method="POST", headers=HEADERS_FORM)
data = _http(req, timeout=30)
return data["access_token"], int(data.get("expires_in", 3600))
def _fetch_events(bearer: str, cont: str | None):
params = {}
if cont:
params["continuationToken"] = cont
qs = ("?" + urllib.parse.urlencode(params)) if params else ""
url = f"{API_BASE}/public/events{qs}"
req = Request(url, method="GET", headers={"Authorization": f"Bearer {bearer}", **HEADERS_JSON})
return _http(req, timeout=60)
def _write_events_jsonl(events: list, run_ts_s: int, page_index: int) -> str:
"""
Write events in JSONL format (one JSON object per line).
Only writes if there are events to write.
Returns the S3 key of the written file.
"""
if not events:
return None
# Build JSONL content: one event per line
lines = [json.dumps(event, separators=(",", ":")) for event in events]
jsonl_content = "\n".join(lines) + "\n" # JSONL format with trailing newline
# Generate unique filename with page number to avoid conflicts
key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts_s))}-page{page_index:05d}-bitwarden-events.jsonl"
s3.put_object(
Bucket=BUCKET,
Key=key,
Body=jsonl_content.encode("utf-8"),
ContentType="application/x-ndjson", # MIME type for JSONL
)
return key
def lambda_handler(event=None, context=None):
bearer, _ttl = _get_token()
cont = _read_state()
run_ts_s = int(time.time())
pages = 0
total_events = 0
written_files = []
while pages < MAX_PAGES:
data = _fetch_events(bearer, cont)
# Extract events array from API response
# API returns: {"object":"list", "data":[...], "continuationToken":"..."}
events = data.get("data", [])
# Only write file if there are events
if events:
s3_key = _write_events_jsonl(events, run_ts_s, pages)
if s3_key:
written_files.append(s3_key)
total_events += len(events)
pages += 1
# Check for next page token
next_cont = data.get("continuationToken")
if next_cont:
cont = next_cont
continue
else:
# No more pages
break
# Save state only if there are more pages to continue in next run
# If we hit MAX_PAGES and there's still a continuation token, save it
# Otherwise, clear the state (set to None)
_write_state(cont if pages >= MAX_PAGES and cont else None)
return {
"ok": True,
"pages": pages,
"total_events": total_events,
"files_written": len(written_files),
"nextContinuationToken": cont if pages >= MAX_PAGES else None
}
if __name__ == "__main__":
print(lambda_handler())
- 依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
- 輸入下列環境變數,並將 換成您的值。
環境變數
| 鍵 | 範例 |
|---|---|
S3_BUCKET |
bitwarden-events |
S3_PREFIX |
bitwarden/events/ |
STATE_KEY |
bitwarden/events/state.json |
BW_CLIENT_ID |
<organization client_id> |
BW_CLIENT_SECRET |
<organization client_secret> |
IDENTITY_URL |
https://identity.bitwarden.com/connect/token (歐盟:https://identity.bitwarden.eu/connect/token) |
API_BASE |
https://api.bitwarden.com (歐盟:https://api.bitwarden.eu) |
MAX_PAGES |
10 |
- 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your‑function」)。
- 選取「設定」分頁標籤。
- 在「一般設定」面板中,按一下「編輯」。
- 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 請提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour)。 - 目標:您的 Lambda 函式。
- 名稱:
bitwarden-events-1h。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 依序前往 AWS 管理中心 > IAM > 使用者 > 新增使用者。
- 點選 [Add users] (新增使用者)。
- 請提供下列設定詳細資料:
- 使用者:輸入
secops-reader。 - 存取類型:選取「存取金鑰 - 程式輔助存取」。
- 使用者:輸入
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)。
- JSON:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::<your-bucket>/*"
},
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::<your-bucket>"
}
]
}
- Name =
secops-reader-policy。 - 依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
- 為
secops-reader建立存取金鑰:依序點選「安全憑證」>「存取金鑰」>「建立存取金鑰」> 下載.csv(你會將這些值貼到動態饋給中)。
在 Google SecOps 中設定動態饋給,以便擷取 Bitwarden Enterprise 事件記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Bitwarden Events)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Bitwarden 事件」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://bitwarden-events/bitwarden/events/ - 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案最長效期:預設為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| actingUserId | target.user.userid | 如果 enriched.actingUser.userId 為空或空值,系統會使用這個欄位填入 target.user.userid 欄位。 |
| collectionID | security_result.detection_fields.key | 在 security_result 中填入 detection_fields 內的 key 欄位。 |
| collectionID | security_result.detection_fields.value | 在 security_result 中填入 detection_fields 內的 value 欄位。 |
| 日期 | metadata.event_timestamp | 系統會剖析並轉換為時間戳記格式,然後對應至 event_timestamp。 |
| enriched.actingUser.accessAll | security_result.rule_labels.key | 在 security_result 的 rule_labels 中,將值設為「Access_All」。 |
| enriched.actingUser.accessAll | security_result.rule_labels.value | 在 security_result 的 rule_labels 中,以從 enriched.actingUser.accessAll 轉換為字串的值填入 value 欄位。 |
| enriched.actingUser.email | target.user.email_addresses | 填入 target.user 內的 email_addresses 欄位。 |
| enriched.actingUser.id | metadata.product_log_id | 填入 metadata 內的 product_log_id 欄位。 |
| enriched.actingUser.id | target.labels.key | 將值設為 target.labels 內的「ID」。 |
| enriched.actingUser.id | target.labels.value | 在 target.labels 中填入 value 欄位,值來自 enriched.actingUser.id。 |
| enriched.actingUser.name | target.user.user_display_name | 填入 target.user 內的 user_display_name 欄位。 |
| enriched.actingUser.object | target.labels.key | 在 target.labels 中將值設為「Object」。 |
| enriched.actingUser.object | target.labels.value | 在 target.labels 中填入 value 欄位,值來自 enriched.actingUser.object。 |
| enriched.actingUser.resetPasswordEnrolled | target.labels.key | 在 target.labels 中將值設為「ResetPasswordEnrolled」。 |
| enriched.actingUser.resetPasswordEnrolled | target.labels.value | 在 target.labels 中填入 value 欄位,並將 enriched.actingUser.resetPasswordEnrolled 中的值轉換為字串。 |
| enriched.actingUser.twoFactorEnabled | security_result.rule_labels.key | 在 security_result 的 rule_labels 中,將值設為「Two Factor Enabled」。 |
| enriched.actingUser.twoFactorEnabled | security_result.rule_labels.value | 在 security_result 的 rule_labels 中,以從 enriched.actingUser.twoFactorEnabled 轉換為字串的值填入 value 欄位。 |
| enriched.actingUser.userId | target.user.userid | 填入 target.user 內的 userid 欄位。 |
| enriched.collection.id | additional.fields.key | 將值設為 additional.fields 內的「集合 ID」。 |
| enriched.collection.id | additional.fields.value.string_value | 在 additional.fields 中填入 string_value 欄位,值來自 enriched.collection.id。 |
| enriched.collection.object | additional.fields.key | 在 additional.fields 中將值設為「Collection Object」。 |
| enriched.collection.object | additional.fields.value.string_value | 在 additional.fields 中填入 string_value 欄位,值來自 enriched.collection.object。 |
| enriched.type | metadata.product_event_type | 填入 metadata 內的 product_event_type 欄位。 |
| groupId | target.user.group_identifiers | 將值新增至 target.user 中的 group_identifiers 陣列。 |
| ipAddress | principal.ip | 從欄位擷取 IP 位址,並對應至 principal.ip。 |
| 不適用 | extensions.auth | 剖析器會建立空物件。 |
| 不適用 | metadata.event_type | 系統會根據 enriched.type,以及 principal 和 target 資訊是否存在,可能的值:USER_LOGIN、STATUS_UPDATE、GENERIC_EVENT。 |
| 不適用 | security_result.action | 系統會根據 enriched.type判斷。可能的值:ALLOW、BLOCK。 |
| 物件 | additional.fields.key | 在 additional.fields 中將值設為「Object」。 |
| 物件 | additional.fields.value | 在 additional.fields 中填入 value 欄位,值來自 object。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。