收集 TeamViewer 日志
本文档介绍了如何使用 Google Cloud Storage 将 TeamViewer 日志提取到 Google Security Operations。解析器从 JSON 格式的日志中提取审核事件。它会遍历活动详情,将特定属性映射到统一数据模型 (UDM) 字段,处理参与者和演示者信息,并根据用户活动对活动进行分类。解析器还会执行数据转换,例如合并标签和将时间戳转换为标准化格式。
准备工作
确保您满足以下前提条件:
- Google SecOps 实例
- 已启用 Cloud Storage API 的 GCP 项目
- 创建和管理 GCS 存储分区的权限
- 管理 GCS 存储分区的 IAM 政策的权限
- 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
- 对 TeamViewer 管理控制台的特权访问权限
- TeamViewer Business、Premium、Corporate 或 Tensor 许可(API 访问权限必需)
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 teamviewer-logs)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
获取 TeamViewer 前提条件
- 前往 https://login.teamviewer.com/,登录 TeamViewer 管理控制台。
- 点击右上角的用户图标,然后选择修改个人资料。
- 选择应用。
- 点击 Create script token。
- 提供以下配置详细信息:
- 令牌名称:输入一个描述性名称(例如
Google SecOps Integration)。 - 权限:选择以下权限:
- 账号管理 > 查看账号数据
- 会话管理 > 查看会话数据
- 连接报告 > 查看连接报告
- 令牌名称:输入一个描述性名称(例如
- 点击创建。
复制生成的脚本令牌并将其保存在安全的位置。
记录您的 TeamViewer API 基本网址:
https://webapi.teamviewer.com/api/v1
验证权限
如需验证账号是否具有所需权限,请执行以下操作:
- 登录 TeamViewer 管理控制台。
- 依次前往修改个人资料 > 应用。
- 在列表中找到您的脚本令牌。
- 验证连接报告 > 查看连接报告是否已启用。
- 如果未启用此权限,请修改令牌并添加所需权限。
测试 API 访问权限
在继续进行集成之前,请先测试您的凭据:
# Replace with your actual script token SCRIPT_TOKEN="your-script-token" API_BASE="https://webapi.teamviewer.com/api/v1" # Test API access curl -v -H "Authorization: Bearer ${SCRIPT_TOKEN}" \ -H "Accept: application/json" \ "${API_BASE}/reports/connections?from_date=2024-01-01T00:00:00Z&to_date=2024-01-01T01:00:00Z"
如果您收到包含 JSON 数据的 200 响应,则表示您的凭据配置正确。
为 Cloud Run 函数创建服务账号
Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
teamviewer-collector-sa。 - 服务账号说明:输入
Service account for Cloud Run function to collect TeamViewer logs。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分中,添加以下角色:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
以下角色是必需的: - Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件 - Cloud Run Invoker:允许 Pub/Sub 调用函数 - Cloud Functions Invoker:允许函数调用
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称(例如
teamviewer-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
teamviewer-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建发布/订阅主题
创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。
- 在 GCP 控制台中,前往 Pub/Sub > 主题。
- 点击创建主题。
- 提供以下配置详细信息:
- 主题 ID:输入
teamviewer-logs-trigger。 - 将其他设置保留为默认值。
- 主题 ID:输入
- 点击创建。
创建 Cloud Run 函数以收集日志
Cloud Run 函数由来自 Cloud Scheduler 的 Pub/Sub 消息触发,用于从 TeamViewer API 中提取日志并将其写入 GCS。
- 在 GCP 控制台中,前往 Cloud Run。
- 点击创建服务。
- 选择函数(使用内嵌编辑器创建函数)。
在配置部分中,提供以下配置详细信息:
设置 值 Service 名称 teamviewer-logs-collector区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)运行时 选择 Python 3.12 或更高版本 在触发器(可选)部分中:
- 点击 + 添加触发器。
- 选择 Cloud Pub/Sub。
- 在选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (
teamviewer-logs-trigger)。 - 点击保存。
在身份验证部分中:
- 选择需要进行身份验证。
- 检查 Identity and Access Management (IAM)。
向下滚动并展开容器、网络、安全性。
前往安全标签页:
- 服务账号:选择服务账号 (
teamviewer-collector-sa)。
- 服务账号:选择服务账号 (
前往容器标签页:
- 点击变量和密钥。
- 为每个环境变量点击 + 添加变量:
变量名称 示例值 GCS_BUCKETteamviewer-logsGCS_PREFIXteamviewer/audit/STATE_KEYteamviewer/audit/state.jsonWINDOW_SECONDS3600HTTP_TIMEOUT60MAX_RETRIES3USER_AGENTteamviewer-to-gcs/1.0SCRIPT_TOKENyour-script-token(来自 TeamViewer 前提条件)API_BASE_URLhttps://webapi.teamviewer.com/api/v1在变量和 Secret 部分中,向下滚动到请求:
- 请求超时:输入
600秒(10 分钟)。
- 请求超时:输入
前往设置标签页:
- 在资源部分中:
- 内存:选择 512 MiB 或更高值。
- CPU:选择 1。
- 在资源部分中:
在修订版本扩缩部分中:
- 实例数下限:输入
0。 - 实例数上限:输入
100(或根据预期负载进行调整)。
- 实例数下限:输入
点击创建。
等待服务创建完成(1-2 分钟)。
创建服务后,系统会自动打开内嵌代码编辑器。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
import functions_framework from google.cloud import storage import json import os import urllib.request import urllib.parse import urllib.error from datetime import datetime, timezone import time import uuid # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch TeamViewer audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'teamviewer/audit/') state_key = os.environ.get('STATE_KEY', 'teamviewer/audit/state.json') window_sec = int(os.environ.get('WINDOW_SECONDS', '3600')) http_timeout = int(os.environ.get('HTTP_TIMEOUT', '60')) max_retries = int(os.environ.get('MAX_RETRIES', '3')) user_agent = os.environ.get('USER_AGENT', 'teamviewer-to-gcs/1.0') # TeamViewer API credentials api_base_url = os.environ.get('API_BASE_URL') script_token = os.environ.get('SCRIPT_TOKEN') if not all([bucket_name, api_base_url, script_token]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state (last processed timestamp) state = load_state(bucket, state_key) now = time.time() from_ts = float(state.get('last_to_ts') or (now - window_sec)) to_ts = now print(f'Fetching TeamViewer audit data from {iso_format(from_ts)} to {iso_format(to_ts)}') # Build audit API URL url = build_audit_url(api_base_url, from_ts, to_ts) print(f'Fetching TeamViewer audit data from: {url}') # Fetch audit data with retries and pagination all_records = [] offset_id = None while True: blob_data, content_type, next_offset = fetch_audit_data( url, script_token, user_agent, http_timeout, max_retries, offset_id ) # Validate JSON data try: audit_data = json.loads(blob_data) records = audit_data.get('records', []) all_records.extend(records) print(f"Retrieved {len(records)} audit records (total: {len(all_records)})") # Check for pagination if next_offset and len(records) == 1000: offset_id = next_offset print(f"Fetching next page with offset_id: {offset_id}") else: break except json.JSONDecodeError as e: print(f"Warning: Invalid JSON received: {e}") break if all_records: # Write to GCS key = put_audit_data(bucket, prefix, json.dumps({'records': all_records}), 'application/json', from_ts, to_ts) print(f'Successfully wrote {len(all_records)} audit records to {key}') else: print('No audit records found') # Update state state['last_to_ts'] = to_ts state['last_successful_run'] = now save_state(bucket, state_key, state) except Exception as e: print(f'Error processing TeamViewer logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, separators=(',', ':')), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}') def iso_format(ts): """Convert Unix timestamp to ISO 8601 format.""" return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(ts)) def build_audit_url(api_base_url, from_ts, to_ts): """Build URL for TeamViewer audit API endpoint.""" base_endpoint = f"{api_base_url.rstrip('/')}/reports/connections" params = { 'from_date': iso_format(from_ts), 'to_date': iso_format(to_ts) } query_string = urllib.parse.urlencode(params) return f"{base_endpoint}?{query_string}" def fetch_audit_data(url, script_token, user_agent, http_timeout, max_retries, offset_id=None): """Fetch audit data from TeamViewer API with retries and pagination support.""" # Add offset_id parameter if provided if offset_id: separator = '&' if '?' in url else '?' url = f"{url}{separator}offset_id={offset_id}" attempt = 0 while True: req = urllib.request.Request(url, method='GET') req.add_header('User-Agent', user_agent) req.add_header('Authorization', f'Bearer {script_token}') req.add_header('Accept', 'application/json') try: with urllib.request.urlopen(req, timeout=http_timeout) as r: response_data = r.read() content_type = r.headers.get('Content-Type') or 'application/json' # Extract next_offset from response if present try: data = json.loads(response_data) next_offset = data.get('next_offset') except: next_offset = None return response_data, content_type, next_offset except urllib.error.HTTPError as e: if e.code == 429: attempt += 1 print(f'Rate limited (429) on attempt {attempt}') if attempt > max_retries: raise time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) else: print(f'HTTP error {e.code}: {e.reason}') raise except urllib.error.URLError as e: attempt += 1 print(f'URL error on attempt {attempt}: {e}') if attempt > max_retries: raise time.sleep(min(60, 2 ** attempt) + (time.time() % 1)) def put_audit_data(bucket, prefix, blob_data, content_type, from_ts, to_ts): """Write audit data to GCS.""" ts_path = time.strftime('%Y/%m/%d', time.gmtime(to_ts)) uniq = f"{int(time.time() * 1e6)}_{uuid.uuid4().hex[:8]}" key = f"{prefix}{ts_path}/teamviewer_audit_{int(from_ts)}_{int(to_ts)}_{uniq}.json" blob = bucket.blob(key) blob.metadata = { 'source': 'teamviewer-audit', 'from_timestamp': str(int(from_ts)), 'to_timestamp': str(int(to_ts)) } blob.upload_from_string(blob_data, content_type=content_type) return key- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.*点击部署以保存并部署该函数。
等待部署完成(2-3 分钟)。
创建 Cloud Scheduler 作业
Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。
- 在 GCP Console 中,前往 Cloud Scheduler。
- 点击创建作业。
提供以下配置详细信息:
设置 值 名称 teamviewer-logs-collector-hourly区域 选择与 Cloud Run 函数相同的区域 频率 0 * * * *(每小时一次,在整点时)时区 选择时区(建议选择世界协调时间 [UTC]) 目标类型 Pub/Sub 主题 选择 Pub/Sub 主题 ( teamviewer-logs-trigger)消息正文 {}(空 JSON 对象)点击创建。
时间表频率选项
根据日志量和延迟时间要求选择频次:
频率 Cron 表达式 使用场景 每隔 5 分钟 */5 * * * *高容量、低延迟 每隔 15 分钟 */15 * * * *搜索量中等 每小时 0 * * * *标准(推荐) 每 6 小时 0 */6 * * *量小、批处理 每天 0 0 * * *历史数据收集
测试集成
- 在 Cloud Scheduler 控制台中,找到您的作业 (
teamviewer-logs-collector-hourly)。 - 点击强制运行以手动触发作业。
- 等待几秒钟。
- 前往 Cloud Run > 服务。
- 点击函数名称 (
teamviewer-logs-collector)。 - 点击日志标签页。
验证函数是否已成功执行。查找以下项:
Fetching TeamViewer audit data from YYYY-MM-DDTHH:MM:SSZ to YYYY-MM-DDTHH:MM:SSZ Retrieved X audit records (total: X) Successfully wrote X audit records to teamviewer/audit/YYYY/MM/DD/teamviewer_audit_...json前往 Cloud Storage > 存储分区。
点击您的存储分区名称 (
teamviewer-logs)。前往前缀文件夹 (
teamviewer/audit/)。验证是否已创建具有当前时间戳的新
.json文件。
如果您在日志中看到错误,请执行以下操作:
- HTTP 401:检查
SCRIPT_TOKEN环境变量是否与您的 TeamViewer 脚本令牌一致 - HTTP 403:验证脚本令牌是否具有连接报告 > 查看连接报告权限
- HTTP 429:速率限制 - 函数将自动重试,并采用指数退避算法
- 缺少环境变量:检查是否已设置所有必需的变量(
GCS_BUCKET、API_BASE_URL、SCRIPT_TOKEN)
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
TeamViewer logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 TeamViewer 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称 (
teamviewer-logs)。 - 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以提取 TeamViewer 日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
TeamViewer logs)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 TeamViewer 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://teamviewer-logs/teamviewer/audit/将
teamviewer-logs:您的 GCS 存储分区名称。teamviewer/audit/:存储日志的前缀/文件夹路径。
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资产命名空间:资产命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| AffectedItem | metadata.product_log_id | 原始日志中的 AffectedItem 值直接映射到此 UDM 字段。 |
| EventDetails.NewValue | principal.resource.attribute.labels.value | 如果 PropertyName 包含 (server),则 NewValue 用作 principal.resource.attribute.labels 中标签的值。 |
| EventDetails.NewValue | principal.user.user_display_name | 如果 PropertyName 是参与者的名称,则 NewValue 会用作相应主账号的用户显示名称。 |
| EventDetails.NewValue | principal.user.userid | 如果 PropertyName 是参与者的 ID,则 NewValue 用作主账号的用户 ID。 |
| EventDetails.NewValue | security_result.about.labels.value | 对于所有其他 PropertyName 值(由特定条件处理的值除外),NewValue 将用作 security_result.about.labels 数组中某个标签的值。 |
| EventDetails.NewValue | target.file.full_path | 如果 PropertyName 为“源文件”,则 NewValue 用作目标文件的完整路径。 |
| EventDetails.NewValue | target.resource.attribute.labels.value | 如果 PropertyName 包含 (client),则 NewValue 用作 target.resource.attribute.labels 中标签的值。 |
| EventDetails.NewValue | target.user.user_display_name | 如果 PropertyName 是演示者的姓名,则会解析 NewValue。如果它是整数,则会被舍弃。否则,它将用作目标的显示名。 |
| EventDetails.NewValue | target.user.userid | 如果 PropertyName 是演示者的 ID,则 NewValue 会用作目标的相应用户 ID。 |
| EventDetails.PropertyName | principal.resource.attribute.labels.key | 如果 PropertyName 包含 (server),则 PropertyName 用作 principal.resource.attribute.labels 中标签的键。 |
| EventDetails.PropertyName | security_result.about.labels.key | 对于所有其他 PropertyName 值(由特定条件处理的值除外),PropertyName 将用作 security_result.about.labels 数组中标签的键。 |
| EventDetails.PropertyName | target.resource.attribute.labels.key | 如果 PropertyName 包含 (client),则 PropertyName 用作 target.resource.attribute.labels 中标签的键。 |
| 事件名称 | metadata.product_event_type | 原始日志中的 EventName 值直接映射到此 UDM 字段。 |
| 时间戳 | metadata.event_timestamp | 系统会解析原始日志中的时间戳值,并将其用作元数据中的事件时间戳。 |
| metadata.event_type | 如果 src_user(从参与者的 ID 派生)不为空,则设置为 USER_UNCATEGORIZED;否则设置为 USER_RESOURCE_ACCESS。 | |
| metadata.vendor_name | 硬编码为 TEAMVIEWER。 | |
| metadata.product_name | 硬编码为 TEAMVIEWER。 | |
| network.application_protocol | 硬编码为 TEAMVIEWER。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。