使用 SMT 创建主题

本文档介绍了如何使用单条消息转换 (SMT) 创建 Pub/Sub 主题。

主题 SMT 可直接在 Pub/Sub 中对消息数据和属性进行轻量级修改。此功能可在消息发布到主题之前实现数据清理、过滤或格式转换。

如需创建具有 SMT 的主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建具有 SMT 的主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建具有 SMT 的主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建包含 SMT 的主题:

  • 授予在项目上创建主题的权限: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。

创建具有 SMT 的主题

在创建包含 SMT 的主题之前,请查看主题的属性的相关文档。

如需创建包含一个或多个 SMT 的 Pub/Sub,请执行以下步骤。

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    打开“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。 如需详细了解主题命名,请参阅命名准则

  4. 转换下,点击添加转换

  5. 输入函数名称。例如:redactSSN

  6. 如果您不希望 SMT 立即处于有效状态,请选择停用转换。选择此选项后,系统会使用相应主题创建 SMT,但不会针对收到的消息执行该 SMT。创建主题后,您可以修改主题以启用 SMT。

  7. 在文本区域中,输入 SMT 的代码。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  8. 可选。如需验证 SMT,请点击验证。如果 SMT 有效,系统会显示消息 "Validation passed"。否则,系统会显示错误消息。

  9. 如需添加其他转换,请点击添加转换,然后重复上述步骤。

    如需按特定顺序排列 SMT,请点击上移下移。如需移除 SMT,请点击 Delete(删除)。

  10. 可选。如需在示例消息上测试 SMT,请执行以下步骤:

    1. 点击 Test transforms

    2. 测试转换窗口中,选择要测试的函数。

    3. 输入消息窗口中,输入示例消息。

    4. 如需向消息添加属性,请点击添加属性,然后输入属性的键和值。您可以添加多个属性。

    5. 点击测试。应用 SMT 后的消息结果会显示在输出消息下。

    6. 如需关闭测试转换窗口,请点击 关闭

    如果您创建了多个 SMT,可以按如下方式测试整个转换序列:

    1. 按照上述步骤中的说明,测试序列中的第一个 SMT。
    2. 选择下一个 SMT。输入消息已预先填充了上一次测试的输出消息。
    3. 继续按顺序测试 SMT,以确保整个序列按预期运行。
  11. 如需创建主题,请点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 创建用于定义一个或多个 SMT 的 YAML 或 JSON 文件。如果您有多个 SMT,系统会按您列出的顺序对消息执行这些 SMT。

    以下是 YAML 转换文件的示例:

    - javascriptUdf:
        code: >
            function redactSSN(message, metadata) {
              const data = JSON.parse(message.data);
              delete data['ssn'];
              message.data = JSON.stringify(data);
              return message;
            }
        functionName: redactSSN
    
  3. 可选。如需验证 SMT,请运行 gcloud pubsub message-transforms validate 命令:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    替换以下内容:

    • TRANSFORM_FILE:定义单个 SMT 的 YAML 或 JSON 文件的路径。如果您要创建多个 SMT,则必须分别验证它们。
  4. 可选。如需针对示例 Pub/Sub 消息测试一个或多个 SMT,请运行 gcloud pubsub message-transforms test 命令:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    替换以下内容:

    • TRANSFORMS_FILE:定义了一个或多个 SMT 的 YAML 或 JSON 文件的路径。
    • MESSAGE:示例消息的正文。
    • ATTRIBUTES:可选。以英文逗号分隔的消息属性列表。每个属性都是一个键值对,格式为 KEY="VALUE"

    该命令按顺序执行 SMT,并将每个 SMT 的输出用作下一个 SMT 的输入。该命令会输出每个步骤的结果。

  5. 如需创建主题,请运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    替换以下内容:

    • TOPIC_ID:您要创建的主题的 ID 或名称。如需了解主题命名准则,请参阅资源名称。主题的名称不可变。
    • TRANSFORMS_FILE:定义了一个或多个 SMT 的 YAML 或 JSON 文件的路径。
  6. Java

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

    
    import com.google.api.gax.rpc.AlreadyExistsException;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicWithSmtExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicWithSmtExample(projectId, topicId);
      }
    
      public static void createTopicWithSmtExample(String projectId, String topicId)
          throws IOException {
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    
          Topic topic =
              topicAdminClient.createTopic(
                  Topic.newBuilder()
                      .setName(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created topic with SMT: " + topic.getName());
        } catch (AlreadyExistsException e) {
          System.out.println(topicName + "already exists.");
        }
      }
    }

    Python

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    request = Topic(name=topic_path, message_transforms=transforms)
    
    topic = publisher.create_topic(request=request)
    
    print(f"Created topic: {topic.name} with SMT")

    Go

    以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createTopicWithSMT creates a topic with a single message transform function applied.
    func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	topic := &pubsubpb.Topic{
    		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    
    	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    
    	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
    	return nil
    }
    

后续步骤