Trier des messages

L'ordre des messages est une fonctionnalité de Pub/Sub qui vous permet de recevoir des messages dans vos clients abonnés dans l'ordre dans lequel ils ont été publiés par les clients éditeurs.

Par exemple, supposons qu'un client éditeur d'une région publie les messages 1, 2 et 3 dans cet ordre. Avec le tri des messages, le client abonné reçoit les messages publiés dans le même ordre. Pour être distribué dans l'ordre, le client éditeur doit publier les messages dans la même région. Toutefois, les abonnés peuvent se connecter à n'importe quelle région, et la garantie de tri est toujours maintenue.

Le tri des messages est une fonctionnalité utile dans des scénarios tels que la capture des modifications de base de données, le suivi des sessions utilisateur et les applications de streaming où il est important de préserver la chronologie des événements.

Cette page explique le concept de tri des messages et comment configurer vos clients abonnés pour recevoir des messages dans l'ordre. Pour configurer vos clients éditeurs pour le tri des messages, consultez Utiliser des clés de tri pour publier un message.

Présentation du tri des messages

Le tri dans Pub/Sub est déterminé par les éléments suivants :

  • Clé de tri. Une clé de tri est une chaîne qui identifie les messages associés qui doivent être triés. Les clés de tri peuvent inclure des ID client ou la clé primaire d'une ligne dans une base de données. Une clé de tri peut comporter jusqu'à 1 Ko.

    Pour trier les messages, définissez la même clé de tri sur tous les messages associés qui doivent être reçus dans l'ordre. De plus, vous devez publier tous les messages avec la même clé de tri dans la même région. Pour en savoir plus, consultez Utiliser des clés de tri pour publier un message.

    Les messages avec une clé de tri vide ne sont pas triés.

  • Activer le tri des messages. Pour recevoir des messages dans l'ordre, vous devez activer le tri des messages dans l'abonnement. Pour en savoir plus, consultez Activer le tri des messages.

    Si le tri des messages n'est pas activé, un abonnement reçoit les messages sans ordre prévu. Par exemple, supposons que les abonnements A et B soient tous deux associés au même sujet T, et que le tri soit activé dans l'abonnement A mais pas dans l'abonnement B. Bien que les deux abonnements reçoivent le même ensemble de messages du sujet T, le tri n'est conservé que pour l'abonnement A.

Le débit de publication sur chaque clé de tri est limité à 1 Mo/s. Le débit sur toutes les clés de tri d'un sujet est limité au quota disponible dans une région de publication. Cette limite peut être augmentée à plusieurs Go/s.

En général, si votre solution nécessite que les clients éditeurs envoient des messages triés et non triés, créez des sujets distincts, l'un pour les messages triés et l'autre pour les messages non triés.

Points à prendre en compte lors de l'utilisation de la messagerie triée

