收集 TeamViewer 日志
本文档介绍了如何使用 Amazon S3 将 TeamViewer 日志注入到 Google Security Operations。解析器从 JSON 格式的日志中提取审核事件。它会遍历活动详情,将特定属性映射到统一数据模型 (UDM) 字段,处理参与者和演示者信息,并根据用户活动对活动进行分类。解析器还会执行数据转换,例如合并标签和将时间戳转换为标准化格式。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- 对 TeamViewer 的特权访问权限。
- 对 AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。
获取 TeamViewer 前提条件
- 以管理员身份登录 TeamViewer 管理控制台。
- 依次前往我的个人资料 > 应用。
- 点击创建应用。
- 提供以下配置详细信息:
- 应用名称:输入一个描述性名称(例如
Google SecOps Integration
)。 - 说明:输入应用的说明。
- 权限:选择审核日志访问权限。
- 应用名称:输入一个描述性名称(例如
- 点击创建,并将生成的 API 凭据保存到安全的位置。
- 记录您的 TeamViewer API 基本网址(例如
https://webapi.teamviewer.com/api/v1
)。 - 复制以下详细信息并将其保存在安全的位置:
- CLIENT_ID
- CLIENT_SECRET
- API_BASE_URL
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
teamviewer-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
- 复制并粘贴以下政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
teamviewer-logs
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::teamviewer-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::teamviewer-logs/teamviewer/audit/state.json" } ] }
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
TeamViewerToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 teamviewer_to_s3
运行时 Python 3.13 架构 x86_64 执行角色 TeamViewerToS3Role
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
teamviewer_to_s3.py
)。#!/usr/bin/env python3 # Lambda: Pull TeamViewer audit logs and store raw JSON payloads to S3 # - Time window via {FROM}/{TO} placeholders (UTC ISO8601), URL-encoded. # - Preserves vendor-native JSON format for audit and session data. # - Retries with exponential backoff; unique S3 keys to avoid overwrites. import os, json, time, uuid, urllib.parse from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "teamviewer/audit/") STATE_KEY = os.environ.get("STATE_KEY", "teamviewer/audit/state.json") WINDOW_SEC = int(os.environ.get("WINDOW_SECONDS", "3600")) # default 1h HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60")) API_BASE_URL = os.environ["API_BASE_URL"] CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) USER_AGENT = os.environ.get("USER_AGENT", "teamviewer-to-s3/1.0") s3 = boto3.client("s3") def _load_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {} def _save_state(st): s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _get_access_token() -> str: # OAuth2 Client Credentials flow for TeamViewer API token_url = f"{API_BASE_URL.rstrip('/')}/oauth2/token" data = urllib.parse.urlencode({ 'grant_type': 'client_credentials', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET }).encode('utf-8') req = Request(token_url, data=data, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") req.add_header("User-Agent", USER_AGENT) with urlopen(req, timeout=HTTP_TIMEOUT) as r: response = json.loads(r.read()) return response["access_token"] def _build_audit_url(from_ts: float, to_ts: float, access_token: str) -> str: # Build URL for TeamViewer audit API endpoint base_endpoint = f"{API_BASE_URL.rstrip('/')}/reports/connections" params = { "from_date": _iso(from_ts), "to_date": _iso(to_ts) } query_string = urllib.parse.urlencode(params) return f"{base_endpoint}?{query_string}" def _fetch_audit_data(url: str, access_token: str) -> tuple[bytes, str]: attempt = 0 while True: req = Request(url, method="GET") req.add_header("User-Agent", USER_AGENT) req.add_header("Authorization", f"Bearer {access_token}") req.add_header("Accept", "application/json") try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: return r.read(), (r.headers.get("Content-Type") or "application/json") except (HTTPError, URLError) as e: attempt += 1 print(f"HTTP error on attempt {attempt}: {e}") if attempt > MAX_RETRIES: raise # exponential backoff with jitter time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def _put_audit_data(blob: bytes, content_type: str, from_ts: float, to_ts: float) -> str: # Create unique S3 key for audit data ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts)) uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}" key = f"{S3_PREFIX}{ts_path}/teamviewer_audit_{int(from_ts)}_{int(to_ts)}_{uniq}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=content_type, Metadata={ 'source': 'teamviewer-audit', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)) } ) return key def lambda_handler(event=None, context=None): st = _load_state() now = time.time() from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC)) to_ts = now # Get OAuth2 access token access_token = _get_access_token() url = _build_audit_url(from_ts, to_ts, access_token) print(f"Fetching TeamViewer audit data from: {url}") blob, ctype = _fetch_audit_data(url, access_token) # Validate that we received valid JSON data try: audit_data = json.loads(blob) print(f"Successfully retrieved {len(audit_data.get('records', []))} audit records") except json.JSONDecodeError as e: print(f"Warning: Invalid JSON received: {e}") key = _put_audit_data(blob, ctype, from_ts, to_ts) st["last_to_ts"] = to_ts st["last_successful_run"] = now _save_state(st) return { "statusCode": 200, "body": { "success": True, "s3_key": key, "content_type": ctype, "from_timestamp": from_ts, "to_timestamp": to_ts } } if __name__ == "__main__": print(lambda_handler())
依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入下表中提供的环境变量,并将示例值替换为您的值。
环境变量
键 示例值 S3_BUCKET
teamviewer-logs
S3_PREFIX
teamviewer/audit/
STATE_KEY
teamviewer/audit/state.json
WINDOW_SECONDS
3600
HTTP_TIMEOUT
60
MAX_RETRIES
3
USER_AGENT
teamviewer-to-s3/1.0
API_BASE_URL
https://webapi.teamviewer.com/api/v1
CLIENT_ID
your-client-id
(来自第 2 步)CLIENT_SECRET
your-client-secret
(来自第 2 步)创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数
teamviewer_to_s3
。 - 名称:
teamviewer-audit-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户 > 添加用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader
。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::teamviewer-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::teamviewer-logs" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥。点击创建访问密钥。
下载
CSV
。(您需要将这些值粘贴到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 TeamViewer 日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
TeamViewer logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 TeamViewer 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://teamviewer-logs/teamviewer/audit/
- 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
AffectedItem |
metadata.product_log_id |
原始日志中的 AffectedItem 值直接映射到此 UDM 字段。 |
EventDetails.NewValue |
principal.resource.attribute.labels.value |
如果 PropertyName 包含 (server) ,则 NewValue 用作 principal.resource.attribute.labels 中标签的值。 |
EventDetails.NewValue |
principal.user.user_display_name |
如果 PropertyName 为 Name of participant ,则 NewValue 用作相应主账号的用户显示名称。 |
EventDetails.NewValue |
principal.user.userid |
如果 PropertyName 为 ID of participant ,则 NewValue 用作主账号的用户 ID。 |
EventDetails.NewValue |
security_result.about.labels.value |
对于所有其他 PropertyName 值(由特定条件处理的值除外),NewValue 用作 security_result.about.labels 数组中标签的值。 |
EventDetails.NewValue |
target.file.full_path |
如果 PropertyName 为 Source file ,则 NewValue 用作目标文件的完整路径。 |
EventDetails.NewValue |
target.resource.attribute.labels.value |
如果 PropertyName 包含 (client) ,则 NewValue 用作 target.resource.attribute.labels 中标签的值。 |
EventDetails.NewValue |
target.user.user_display_name |
如果 PropertyName 为 Name of presenter ,则会解析 NewValue 。如果它是整数,则会被舍弃。否则,它将用作目标的显示名。 |
EventDetails.NewValue |
target.user.userid |
如果 PropertyName 为 ID of presenter ,则 NewValue 用作目标的相应用户 ID。 |
EventDetails.PropertyName |
principal.resource.attribute.labels.key |
如果 PropertyName 包含 (server) ,则 PropertyName 用作 principal.resource.attribute.labels 中标签的键。 |
EventDetails.PropertyName |
security_result.about.labels.key |
对于所有其他 PropertyName 值(由特定条件处理的值除外),PropertyName 用作 security_result.about.labels 数组中标签的键。 |
EventDetails.PropertyName |
target.resource.attribute.labels.key |
如果 PropertyName 包含 (client) ,则 PropertyName 用作 target.resource.attribute.labels 中标签的键。 |
EventName |
metadata.product_event_type |
原始日志中的 EventName 值直接映射到此 UDM 字段。 |
Timestamp |
metadata.event_timestamp |
系统会解析原始日志中的 Timestamp 值,并将其用作元数据中的事件时间戳。如果 src_user (派生自 ID of participant )不为空,则设置为 USER_UNCATEGORIZED ;否则设置为 USER_RESOURCE_ACCESS 。硬编码为 TEAMVIEWER 。硬编码为 TEAMVIEWER 。硬编码为 TEAMVIEWER 。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。