SMT を使用してトピックを作成する

このドキュメントでは、単一メッセージ変換(SMT)を使用して Pub/Sub トピックを作成する方法について説明します。

トピック SMT を使用すると、メッセージデータと属性に対する軽量な変更を Pub/Sub 内で直接行うことができます。この機能を使用すると、メッセージがトピックにパブリッシュされる前に、データのクリーンアップ、フィルタリング、形式変換を行うことができます。

SMT を使用してトピックを作成するには、 Google Cloud コンソール、Google Cloud CLI、 クライアント ライブラリ、または Pub/Sub API を使用します。

始める前に

必要なロールと権限

SMT を使用してトピックを作成するために必要な権限を取得するには、管理者にプロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するよう依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには SMT を使用してトピックを作成するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

SMT を使用してトピックを作成するには、次の権限が必要です。

  • プロジェクトに対するトピックの作成権限を付与します。 pubsub.topics.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

SMT を使用してトピックを作成する

SMT を使用してトピックを作成する前に、 トピックのプロパティのドキュメントを確認してください。

1 つ以上の SMT を使用して Pub/Sub を作成する手順は次のとおりです。トピックごとに最大 5 つの SMT を有効にできます。

コンソール

  1. コンソールで、Pub/Sub の [Topics] ページに移動します。 Google Cloud

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID を入力します。トピックの命名の詳細については、 命名ガイドラインをご覧ください。

  4. [変換] で、[変換を追加] をクリックします。

  5. [変換タイプ] を選択します。サポートされている SMT タイプの詳細については、SMT のタイプをご覧ください。

  6. SMT の構成プロパティを設定します。プロパティのセットは、SMT のタイプによって異なります。詳細については、その SMT タイプのドキュメントをご覧ください。

  7. 省略可。SMT を検証するには、[検証] をクリックします。SMT が有効な場合は、 メッセージ "Validation passed" が表示されます。それ以外の場合は、エラー メッセージが表示されます。

  8. 別の変換を追加するには、[変換を追加] をクリックして前の手順を繰り返します。

    SMT を特定の順序で配置するには、 [上に移動] または [下に移動] をクリックします。SMT を削除するには、[削除] をクリックします。

  9. 省略可。サンプル メッセージで SMT をテストする手順は次のとおりです。

    1. [変換をテスト] をクリックします。

    2. [変換をテスト] ウィンドウで、テストする関数を選択します。

    3. [入力メッセージ] ウィンドウにサンプル メッセージを入力します。

    4. メッセージに属性を追加するには、[属性を追加] をクリックして、属性のキーと値を入力します。複数の属性を追加できます。

    5. [テスト] をクリックします。メッセージに SMT を適用した結果が [出力メッセージ] に表示されます。

    6. [変換をテスト] ウィンドウを閉じるには、 close [閉じる] をクリックします。

    複数の SMT を作成する場合は、次のように変換のシーケンス全体をテストできます。

    1. 前の手順で説明したように、シーケンスの最初の SMT をテストします。
    2. 次の SMT を選択します。入力メッセージには、前のテストの出力メッセージが事前入力されます。
    3. シーケンス全体が想定どおりに動作することを確認するため、SMT のテストを順番に行います。
  10. トピックを作成するには、[作成] をクリックします。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Google Cloud

    Cloud Shell をアクティブにする

    コンソールの下部にある Google Cloud Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルを作成します。YAML または JSON の定義は、SMT のタイプによって異なります。詳細については、 SMT のタイプをご覧ください。

    ファイルに複数の SMT が含まれている場合、Pub/Sub はリストに記載されている順に実行します。

  3. 省略可。SMT を検証するには、 gcloud pubsub message-transforms validate コマンドを実行します。

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    次のように置き換えます。

    • TRANSFORM_FILE:単一の SMT を定義する YAML ファイルまたは JSON ファイルのパス 。複数の SMT を作成する場合は、個別に検証する必要があります。
  4. 省略可。サンプル Pub/Sub メッセージで 1 つ以上の SMT をテストするには、 gcloud pubsub message-transforms test コマンドを実行します。

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    次のように置き換えます。

    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルのパス 。
    • MESSAGE: サンプル メッセージの本文。
    • ATTRIBUTES: 省略可。メッセージ属性のカンマ区切りのリスト。各属性は、 KEY="VALUE" の形式の Key-Value ペアです。

    このコマンドは、各 SMT の出力を次の SMT の入力として使用して、SMT を順番に実行します。このコマンドは、各ステップの結果を出力します。

  5. トピックを作成するには、 gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    次のように置き換えます。

    • TOPIC_ID: 作成するトピックの ID または名前。トピックの命名方法のガイドラインについては、 リソース名をご覧ください。トピックの名前は変更できません。
    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルのパス 。

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSmtExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    createTopicWithSmtExample(projectId, topicId);
  }

  public static void createTopicWithSmtExample(String projectId, String topicId)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created topic with SMT: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(topicName + "already exists.");
    }
  }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(name=topic_path, message_transforms=transforms)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with SMT")

Go

次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。v1 ライブラリをまだ使用している場合は、 v2 への移行ガイドをご覧ください。 v1 コードサンプルのリストについては、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createTopicWithSMT creates a topic with a single message transform function applied.
func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	topic := &pubsubpb.Topic{
		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}

	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}

	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
	return nil
}

次のステップ