用户定义的函数 (UDF) 概览

JavaScript 用户定义的函数 (UDF) 是一种单条消息转换 (SMT)。与BigQuery JavaScript UDFs类似,UDF 提供了一种灵活的方式,可在 Pub/Sub 中实现自定义转换 逻辑。

UDF 接受单条消息作为输入,对输入执行定义的操作,并返回处理结果。

UDF 具有以下关键属性:

  • 代码: 用于定义转换逻辑的 JavaScript 代码。

  • 函数名称: Pub/Sub 应用于消息的所提供代码中的 JavaScript 函数的名称。

创建 JavaScript 函数

UDF 代码必须包含具有以下签名的函数:

  /**
  * Transforms a Pub/Sub message.
  * @return {(Object<string, (string | Object<string, string>)>|* null)} - To
  * filter a message, return `null`. To transform a message, return a map with
  * the following keys:
  *   - (required) 'data' : {string}
  *   - (optional) 'attributes' : {Object<string, string>}
  * Returning empty `attributes` will remove all attributes from the message.
  *
  * @param  {(Object<string, (string | Object<string, string>)>} - Pub/Sub
  * message. Keys:
  *   - (required) 'data' : {string}
  *   - (required) 'attributes' : {Object<string, string>}
  *
  * @param  {Object<string, any>} metadata - Pub/Sub message metadata.
  * Keys:
  *   - (optional) 'message_id'  : {string}
  *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
  *   - (optional) 'ordering_key': {string}
  */
  function <function_name>(message, metadata) {
    // Perform custom transformation logic
    return message; // to filter a message instead, return `null`
  }

输入

该函数接受以下输入:

  • message 实参: 表示 Pub/Sub 消息的 JavaScript 对象。它包含以下属性:

    • data:(String,必需)消息载荷。

    • attributes:(Object<String, String>,可选)表示消息属性的键值对 映射。

  • metadata 实参: 一个 JavaScript 对象,其中包含有关 Pub/Sub 消息的不可变元数据:

    • message_id:(String,可选)消息的唯一 ID。

    • publish_time:(String,可选)消息的发布时间,采用 RFC 3339 格式 (YYYY-MM-DDTHH:mm:ssZ)。

    • ordering_key:(String,可选)消息的排序键( 如果适用)。

输出

该函数必须返回以下其中一项:

  • 如需转换消息,请修改 message.datamessage.attributes 的内容,然后返回更改后的 message 对象。

  • 如需过滤消息,请返回 null

输入 / 输出要求

  • 如果 UDF 转换消息载荷,则载荷输入和输出必须是 UTF-8 编码的字符串。
  • 如果 UDF 不转换消息载荷,则载荷可以使用任何编码。
  • 属性键值对必须是 UTF-8 编码的字符串。

UDF 如何转换消息

对消息运行 UDF 的结果可以是以下其中一项:

  • UDF 转换消息。

  • UDF 返回 null

    • 主题 SMT:Pub/Sub 向发布者返回成功,并在响应中包含过滤后的消息的消息 ID。 Pub/Sub 不会存储消息,也不会将其发送给任何订阅方。

    • 订阅 SMT:Pub/Sub 确认消息传送,但不会将消息发送给订阅者。

  • UDF 抛出错误。

    • 主题 SMT:Pub/Sub 向发布者返回错误,并且无法发布任何消息。

    • 订阅 SMT:Pub/Sub 对消息进行否定确认。

创建 UDF SMT

可以在 Pub/Sub 主题或订阅中配置 SMT。

  • 主题 SMT 在 Pub/Sub 存储消息之前执行,所有订阅者都可以使用结果。
  • 订阅 SMT 在消息传送之前执行,结果仅适用于该订阅。

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题 页面。

    打开“主题”

  2. 创建主题或订阅。

    • 如需创建主题,请点击创建主题 。此时会打开创建主题 页面。

    • 如需创建订阅,请执行以下操作:

      1. 点击您要订阅的主题的名称。

      2. 点击创建订阅 。此时会打开将订阅添加到主题 页面。

  3. 转换下,点击添加转换

  4. 对于转换类型,选择 JavaScript UDF

  5. 函数名称 字段中,输入 SMT 调用的 JavaScript 函数的名称。示例:redactSSN

  6. 在文本区域中,输入 UDF 的代码。示例:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    

    该代码必须包含一个函数,其名称与函数名称 字段匹配。

  7. 如果您不希望 SMT 立即处于活动状态,请选择停用转换 。选择此选项后,系统会使用主题创建 SMT,但不会对传入的消息执行 SMT。创建主题后,您可以修改主题以启用 SMT。

  8. 如需创建主题或订阅,请点击创建

