SMT を使用してトピックを作成する

このドキュメントでは、単一メッセージ変換(SMT)を使用して Pub/Sub トピックを作成する方法について説明します。

トピック SMT を使用すると、メッセージデータと属性に対する軽量な変更を Pub/Sub 内で直接行うことができます。この機能を使用すると、メッセージがトピックにパブリッシュされる前に、データのクリーンアップ、フィルタリング、形式変換を行うことができます。

SMT を使用してトピックを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用します。

始める前に

必要なロールと権限

SMT を使用してトピックを作成するために必要な権限を取得するには、プロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、SMT を使用してトピックを作成するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

SMT を使用してトピックを作成するには、次の権限が必要です。

  • プロジェクトに対するトピック作成権限を付与します。 pubsub.topics.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

SMT を使用してトピックを作成する

SMT を使用してトピックを作成する前に、トピックのプロパティのドキュメントを確認してください。

1 つ以上の SMT を使用して Pub/Sub を作成する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID を入力します。トピックの命名の詳細については、命名ガイドラインをご覧ください。

  4. [変換] で、[変換を追加] をクリックします。

  5. 関数名を入力します。例: redactSSN

  6. SMT をすぐに有効にしない場合は、[変換を無効にする] を選択します。このオプションを選択すると、SMT はトピックとともに作成されますが、受信メッセージに対して実行されません。トピックを作成したら、トピックを編集して SMT を有効にできます。

  7. テキスト エリアに SMT のコードを入力します。次に例を示します。

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  8. 省略可。SMT を検証するには、[Validate] をクリックします。SMT が有効な場合は、メッセージ "Validation passed" が表示されます。それ以外の場合は、エラー メッセージが表示されます。

  9. 別の変換を追加するには、[変換を追加] をクリックして、前の手順を繰り返します。

    SMT を特定の順序で並べ替えるには、上に移動または 下に移動をクリックします。SMT を削除するには、 [削除] をクリックします。

  10. 省略可。サンプル メッセージで SMT をテストする手順は次のとおりです。

    1. [変換をテスト] をクリックします。

    2. [テスト変換] ウィンドウで、テストする関数を選択します。

    3. [入力メッセージ] ウィンドウにサンプル メッセージを入力します。

    4. メッセージに属性を追加するには、[属性を追加] をクリックし、属性のキーと値を入力します。複数の属性を追加できます。

    5. [Test] をクリックします。メッセージに SMT を適用した結果が [出力メッセージ] に表示されます。

    6. [変換をテスト] ウィンドウを閉じるには、([閉じる])をクリックします。

    複数の SMT を作成した場合は、次のように変換のシーケンス全体をテストできます。

    1. 前の手順で説明したように、シーケンスの最初の SMT をテストします。
    2. 次の SMT を選択します。入力メッセージには、前のテストの出力メッセージが事前入力されています。
    3. SMT のテストを順番に続け、シーケンス全体が想定どおりに動作することを確認します。
  11. トピックを作成するには、[作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルを作成します。複数の SMT がある場合、それらはリストに記載されている順にメッセージに対して実行されます。

    YAML 変換ファイルの例を次に示します。

    - javascriptUdf:
        code: >
            function redactSSN(message, metadata) {
              const data = JSON.parse(message.data);
              delete data['ssn'];
              message.data = JSON.stringify(data);
              return message;
            }
        functionName: redactSSN
    
  3. 省略可。SMT を検証するには、gcloud pubsub message-transforms validate コマンドを実行します。

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    次のように置き換えます。

    • TRANSFORM_FILE: 単一の SMT を定義する YAML ファイルまたは JSON ファイルへのパス。複数の SMT を作成する場合は、個別に検証する必要があります。
  4. 省略可。サンプル Pub/Sub メッセージで 1 つ以上の SMT をテストするには、gcloud pubsub message-transforms test コマンドを実行します。

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    次のように置き換えます。

    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルへのパス。
    • MESSAGE: サンプル メッセージの本文。
    • ATTRIBUTES: 省略可。メッセージ属性のカンマ区切りのリスト。各属性は KEY="VALUE" 形式の Key-Value ペアです。

    このコマンドは、各 SMT の出力を次の SMT の入力として使用して、SMT を順番に実行します。このコマンドは、各ステップの結果を出力します。

  5. トピックを作成するには、gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    次のように置き換えます。

    • TOPIC_ID: 作成するトピックの ID または名前。トピックの命名方法のガイドラインについては、リソース名をご覧ください。トピックの名前は変更できません。
    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルへのパス。
  6. Java

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

    
    import com.google.api.gax.rpc.AlreadyExistsException;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicWithSmtExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicWithSmtExample(projectId, topicId);
      }
    
      public static void createTopicWithSmtExample(String projectId, String topicId)
          throws IOException {
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    
          Topic topic =
              topicAdminClient.createTopic(
                  Topic.newBuilder()
                      .setName(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created topic with SMT: " + topic.getName());
        } catch (AlreadyExistsException e) {
          System.out.println(topicName + "already exists.");
        }
      }
    }

    Python

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    request = Topic(name=topic_path, message_transforms=transforms)
    
    topic = publisher.create_topic(request=request)
    
    print(f"Created topic: {topic.name} with SMT")

    Go

    次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。引き続き v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 のコードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createTopicWithSMT creates a topic with a single message transform function applied.
    func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	topic := &pubsubpb.Topic{
    		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    
    	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    
    	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
    	return nil
    }
    

次のステップ