JavaScript 用户定义的函数 (UDF) 是一种单条消息转换 (SMT)。与BigQuery JavaScript UDFs类似,UDF 提供了一种灵活的方式,可在 Pub/Sub 中实现自定义转换 逻辑。
UDF 接受单条消息作为输入,对输入执行定义的操作,并返回处理结果。
UDF 具有以下关键属性:
代码: 用于定义转换逻辑的 JavaScript 代码。
函数名称: Pub/Sub 应用于消息的所提供代码中的 JavaScript 函数的名称。
创建 JavaScript 函数
UDF 代码必须包含具有以下签名的函数:
/**
* Transforms a Pub/Sub message.
* @return {(Object<string, (string | Object<string, string>)>|* null)} - To
* filter a message, return `null`. To transform a message, return a map with
* the following keys:
* - (required) 'data' : {string}
* - (optional) 'attributes' : {Object<string, string>}
* Returning empty `attributes` will remove all attributes from the message.
*
* @param {(Object<string, (string | Object<string, string>)>} - Pub/Sub
* message. Keys:
* - (required) 'data' : {string}
* - (required) 'attributes' : {Object<string, string>}
*
* @param {Object<string, any>} metadata - Pub/Sub message metadata.
* Keys:
* - (optional) 'message_id' : {string}
* - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
* - (optional) 'ordering_key': {string}
*/
function <function_name>(message, metadata) {
// Perform custom transformation logic
return message; // to filter a message instead, return `null`
}
输入
该函数接受以下输入:
message实参: 表示 Pub/Sub 消息的 JavaScript 对象。它包含以下属性:data:(String,必需)消息载荷。attributes:(Object<String, String>,可选)表示消息属性的键值对 映射。
metadata实参: 一个 JavaScript 对象,其中包含有关 Pub/Sub 消息的不可变元数据:
输出
该函数必须返回以下其中一项:
如需转换消息,请修改
message.data和message.attributes的内容,然后返回更改后的message对象。如需过滤消息,请返回
null。
输入 / 输出要求
- 如果 UDF 转换消息载荷,则载荷输入和输出必须是 UTF-8 编码的字符串。
- 如果 UDF 不转换消息载荷,则载荷可以使用任何编码。
- 属性键值对必须是 UTF-8 编码的字符串。
UDF 如何转换消息
对消息运行 UDF 的结果可以是以下其中一项:
UDF 转换消息。
UDF 返回
null。主题 SMT:Pub/Sub 向发布者返回成功,并在响应中包含过滤后的消息的消息 ID。 Pub/Sub 不会存储消息,也不会将其发送给任何订阅方。
订阅 SMT:Pub/Sub 确认消息传送,但不会将消息发送给订阅者。
UDF 抛出错误。
主题 SMT:Pub/Sub 向发布者返回错误,并且无法发布任何消息。
订阅 SMT:Pub/Sub 对消息进行否定确认。
创建 UDF SMT
可以在 Pub/Sub 主题或订阅中配置 SMT。
- 主题 SMT 在 Pub/Sub 存储消息之前执行,所有订阅者都可以使用结果。
订阅 SMT 在消息传送之前执行,结果仅适用于该订阅。
控制台
在 Google Cloud 控制台中,前往 Pub/Sub 主题 页面。
创建主题或订阅。
如需创建主题,请点击创建主题 。此时会打开创建主题 页面。
如需创建订阅,请执行以下操作:
点击您要订阅的主题的名称。
点击创建订阅 。此时会打开将订阅添加到主题 页面。
在转换下,点击添加转换。
对于转换类型,选择 JavaScript UDF。
在函数名称 字段中,输入 SMT 调用的 JavaScript 函数的名称。示例:
redactSSN。在文本区域中,输入 UDF 的代码。示例:
function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; }该代码必须包含一个函数,其名称与函数名称 字段匹配。
如果您不希望 SMT 立即处于活动状态,请选择停用转换 。选择此选项后,系统会使用主题创建 SMT,但不会对传入的消息执行 SMT。创建主题后,您可以修改主题以启用 SMT。
如需创建主题或订阅,请点击创建 。
gcloud
创建定义文件
创建一个 YAML 或 JSON 文件,用于定义 UDF SMT。
YAML
- javascriptUdf:
code: { FUNCTION_CODE }
functionName: FUNCTION_NAME
JSON
{
"javascriptUdf": {
"code": {
FUNCTION_CODE
}
"functionName": FUNCTION_NAME
}
}
替换以下内容:
FUNCTION_CODE:UDF 的 JavaScript 代码。 该代码必须包含一个函数,其名称与
functionName字段匹配。示例:function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; }FUNCTION_NAME:SMT 调用的 JavaScript 函数的名称。示例:
redactSSN。
创建主题或订阅
如需创建主题,请运行
gcloud pubsub topics create
命令。
gcloud pubsub topics create TOPIC_ID \
--message-transforms-file=TRANSFORMS_FILE
替换以下内容:
- TOPIC_ID:您要创建的主题的 ID 或名称。
- TRANSFORMS_FILE:定义文件的路径。
如需创建订阅,请运行
gcloud pubsub subscriptions create
命令。
gcloud pubsub subscriptions create SUBSCRIPTION_ID \
--topic=projects/PROJECT_ID/topics/TOPIC_ID \
--message-transforms-file=TRANSFORMS_FILE
替换以下内容:
SUBSCRIPTION_ID:要创建的订阅 的 ID 或名称。
PROJECT_ID:包含主题的项目的 ID。
TOPIC_ID:要订阅的主题的 ID。
TRANSFORMS_FILE:定义文件的路径。
您也可以选择在创建 SMT 之前对其进行验证和测试。如需了解详情,请参阅以下页面:
限制
Pub/Sub 对 UDF 强制执行资源限制,以确保高效的转换操作。限制包括:
- 每个 UDF 最多 20 KB 的代码
- 每条消息最多 500 毫秒的执行时间
- 仅支持 ECMAScript 标准内置函数
- 不调用外部 API
- 不导入外部库
UDF 示例
以下是一些用于发布和订阅的 UDF 示例。您可以在 UDF 库中找到更多示例。
函数:将星期几整数转换为相应的字符串
当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:
Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。
UDF 会查找名为
dayOfWeek的字段,如果此字段的值是介于 0 到 6 之间的数字,则将其转换为相应的星期几,例如Monday。如果该字段不存在或数字不在 0 到 6 的范围内,则代码会将dayOfWeek字段设置为Unknown。UDF 会将修改后的载荷序列化回消息中。
Pub/Sub 会将更新后的消息传递到流水线中的下一步。
function intToString(message, metadata) {
const data = JSON.parse(message.data);
switch(`data["dayOfWeek"]`) {
case 0:
data["dayOfWeek"] = "Sunday";
break;
case 1:
data["dayOfWeek"] = "Monday";
break;
case 2:
data["dayOfWeek"] = "Tuesday";
break;
case 3:
data["dayOfWeek"] = "Wednesday";
break;
case 4:
data["dayOfWeek"] = "Thursday";
break;
case 5:
data["dayOfWeek"] = "Friday";
break;
case 6:
data["dayOfWeek"] = "Saturday";
break;
default:
data["dayOfWeek"] = "Unknown";
}
message.data = JSON.stringify(data);
return message;
}
函数:修订社会保障号
当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:
Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。
UDF 会从消息载荷中移除字段
ssn(如果存在)。UDF 会将修改后的载荷序列化回消息中。
Pub/Sub 会将更新后的消息传递到流水线中的下一步。
function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}
函数:过滤掉并自动确认特定消息
当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:
Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。
UDF 会检查载荷是否包含名为
region的字段。如果
region字段的值不是US,则该函数会返回 null,导致 Pub/Sub 过滤消息。如果
region字段的值是US,Pub/Sub 会将原始消息传递到流水线中的下一步。
function filterForUSRegion(message, metadata) {
const data = JSON.parse(message.data);
if (data["region"] !== "US") {
return null;
}
return message;
}
函数:验证消息内容,确保金额不大于 100
当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:
Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。
UDF 会检查消息是否包含名为
amount的字段。如果
amount字段的值大于100,则该函数会抛出错误。如果
amount字段的值不大于100,则该函数会返回原始消息。然后,Pub/Sub 会将消息标记为失败,或将原始消息传递到流水线中的下一步。
function validateAmount(message, metadata) {
const data = JSON.parse(message.data);
if (data["amount"] > 100) {
throw new Error("Amount is invalid");
}
return message;
}