使用 SMT 建立主題

本文說明如何建立具有單一訊息轉換 (SMT) 的 Pub/Sub 主題。

主題 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可讓您在訊息發布至主題前,先清理、篩選資料或轉換格式。

如要使用 SMT 建立主題,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得建立含有 SMT 的主題所需的權限,請要求管理員在專案中授予您「Pub/Sub 編輯者 」(roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備使用 SMT 建立主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要使用 SMT 建立主題,必須具備下列權限:

  • 授予專案的主題建立權限: pubsub.topics.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

使用 SMT 建立主題

使用 SMT 建立主題前,請先參閱主題屬性的說明文件。

如要建立具有一或多個 SMT 的 Pub/Sub,請執行下列步驟。

控制台

  1. 前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。

    前往「主題」

  2. 按一下「建立主題」

  3. 在「主題 ID」欄位中,輸入主題的 ID。如要進一步瞭解如何命名主題,請參閱命名規範

  4. 在「轉換」下方,按一下「新增轉換」

  5. 輸入函式名稱。例如:redactSSN

  6. 如不想立即啟用 SMT,請選取「停用轉換」。選取這個選項後,系統會使用主題建立 SMT,但不會對內送郵件執行 SMT。建立主題後,您可以編輯主題來啟用 SMT。

  7. 在文字區域中輸入 SMT 的代碼。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  8. 選用。如要驗證 SMT,請按一下「驗證」。如果 SMT 有效,系統會顯示 "Validation passed" 訊息。否則會顯示錯誤訊息。

  9. 如要新增其他轉換作業,請按一下「新增轉換作業」,然後重複上述步驟。

    如要依特定順序排列 SMT,請按一下「上移」或「下移」如要移除 SMT,請按一下「刪除」

  10. 選用。如要測試範例訊息的 SMT,請按照下列步驟操作:

    1. 按一下「測試轉換」

    2. 在「測試轉換」視窗中,選取要測試的函式。

    3. 在「輸入訊息」視窗中輸入範例訊息。

    4. 如要為訊息新增屬性,請按一下「新增屬性」,然後輸入屬性的鍵和值。你可以新增多個屬性。

    5. 按一下「測試」。將 SMT 套用至訊息的結果會顯示在「輸出訊息」下方。

    6. 如要關閉「測試轉換」視窗,請按一下「關閉」

    如果您建立多個 SMT,可以按照下列方式測試整個轉換序列:

    1. 按照先前的步驟,測試序列中的第一個 SMT。
    2. 選取下一個 SMT。輸入訊息會預先填入先前測試的輸出訊息。
    3. 繼續依序測試 SMT,確保整個序列運作正常。
  11. 如要建立主題,請按一下「建立」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 建立定義一或多個 SMT 的 YAML 或 JSON 檔案。如果您有多個 SMT,系統會按照您列出的順序對郵件執行這些 SMT。

    以下是 YAML 轉換檔案範例:

    - javascriptUdf:
        code: >
            function redactSSN(message, metadata) {
              const data = JSON.parse(message.data);
              delete data['ssn'];
              message.data = JSON.stringify(data);
              return message;
            }
        functionName: redactSSN
    
  3. 選用。如要驗證 SMT,請執行 gcloud pubsub message-transforms validate 指令:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    更改下列內容:

    • TRANSFORM_FILE:定義單一 SMT 的 YAML 或 JSON 檔案路徑。如要建立多個 SMT,必須逐一驗證。
  4. 選用。如要測試一或多個 SMT 是否能處理 Pub/Sub 訊息範例,請執行 gcloud pubsub message-transforms test 指令:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    更改下列內容:

    • TRANSFORMS_FILE:定義一或多個 SMT 的 YAML 或 JSON 檔案路徑。
    • MESSAGE:範例訊息內文。
    • ATTRIBUTES:選用。以半形逗號分隔的訊息屬性清單。每個屬性都是鍵/值組合,格式為 KEY="VALUE"

    這項指令會依序執行 SMT,並將每個 SMT 的輸出內容做為下一個 SMT 的輸入內容。這個指令會輸出每個步驟的結果。

  5. 如要建立主題,請執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    更改下列內容:

    • TOPIC_ID:要建立的主題 ID 或名稱。如需主題命名規範,請參閱「資源名稱」。主題名稱無法變更。
    • TRANSFORMS_FILE:定義一或多個 SMT 的 YAML 或 JSON 檔案路徑。
  6. Java

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

    
    import com.google.api.gax.rpc.AlreadyExistsException;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicWithSmtExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicWithSmtExample(projectId, topicId);
      }
    
      public static void createTopicWithSmtExample(String projectId, String topicId)
          throws IOException {
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    
          Topic topic =
              topicAdminClient.createTopic(
                  Topic.newBuilder()
                      .setName(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created topic with SMT: " + topic.getName());
        } catch (AlreadyExistsException e) {
          System.out.println(topicName + "already exists.");
        }
      }
    }

    Python

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    request = Topic(name=topic_path, message_transforms=transforms)
    
    topic = publisher.create_topic(request=request)
    
    print(f"Created topic: {topic.name} with SMT")

    Go

    下列範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createTopicWithSMT creates a topic with a single message transform function applied.
    func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	topic := &pubsubpb.Topic{
    		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    
    	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    
    	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
    	return nil
    }
    

後續步驟