gcloud

创建定义文件

创建一个 YAML 或 JSON 文件,用于定义 UDF SMT。

YAML

- javascriptUdf:
    code: { FUNCTION_CODE }
    functionName: FUNCTION_NAME

JSON

{
  "javascriptUdf": {
    "code": {
      FUNCTION_CODE
    }
    "functionName": FUNCTION_NAME
  }
}

替换以下内容:

  • FUNCTION_CODE:UDF 的 JavaScript 代码。 该代码必须包含一个函数,其名称与 functionName 字段匹配。示例:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  • FUNCTION_NAME:SMT 调用的 JavaScript 函数的名称。示例:redactSSN

创建主题或订阅

如需创建主题,请运行 gcloud pubsub topics create 命令。

gcloud pubsub topics create TOPIC_ID \
  --message-transforms-file=TRANSFORMS_FILE

替换以下内容:

  • TOPIC_ID:您要创建的主题的 ID 或名称。
  • TRANSFORMS_FILE:定义文件的路径。

如需创建订阅,请运行 gcloud pubsub subscriptions create 命令。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=projects/PROJECT_ID/topics/TOPIC_ID \
  --message-transforms-file=TRANSFORMS_FILE

替换以下内容:

  • SUBSCRIPTION_ID:要创建的订阅 的 ID 或名称。

  • PROJECT_ID:包含主题的项目的 ID。

  • TOPIC_ID:要订阅的主题的 ID。

  • TRANSFORMS_FILE:定义文件的路径。

您也可以选择在创建 SMT 之前对其进行验证和测试。如需了解详情,请参阅以下页面:

限制

Pub/Sub 对 UDF 强制执行资源限制,以确保高效的转换操作。限制包括:

  • 每个 UDF 最多 20 KB 的代码
  • 每条消息最多 500 毫秒的执行时间
  • 仅支持 ECMAScript 标准内置函数
  • 不调用外部 API
  • 不导入外部库

UDF 示例

以下是一些用于发布和订阅的 UDF 示例。您可以在 UDF 库中找到更多示例。

函数:将星期几整数转换为相应的字符串

当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:

  1. Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。

  2. UDF 会查找名为 dayOfWeek 的字段,如果此字段的值是介于 0 到 6 之间的数字,则将其转换为相应的星期几,例如 Monday。如果该字段不存在或数字不在 0 到 6 的范围内,则代码会将 dayOfWeek 字段设置为 Unknown

  3. UDF 会将修改后的载荷序列化回消息中。

  4. Pub/Sub 会将更新后的消息传递到流水线中的下一步。

function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}

函数:修订社会保障号

当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:

  1. Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。

  2. UDF 会从消息载荷中移除字段 ssn(如果存在)。

  3. UDF 会将修改后的载荷序列化回消息中。

  4. Pub/Sub 会将更新后的消息传递到流水线中的下一步。

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

函数:过滤掉并自动确认特定消息

当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:

  1. Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。

  2. UDF 会检查载荷是否包含名为 region 的字段。

  3. 如果 region 字段的值不是 US,则该函数会返回 null,导致 Pub/Sub 过滤消息。

  4. 如果 region 字段的值是 US,Pub/Sub 会将原始消息传递到流水线中的下一步。

function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}

函数:验证消息内容,确保金额不大于 100

当您将以下 UDF 添加到主题或订阅时,在消息发布或传送期间会发生以下更改:

  1. Pub/Sub 将该函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。

  2. UDF 会检查消息是否包含名为 amount 的字段。

  3. 如果 amount 字段的值大于 100,则该函数会抛出错误。

  4. 如果 amount 字段的值不大于 100,则该函数会返回原始消息。

  5. 然后,Pub/Sub 会将消息标记为失败,或将原始消息传递到流水线中的下一步。

function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}

后续步骤