大多数流式数据流水线都需要进行数据转换。有些用户喜欢在提取、加载、转换 (ELT) 流水线中,在数据到达目标位置后对其进行转换;而另一些用户则选择在提取、转换和加载 (ETL) 流水线中,在数据提取之前对其进行转换。传统上,这种架构需要使用 Dataflow 或 Apache Flink 等工具构建复杂的流水线才能执行数据转换。
Pub/Sub 提供单条消息转换 (SMT) 功能,可简化流处理流水线的数据转换。SMT 支持直接在 Pub/Sub 内对消息数据及属性进行轻量级修改。SMT 无需执行额外的数据处理步骤或使用单独的数据转换产品。
SMT 运行时,会以 Pub/Sub 消息(包括消息数据和属性)作为输入。输出是经过转换的 Pub/Sub 消息,其中包含对数据或属性的修改。 SMT 已集成到 Pub/Sub API 中,因此您可以将它们作为主题或订阅配置的一部分进行管理。
SMT 使用场景
假设您要设计一个网店,希望在客户浏览网站时向其提供个性化的商品推荐。为此,您可以使用 Pub/Sub 收集有关客户在网站上活动的实时数据。这包括有关所查看的商品、添加到购物车的商品以及商品的评分的数据。
不过,在将这些原始数据用于生成推荐之前,通常需要对其进行一些调整。例如,原始数据可能包含与您的使用情形无关的无关细节。此类详细信息的示例包括客户的浏览器类型或他们访问网站的时间。数据也可能不符合推荐系统所需的格式。例如,时间戳可能采用不同的格式,或者可能需要将商品 ID 转换为其他类型。
您可以使用 Pub/Sub SMT 来进行以下数据转换:
移除全名和地址等个人身份信息 (PII),以保护客户隐私。
仅保留与推荐相关的事件,例如商品浏览和购买,并舍弃其他事件,例如客户个人资料更改。
确保所有时间戳、币种值和商品 ID 都采用与推荐系统兼容的统一格式和类型。
根据原始数据生成新的数据字段,例如购物车总价值或产品网页停留时间。
将 Gemini Enterprise Agent Platform 模型中的推理结果(例如分类、预测、情感分析或嵌入)添加到事件数据中。
总而言之,SMT 可实现各种用例,包括:
数据遮盖和隐去:通过遮盖或隐去信用卡号或 PII 等字段来保护敏感数据,有助于遵守数据隐私权法规。
数据格式转换:在不同格式之间转换数据,以确保与下游系统的兼容性。
消息过滤:根据内容或属性过滤掉不需要的消息,仅处理相关消息。与 Pub/Sub 的内置过滤条件相比,SMT 支持更复杂的过滤条件。
简单的数据转换:执行基本的数据处理任务,例如字符串处理、日期格式设置或数学运算。
AI 推理:使用 AI 推理 SMT 将 AI 模型无缝集成到 Pub/Sub 流水线中。
SMT 的类型
Pub/Sub 支持以下 SMT:
- AI 推理:从 Agent Platform 模型获取有关 Pub/Sub 消息的推理结果。
- 用户定义的函数:调用 JavaScript 用户定义的函数 (UDF) 以对 Pub/Sub 消息执行自定义转换。
SMT 的消息流示例
此图片展示了一个示例 Pub/Sub 系统,其中在主题和订阅级别都应用了 SMT。
以下过程展示了 Pub/Sub 系统中的消息流:
发布者应用 Publisher 1 和 Publisher 2 分别将消息 A 和 B 发布到 Pub/Sub 主题。
主题的 SMT 将消息 A 和 B 分别转换为消息 A' 和 B'。
如果主题附加了架构,则转换后的消息 A' 和 B' 会根据该架构进行验证。例如,如果 A' 与架构不匹配,则消息 A 的发布会因错误而失败。
转换后的消息 A' 和 B' 会写入 Pub/Sub 存储空间。
Pub/Sub 会将消息 A' 和 B' 传送给所有附加的订阅,即订阅 1 和订阅 2,如图所示。
如果订阅 1 配置了过滤条件,则系统会根据该过滤条件评估消息 A' 和 B'。只有与过滤条件匹配的消息才会进入下一步。其他消息由 Pub/Sub 自动确认。
如果为订阅 2 配置了过滤条件,系统会根据该过滤条件评估消息 A' 和 B'。只有与过滤条件匹配的消息才会进入下一步。其他消息由 Pub/Sub 自动确认。
订阅 1 的 SMT 会转换消息 A' 和 B'。A' 变为 A'',B' 变为 B''。
订阅 2 的 SMT 会转换消息 A' 和 B'。A' 保持不变,B' 被过滤掉。
如果订阅 1 是启用了载荷解封装的推送订阅,则消息 A'' 和 B'' 会被解封装。如果订阅 2 是启用了载荷解封的推送订阅,则 A' 会被解封。
订阅者 1 接收消息 B'',订阅者 2 接收消息 A'',订阅者 3 接收消息 A'。
订阅者确认收到的消息。
Pub/Sub 会从存储空间中删除已确认的消息。
限制
一个主题或订阅最多可启用 5 个 SMT。
SMT 针对单条 Pub/Sub 消息运行。它们无法汇总多条 Pub/Sub 消息。