Crea una suscripción con SMT

En este documento, se explica cómo crear una suscripción a Pub/Sub con transformaciones de un solo mensaje (SMT).

Las SMT de suscripción permiten realizar modificaciones ligeras en los datos y atributos de los mensajes directamente en Pub/Sub. Esta función permite limpiar, filtrar o convertir el formato de los datos antes de que los mensajes se entreguen a un cliente suscriptor.

Para crear una suscripción con SMT, puedes usar la consola de Google Cloud , Google Cloud CLI, la biblioteca cliente o la API de Pub/Sub.

Antes de comenzar

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear una suscripción con SMT, pídele a tu administrador que te otorgue el rol de IAM Editor de Pub/Sub (roles/pubsub.editor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear una suscripción con SMT. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear una suscripción con SMT:

  • Otorga el permiso para crear una suscripción en el proyecto: pubsub.subscriptions.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Según el tipo de suscripción, es posible que necesites permisos adicionales. Para conocer la lista exacta de permisos, consulta el documento en el que se explica cómo crear la suscripción específica. Por ejemplo, si creas una suscripción a BigQuery con SMT, consulta Crea suscripciones a BigQuery.

Si creas una suscripción en un proyecto diferente al del tema, debes otorgar el rol roles/pubsub.subscriber al principal del proyecto que contiene la suscripción en el proyecto que contiene el tema.

Puedes configurar el control de acceso a nivel de proyecto y a nivel de los recursos individuales.

Crea una suscripción con SMT

Antes de crear una suscripción con SMT, revisa la documentación sobre las propiedades de una suscripción.

Para crear una suscripción a Pub/Sub con uno o más SMT, sigue estos pasos. Puedes habilitar hasta 5 SMT por suscripción.

Console

  1. En la consola de Google Cloud , ve a la página Suscripciones de Pub/Sub.

    Ir a las suscripciones

  2. Haz clic en Crear suscripción.

  3. En el campo ID de suscripción, ingresa un ID para tu suscripción. Para obtener más información sobre cómo asignar nombres a las suscripciones, consulta los lineamientos para asignar nombres.

  4. En Transformaciones, haz clic en Agregar una transformación.

  5. Selecciona el Tipo de transformación. Para obtener más información sobre los tipos de SMT admitidos, consulta Tipos de SMT.

  6. Establece las propiedades de configuración del SMT. El conjunto de propiedades depende del tipo de SMT. Para obtener más información, consulta la documentación de ese tipo de SMT.

  7. Es opcional. Para validar el SMT, haz clic en Validate. Si el SMT es válido, se muestra el mensaje "Validation passed". De lo contrario, se mostrará un mensaje de error.

  8. Para agregar otra transformación, haz clic en Agregar una transformación y repite los pasos anteriores.

    Para ordenar los SMT en un orden específico, haz clic en Mover hacia arriba o Mover hacia abajo. Para quitar un SMT, haz clic en Borrar.

  9. Es opcional. Para probar un SMT en un mensaje de muestra, sigue estos pasos:

    1. Haz clic en Probar transformaciones.

    2. En la ventana Test transform, selecciona la función que deseas probar.

    3. En la ventana Input message, ingresa un mensaje de ejemplo.

    4. Para agregar un atributo al mensaje, haz clic en Agregar un atributo y, luego, ingresa la clave y el valor del atributo. Puedes agregar varios atributos.

    5. Haz clic en Probar. El resultado de aplicar el SMT al mensaje se muestra en Mensaje de salida.

    6. Para cerrar la ventana Test transforms, haz clic en Cerrar.

    Si creas más de un SMT, puedes probar toda la secuencia de transformaciones de la siguiente manera:

    1. Prueba el primer SMT de la secuencia, como se describe en los pasos anteriores.
    2. Selecciona el siguiente SMT. El mensaje de entrada se completa previamente con el mensaje de salida de la prueba anterior.
    3. Sigue probando los SMT en orden para asegurarte de que toda la secuencia funcione según lo esperado.
  10. Haz clic en Crear para crear la suscripción.

gcloud

  1. En la consola de Google Cloud , activa Cloud Shell.

    Activa Cloud Shell

    En la parte inferior de la consola de Google Cloud , se inicia una sesión de Cloud Shell que muestra una ventana emergente con una línea de comandos. Cloud Shell es un entorno de shell con Google Cloud CLI ya instalada y con valores ya establecidos para el proyecto actual. La sesión puede tardar unos segundos en inicializarse.

  2. Crea un archivo YAML o JSON que defina uno o más SMT. La definición en YAML o JSON depende del tipo de SMT. Para obtener más información, consulta Tipos de SMT.

    Si el archivo incluye más de un SMT, Pub/Sub los ejecuta en el orden en que aparecen en la lista.

  3. Es opcional. Para validar un SMT, ejecuta el comando gcloud pubsub message-transforms validate:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    Reemplaza lo siguiente:

    • TRANSFORM_FILE: Es la ruta de acceso a un archivo YAML o JSON que define un solo SMT. Si creas varios SMT, debes validarlos de forma individual.
  4. Es opcional. Para probar uno o más SMT en un mensaje de Pub/Sub de muestra, ejecuta el comando gcloud pubsub message-transforms test:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    Reemplaza lo siguiente:

    • TRANSFORMS_FILE: Es la ruta de acceso a un archivo YAML o JSON que define uno o más SMT.
    • MESSAGE: Es el cuerpo del mensaje de muestra.
    • ATTRIBUTES: Opcional Es una lista separada por comas de atributos del mensaje. Cada atributo es un par clave-valor con el formato KEY="VALUE".

    El comando ejecuta las SMT en orden, y usa el resultado de cada SMT como entrada para la siguiente. El comando genera los resultados de cada paso.

  5. Para crear la suscripción, ejecuta el comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=projects/PROJECT_ID/topics/TOPIC_ID \
        --message-transforms-file=TRANSFORMS_FILE
    

    Reemplaza lo siguiente:

    • SUBSCRIPTION_ID: Es el ID o el nombre de la suscripción que deseas crear. Para obtener instrucciones sobre cómo asignar un nombre a una suscripción, consulta Nombres de recursos. El nombre de una suscripción es inmutable.

    • PROJECT_ID: Es el ID del proyecto que contiene el tema.

    • TOPIC_ID: Es el ID del tema al que se suscribirá.

    • TRANSFORMS_FILE: Es la ruta de acceso al archivo YAML o JSON que define uno o más SMT.

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

En el siguiente ejemplo, se usa la versión principal de la biblioteca cliente de Pub/Sub de Go (v2). Si aún usas la biblioteca de la versión 1, consulta la guía de migración a la versión 2. Para ver una lista de muestras de código de la versión 1, consulta las muestras de código obsoletas.

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`

	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	sub := &pubsubpb.Subscription{
		Name:              fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
		Topic:             fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}
	sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

Cómo interactúan los SMT con otras funciones de suscripción

Ten en cuenta los siguientes puntos cuando uses un SMT de suscripción.

Filtros

Si tu suscripción usa tanto SMT como filtros integrados de Pub/Sub, el filtro se aplica antes del SMT. Esto tiene las siguientes implicaciones:

  • Si tu SMT altera los atributos del mensaje, el filtro de Pub/Sub no se aplica al nuevo conjunto de atributos.
  • Tu SMT no se aplica a los mensajes que se filtran con el filtro de Pub/Sub.
  • Si tu SMT filtra mensajes, ten en cuenta el impacto en el monitoreo de la acumulación de suscripciones.
  • Si conectas una suscripción a una canalización de Dataflow, no uses un SMT de suscripción para filtrar mensajes, ya que esto interrumpe el escalado automático de Dataflow.

Ordenamiento de mensajes

Si defines un SMT en una suscripción que tiene habilitado el ordenamiento y la ejecución del SMT arroja un error, los mensajes posteriores para la misma clave de ordenamiento no se entregan al suscriptor. Para evitar este problema, configura un tema de mensajes no entregados en la suscripción para quitar el mensaje no procesado del registro de mensajes pendientes.

¿Qué sigue?