SMT を使用してサブスクリプションを作成する

このドキュメントでは、単一メッセージ変換(SMT)を使用して Pub/Sub サブスクリプションを作成する方法について説明します。

サブスクリプション SMT を使用すると、メッセージデータと属性に対する軽量な変更を Pub/Sub 内で直接行うことができます。この機能を使用すると、メッセージがサブスクライバー クライアントに配信される前に、データのクリーンアップ、フィルタリング、形式変換を行うことができます。

SMT を使用してサブスクリプションを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用します。

始める前に

必要なロールと権限

SMT を使用してサブスクリプションを作成するために必要な権限を取得するには、プロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、SMT を使用してサブスクリプションを作成するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

SMT を使用してサブスクリプションを作成するには、次の権限が必要です。

  • プロジェクトに対するサブスクリプション作成権限を付与します。 pubsub.subscriptions.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

サブスクリプションの種類によっては、追加の権限が必要になる場合があります。権限の正確なリストについては、特定の定期購入の作成について説明しているドキュメントをご覧ください。たとえば、SMT を使用して BigQuery サブスクリプションを作成する場合は、BigQuery サブスクリプションを作成するをご覧ください。

トピックとは異なるプロジェクトにサブスクリプションを作成する場合は、トピックを含むプロジェクトで、サブスクリプションを含むプロジェクトのプリンシパルに roles/pubsub.subscriber ロールを付与する必要があります。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

SMT を使用してサブスクリプションを作成する

SMT を使用してサブスクリプションを作成する前に、サブスクリプションのプロパティのドキュメントを確認してください。

1 つ以上の SMT を使用して Pub/Sub サブスクリプションを作成する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] に移動

  2. [サブスクリプションを作成] をクリックします。

    [サブスクリプションの作成] ページが開きます。

  3. [サブスクリプション ID] フィールドに、サブスクリプションの ID を入力します。サブスクリプションの命名の詳細については、命名ガイドラインをご覧ください。

  4. [変換] で、[変換を追加] をクリックします。

  5. 関数名を入力します。例: redactSSN

  6. SMT をすぐに有効にしない場合は、[変換を無効にする] を選択します。このオプションを選択すると、SMT はサブスクリプションで作成されますが、受信メッセージに対して実行されません。サブスクリプションを作成したら、サブスクリプションを編集して SMT を有効にできます。

  7. テキスト エリアに SMT のコードを入力します。次に例を示します。

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  8. 省略可。SMT を検証するには、[Validate] をクリックします。SMT が有効な場合は、メッセージ "Validation passed" が表示されます。それ以外の場合は、エラー メッセージが表示されます。

  9. 別の変換を追加するには、[変換を追加] をクリックして、前の手順を繰り返します。

    SMT を特定の順序で並べ替えるには、上に移動または 下に移動をクリックします。SMT を削除するには、 [削除] をクリックします。

  10. 省略可。サンプル メッセージで SMT をテストする手順は次のとおりです。

    1. [変換をテスト] をクリックします。

    2. [テスト変換] ウィンドウで、テストする関数を選択します。

    3. [入力メッセージ] ウィンドウにサンプル メッセージを入力します。

    4. メッセージに属性を追加するには、[属性を追加する] をクリックし、属性のキーと値を入力します。複数の属性を追加できます。

    5. [Test] をクリックします。メッセージに SMT を適用した結果は、[出力メッセージ] に表示されます。

    6. [変換をテスト] ウィンドウを閉じるには、閉じる)をクリックします。

    複数の SMT を作成した場合は、次のように変換のシーケンス全体をテストできます。

    1. 前の手順で説明したように、シーケンスの最初の SMT をテストします。
    2. 次の SMT を選択します。入力メッセージには、前のテストの出力メッセージが事前入力されています。
    3. SMT を順番にテストして、シーケンス全体が想定どおりに動作することを確認します。
  11. [作成] をクリックしてサブスクリプションを作成します。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルを作成します。複数の SMT がある場合、それらはリストに記載されている順にメッセージに対して実行されます。

    YAML 変換ファイルの例を次に示します。

    - javascriptUdf:
        code: >
            function redactSSN(message, metadata) {
              const data = JSON.parse(message.data);
              delete data['ssn'];
              message.data = JSON.stringify(data);
              return message;
            }
        functionName: redactSSN
    
  3. 省略可。SMT を検証するには、gcloud pubsub message-transforms validate コマンドを実行します。

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    次のように置き換えます。

    • TRANSFORM_FILE: 単一の SMT を定義する YAML ファイルまたは JSON ファイルへのパス。複数の SMT を作成する場合は、個別に検証する必要があります。
  4. 省略可。サンプル Pub/Sub メッセージで 1 つ以上の SMT をテストするには、gcloud pubsub message-transforms test コマンドを実行します。

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    次のように置き換えます。

    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルへのパス。
    • MESSAGE: サンプル メッセージの本文。
    • ATTRIBUTES: 省略可。メッセージ属性のカンマ区切りのリスト。各属性は KEY="VALUE" 形式の Key-Value ペアです。

    このコマンドは、各 SMT の出力を次の SMT の入力として使用して、SMT を順番に実行します。このコマンドは、各ステップの結果を出力します。

  5. サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを実行します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=projects/PROJECT_ID/topics/TOPIC_ID \
        --message-transforms-file=TRANSFORMS_FILE
    

    次のように置き換えます。

    • SUBSCRIPTION_ID: 作成するサブスクリプションの ID または名前。サブスクリプションの命名方法のガイドラインについては、リソース名をご覧ください。サブスクリプションの名前は変更できません。

    • PROJECT_ID: トピックを含むプロジェクトの ID。

    • TOPIC_ID: 登録するトピックの ID。

    • TRANSFORMS_FILE: 1 つ以上の SMT を定義する YAML ファイルまたは JSON ファイルのパス。

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。引き続き v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`

	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	sub := &pubsubpb.Subscription{
		Name:              fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
		Topic:             fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}
	sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

SMT と他のサブスクリプション機能の連携

サブスクリプションで SMT と Pub/Sub の組み込みフィルタの両方を使用している場合、フィルタは SMT の前に適用されます。この動作には次の影響があります。

  • SMT がメッセージ属性を変更した場合、Pub/Sub フィルタは新しい属性セットに適用されません。
  • SMT は、Pub/Sub フィルタで除外されたメッセージには適用されません。

SMT でメッセージがフィルタされる場合は、サブスクリプション バックログのモニタリングへの影響に注意してください。サブスクリプションを Dataflow パイプラインにフィードする場合は、SMT を使用してメッセージをフィルタしないでください。Dataflow の自動スケーリングが中断されます。

次のステップ