Creare una sottoscrizione con SMT

Questo documento spiega come creare una sottoscrizione Pub/Sub con Single Message Transform (SMT).

Le SMT di sottoscrizione consentono modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub. Questa funzionalità consente la pulizia, il filtraggio o la conversione del formato dei dati prima che i messaggi vengano inviati a un client abbonato.

Per creare una sottoscrizione con SMT, puoi utilizzare la console Google Cloud , Google Cloud CLI, la libreria client o l'API Pub/Sub.

Prima di iniziare

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un abbonamento con SMT, chiedi all'amministratore di concederti il ruolo IAM Pub/Sub Editor (roles/pubsub.editor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un abbonamento con SMT. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un abbonamento con SMT sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare una sottoscrizione nel progetto: pubsub.subscriptions.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

A seconda del tipo di abbonamento, potresti aver bisogno di autorizzazioni aggiuntive. Per scoprire l'elenco esatto delle autorizzazioni, consulta il documento che illustra la creazione dell'abbonamento specifico. Ad esempio, se stai creando una sottoscrizione BigQuery con SMT, consulta la pagina Creare sottoscrizioni BigQuery.

Se crei una sottoscrizione in un progetto diverso dall'argomento, devi concedere il ruolo roles/pubsub.subscriber all'entità del progetto contenente la sottoscrizione nel progetto contenente l'argomento.

Puoi configurare il controllo dell'accesso a livello di progetto e a livello di singola risorsa.

Creare una sottoscrizione con SMT

Prima di creare un abbonamento con SMT, consulta la documentazione relativa alle proprietà di un abbonamento.

Per creare un abbonamento Pub/Sub con uno o più SMT, segui questi passaggi.