La liste suivante contient des informations importantes concernant le comportement de la messagerie triée dans Pub/Sub :

  • Cardinalité. Une clé de tri n'est pas équivalente à une partition dans un système de messagerie basé sur des partitions, car les clés de tri sont censées avoir une cardinalité beaucoup plus élevée que les partitions.

  • Tri dans une clé : les messages publiés avec la même clé de tri doivent être reçus dans l'ordre. Supposons que pour la clé de tri A, vous publiez les messages 1, 2 et 3. Lorsque le tri est activé, 1 doit être distribué avant 2, et 2 avant 3.

  • Tri entre les clés : les messages publiés avec des clés de tri différentes ne doivent pas être reçus dans l'ordre. Supposons que vous ayez les clés de tri A et B. Pour la clé de tri A, les messages 1 et 2 sont publiés dans l'ordre. Pour la clé de tri B, les messages 3 et 4 sont publiés dans l'ordre. Toutefois, le message 1 peut arriver avant ou après le message 4.

  • Redistribution des messages : Pub/Sub distribue chaque message au moins une fois. Le service Pub/Sub peut donc distribuer plusieurs fois des messages. Les redistributions d'un message déclenchent la redistribution de tous les messages suivants pour cette clé, même ceux qui ont été confirmés. Supposons qu'un client abonné reçoive les messages 1, 2 et 3 pour une clé de tri spécifique. Si le message 2 est redistribué (car le délai de confirmation a expiré ou que la confirmation optimale n'a pas persisté dans Pub/Sub), le message 3 est également redistribué. Si le tri des messages et une file d'attente de lettres mortes sont activés sur un abonnement, ce comportement n'est pas garanti, car Pub/Sub transfère les messages vers des files d'attente de lettres mortes de la manière la plus optimale possible.

  • Délais de confirmation et sujets de lettres mortes : les messages non confirmés pour une clé de tri donnée peuvent potentiellement retarder la distribution des messages pour d'autres clés de tri, en particulier lors des redémarrages du serveur ou des modifications du trafic. Pour maintenir l'ordre lors de ces événements, assurez-vous de confirmer tous les messages en temps voulu. Si l'accusé de réception en temps voulu n'est pas possible, envisagez d'utiliser une file d'attente de lettres mortes pour éviter la conservation indéfinie des messages. Sachez que l'ordre n'est pas garanti lorsque les messages sont écrits dans un file d'attente de lettres mortes.

  • Affinité des messages (clients streamingPull) : les messages pour la même clé sont généralement distribués au même client abonné streamingPull. L'affinité est attendue lorsque des messages sont en attente pour une clé de tri vers un client abonné spécifique. S'il n'y a pas de messages en attente, l'affinité peut changer pour l'équilibrage de charge ou les déconnexions du client.

    Pour garantir un traitement fluide, même en cas de modifications potentielles de l'affinité, il est essentiel de concevoir votre application streamingPull de manière à ce qu'elle puisse gérer les messages dans n'importe quel client pour une clé de tri donnée.

  • Intégration à Dataflow : n'activez pas le tri des messages pour les abonnements lorsque vous configurez Dataflow avec Pub/Sub. Dataflow dispose de son propre mécanisme de tri total des messages, qui garantit l'ordre chronologique de tous les messages dans le cadre des opérations de fenêtrage. Cette méthode de tri diffère de l'approche basée sur les clés de tri de Pub/Sub. L'utilisation de clés de tri avec Dataflow peut potentiellement réduire les performances du pipeline.

  • Scaling automatique : la distribution triée de Pub/Sub s'adapte à des milliards de clés de tri. Un plus grand nombre de clés de tri permet une distribution plus parallèle aux abonnés, car le tri s'applique à tous les messages ayant la même clé de tri.

  • Compromis en termes de performances : la distribution triée présente certains compromis. Par rapport à la distribution non triée, la distribution triée réduit la disponibilité de la publication et augmente la latence de distribution des messages de bout en bout. Dans le cas de la distribution triée, le basculement nécessite une coordination pour s'assurer que les messages sont écrits et lus dans le bon ordre.

  • Clé active : lorsque vous utilisez le tri des messages, tous les messages ayant la même clé de tri sont envoyés à votre client abonné dans l'ordre dans lequel ils sont reçus par le service. Le rappel utilisateur ne s'exécute que lorsque le rappel est terminé pour le message précédent. Le débit maximal des messages partageant la même clé de tri lors de la distribution aux abonnés n'est pas limité par Pub/Sub , mais par la vitesse de traitement de votre client abonné. Une clé active se produit lorsqu'un backlog s'accumule sur une clé de tri individuelle, car le nombre de messages produits par seconde dépasse le nombre de messages que l'abonné peut traiter par seconde. Pour atténuer les clés actives, utilisez les clés les plus granulaires possibles et réduisez le temps de traitement par message. Vous pouvez également surveiller la métrique subscription/oldest_unacked_message_age pour une valeur croissante, ce qui peut indiquer une clé active.

Pour en savoir plus sur l'utilisation du tri des messages, consultez les bonnes pratiques suivantes :

Comportement du client abonné pour le tri des messages

Les clients abonnés reçoivent les messages dans l'ordre dans lequel ils ont été publiés dans une région spécifique. Pub/Sub est compatible avec différentes méthodes de réception des messages, telles que les clients abonnés connectés aux abonnements pull et push. Les bibliothèques clientes utilisent streamingPull (à l'exception de PHP).

Pour en savoir plus sur ces types d'abonnements, consultez Choisir un type d'abonnement.

Les sections suivantes expliquent ce que signifie recevoir des messages dans l'ordre pour chaque type de client abonné.

Clients abonnés streamingPull

Lorsque vous utilisez les bibliothèques clientes avec streamingPull, vous devez spécifier un rappel utilisateur qui s'exécute chaque fois qu'un message est reçu par un client abonné. Avec les bibliothèques clientes, pour une clé de tri donnée, le rappel est exécuté jusqu'à la fin sur les messages dans le bon ordre. Si les messages sont confirmés dans ce rappel, tous les calculs sur un message se produisent dans l'ordre. Toutefois, si le rappel utilisateur planifie d'autres tâches asynchrones sur les messages, le client abonné doit s'assurer que les tâches asynchrones sont effectuées dans l'ordre. Une option consiste à ajouter des messages à une file d'attente de travail locale traitée dans l'ordre.

Clients abonnés pull

Pour les clients abonnés connectés aux abonnements pull, le tri des messages Pub/Sub est compatible avec les éléments suivants :

  • Tous les messages d'une clé de tri dans la PullResponse sont dans le bon ordre dans la liste.

  • Un seul lot de messages peut être en attente pour une clé de tri à la fois.

L'exigence selon laquelle un seul lot de messages peut être en attente à la fois est nécessaire pour maintenir la distribution triée, car le service Pub/Sub ne peut pas garantir le succès ni la latence de la réponse qu'il envoie pour une demande d'extraction d'un abonné.

Clients abonnés push

Les restrictions sur le push sont encore plus strictes que celles sur le pull. Pour un abonnement push, Pub/Sub n'accepte qu'un seul message en attente pour chaque clé de tri à la fois. Chaque message est envoyé à un point de terminaison push en tant que requête distincte. Par conséquent, l'envoi des requêtes en parallèle aurait le même problème que la distribution simultanée de plusieurs lots de messages pour la même clé de tri aux abonnés pull. Les abonnements push ne sont peut-être pas un bon choix pour les sujets où les messages sont fréquemment publiés avec la même clé de tri ou où la latence est extrêmement importante.

Clients abonnés à l'exportation

Les abonnements à l'exportation sont compatibles avec les messages triés. Pour les abonnements BigQuery, les messages ayant la même clé de tri sont écrits dans leur table BigQuery dans l'ordre. Pour les abonnements Cloud Storage, les messages ayant la même clé de tri ne sont pas tous écrits dans le même fichier. Dans le même fichier, les messages d'une clé de tri sont dans l'ordre. Lorsqu'ils sont répartis sur plusieurs fichiers, les messages ultérieurs d'une clé de tri peuvent apparaître dans un fichier dont le nom comporte un horodatage antérieur à celui du nom du fichier contenant les messages précédents.

Activer le tri des messages

Pour recevoir les messages dans l'ordre, définissez la propriété de tri des messages sur l'abonnement à partir duquel vous recevez les messages. La réception des messages dans l'ordre peut augmenter la latence. Vous ne pouvez pas modifier la propriété de tri des messages après avoir créé un abonnement.

Vous pouvez définir la propriété de tri des messages lorsque vous créez un abonnement à l'aide de la Google Cloud console, de la Google Cloud CLI ou de l'API Pub/Sub.

Console

Pour créer un abonnement avec la propriété de tri des messages, procédez comme suit :

  1. Dans la Google Cloud console, accédez à la page Abonnements.

Accéder aux abonnements

  1. Cliquez sur Créer un abonnement.

  2. Saisissez un ID d'abonnement.

  3. Choisissez un sujet à partir duquel vous souhaitez recevoir des messages.

  4. Dans la section Tri des messages, sélectionnez Trier les messages avec une clé de tri.

  5. Cliquez sur Créer.

gcloud

Pour créer un abonnement avec la propriété de tri des messages, utilisez la gcloud pubsub subscriptions create commande et l' --enable-message-ordering option :

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Remplacez SUBSCRIPTION_ID par l'ID de l'abonnement.

Si la requête aboutit, la ligne de commande affiche une confirmation :

Created subscription [SUBSCRIPTION_ID].

REST

Pour créer un abonnement avec la propriété de tri des messages, envoyez une requête PUT comme suit :

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet avec le sujet
  • SUBSCRIPTION_ID : ID de l'abonnement

Dans le corps de la requête, spécifiez les éléments suivants :

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Remplacez TOPIC_ID par l'ID du sujet à associer à l'abonnement.

Si la requête aboutit, la réponse est l'abonnement au format JSON :

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour C++ .

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

L'exemple suivant utilise la version majeure de la bibliothèque cliente Go Pub/Sub (v2). Si vous utilisez toujours la bibliothèque v1, consultez le guide de migration vers la v2. Pour afficher la liste des exemples de code v1, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithOrdering(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:                  subscription,
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Java .

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

L'exemple suivant utilise la bibliothèque cliente Ruby Pub/Sub v3. Si vous utilisez toujours la bibliothèque v2, consultez le guide de migration vers la v3. Pour afficher la liste des exemples de code Ruby v2, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Ruby.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

Étape suivante