Console

  1. Nella console Google Cloud , vai alla pagina Sottoscrizioni di Pub/Sub.

    Vai agli abbonamenti

  2. Fai clic su Crea sottoscrizione.

    Viene visualizzata la pagina Crea sottoscrizione.

  3. Nel campo ID abbonamento, inserisci un ID per l'abbonamento. Per ulteriori informazioni sulla denominazione degli abbonamenti, consulta le linee guida per i nomi.

  4. In Trasformazioni, fai clic su Aggiungi una trasformazione.

  5. Inserisci un nome di funzione. Ad esempio: redactSSN.

  6. Se non vuoi che la trasformazione semantica sia attiva immediatamente, seleziona Disattiva trasformazione. Quando questa opzione è selezionata, la SMT viene creata con l'abbonamento, ma non viene eseguita sui messaggi in arrivo. Dopo aver creato l'abbonamento, puoi modificarlo per attivare l'SMT.

  7. Nell'area di testo, inserisci il codice per l'SMT. Ad esempio:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  8. Facoltativo. Per convalidare l'SMT, fai clic su Convalida. Se l'SMT è valido, viene visualizzato il messaggio "Validation passed". In caso contrario, viene visualizzato un messaggio di errore.

  9. Per aggiungere un'altra trasformazione, fai clic su Aggiungi una trasformazione e ripeti i passaggi precedenti.

    Per disporre le SMT in un ordine specifico, fai clic su Sposta su o Sposta giù. Per rimuovere un SMT, fai clic su Elimina.

  10. Facoltativo. Per testare un SMT su un messaggio di esempio:

    1. Fai clic su Testa trasformazioni.

    2. Nella finestra Testa trasformazione, seleziona la funzione che vuoi testare.

    3. Nella finestra Messaggio di input, inserisci un messaggio di esempio.

    4. Per aggiungere un attributo al messaggio, fai clic su Aggiungi un attributo e inserisci la chiave e il valore dell'attributo. Puoi aggiungere più attributi.

    5. Fai clic su Test. Il risultato dell'applicazione della SMT al messaggio viene visualizzato in Messaggio di output.

    6. Per chiudere la finestra Testa trasformazioni, fai clic su Chiudi.

    Se crei più di una SMT, puoi testare l'intera sequenza di trasformazioni nel seguente modo:

    1. Testa il primo SMT della sequenza, come descritto nei passaggi precedenti.
    2. Seleziona il successivo SMT. Il messaggio di input è precompilato con il messaggio di output del test precedente.
    3. Continua a testare gli SMT in ordine per assicurarti che l'intera sequenza funzioni come previsto.
  11. Fai clic su Crea per creare l'abbonamento.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Crea un file YAML o JSON che definisca uno o più SMT. Se hai più di un SMT, vengono eseguiti sui messaggi nell'ordine in cui li elenchi.

    Ecco un esempio di file di trasformazione YAML:

    - javascriptUdf:
        code: >
            function redactSSN(message, metadata) {
              const data = JSON.parse(message.data);
              delete data['ssn'];
              message.data = JSON.stringify(data);
              return message;
            }
        functionName: redactSSN
    
  3. Facoltativo. Per convalidare un SMT, esegui il comando gcloud pubsub message-transforms validate:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    Sostituisci quanto segue:

    • TRANSFORM_FILE: il percorso di un file YAML o JSON che definisce un singolo SMT. Se crei più SMT, devi convalidarli singolarmente.
  4. Facoltativo. Per testare uno o più SMT su un messaggio Pub/Sub di esempio, esegui il comando gcloud pubsub message-transforms test:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    Sostituisci quanto segue:

    • TRANSFORMS_FILE: il percorso di un file YAML o JSON che definisce uno o più SMT.
    • MESSAGE: il corpo del messaggio di esempio.
    • ATTRIBUTES: (Facoltativo) Un elenco separato da virgole di attributi del messaggio. Ogni attributo è una coppia chiave-valore formattata come KEY="VALUE".

    Il comando esegue le SMT in ordine, utilizzando l'output di ciascuna SMT come input per la successiva. Il comando restituisce i risultati di ogni passaggio.

  5. Per creare l'abbonamento, esegui il comando gcloud pubsub subscriptions create:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=projects/PROJECT_ID/topics/TOPIC_ID \
        --message-transforms-file=TRANSFORMS_FILE
    

    Sostituisci quanto segue:

    • SUBSCRIPTION_ID: l'ID o il nome dell'abbonamento che vuoi creare. Per le linee guida su come denominare un abbonamento, consulta Nomi delle risorse. Il nome di un abbonamento è immutabile.

    • PROJECT_ID: l'ID del progetto che contiene l'argomento.

    • TOPIC_ID: l'ID dell'argomento a cui iscriversi.

    • TRANSFORMS_FILE: il percorso del file YAML o JSON che definisce uno o più SMT.

  6. Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Pub/Sub.

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.Subscription;
    import java.io.IOException;
    
    public class CreateSubscriptionWithSmtExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
        String subscriptionId = "your-subscription-id";
    
        createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
      }
    
      public static void createSubscriptionWithSmtExample(
          String projectId, String topicId, String subscriptionId) throws IOException {
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    
          ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
          ProjectSubscriptionName subscriptionName =
              ProjectSubscriptionName.of(projectId, subscriptionId);
    
          Subscription subscription =
              subscriptionAdminClient.createSubscription(
                  Subscription.newBuilder()
                      .setName(subscriptionName.toString())
                      .setTopic(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created subscription with SMT: " + subscription.getAllFields());
        }
      }
    }

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform
    
    # TODO(developer): Choose an existing topic.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # subscription_id = "your-subscription-id"
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    with subscriber:
        subscription = subscriber.create_subscription(
            request={
                "name": subscription_path,
                "topic": topic_path,
                "message_transforms": transforms,
            }
        )
        print(f"Created subscription with SMT: {subscription}")

    Go

    L'esempio seguente utilizza la versione principale della libreria client Go Pub/Sub (v2). Se utilizzi ancora la libreria v1, consulta la guida alla migrazione alla v2. Per visualizzare un elenco di esempi di codice della versione 1, consulta gli esempi di codice deprecati.

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createSubscriptionWithSMT creates a subscription with a single message transform function applied.
    func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	sub := &pubsubpb.Subscription{
    		Name:              fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
    		Topic:             fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    	sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
    	if err != nil {
    		return fmt.Errorf("CreateSubscription: %w", err)
    	}
    	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
    	return nil
    }
    

Come interagiscono le notifiche intelligenti con altre funzionalità di abbonamento

Se il tuo abbonamento utilizza sia SMT sia i filtri integrati di Pub/Sub, il filtro viene applicato prima dell'SMT. Ciò comporta le seguenti implicazioni:

  • Se la tua SMT modifica gli attributi del messaggio, il filtro Pub/Sub non viene applicato al nuovo insieme di attributi.
  • Il tuo SMT non verrà applicato ai messaggi filtrati dal filtro Pub/Sub.

Se il tuo SMT filtra i messaggi, tieni presente l'impatto sul monitoraggio del backlog degli abbonamenti. Se inserisci l'abbonamento in una pipeline Dataflow, non filtrare i messaggi utilizzando SMT, in quanto interrompe la scalabilità automatica di Dataflow.

Passaggi successivi