Ordenar mensagens

A ordenação de mensagens é um recurso do Pub/Sub que permite receber mensagens nos clientes assinantes na ordem em que foram publicadas pelos clientes publisher.

Por exemplo, suponha que um cliente publisher em uma região publique as mensagens 1, 2 e 3 em ordem. Com a ordenação de mensagens, o cliente assinante recebe as mensagens publicadas na mesma ordem. Para serem entregues em ordem, o cliente publisher precisa publicar as mensagens na mesma região. No entanto, os assinantes podem se conectar a qualquer região, e a garantia de ordenação ainda é mantida.

A ordenação de mensagens é um recurso útil para cenários como captura de mudanças de banco de dados, rastreamento de sessões de usuários e aplicativos de streaming em que a preservação da cronologia de eventos é importante.

Esta página explica o conceito de ordenação de mensagens e como configurar os clientes assinantes para receber mensagens em ordem. Para configurar os clientes publisher para ordenação de mensagens, consulte Usar chaves de ordenação para publicar uma mensagem.

Visão geral da ordenação de mensagens

A ordenação no Pub/Sub é determinada pelo seguinte:

  • Chave de ordenação: é uma string usada nos metadados de mensagens do Pub/Sub e representa a entidade para a qual as mensagens precisam ser ordenadas. A chave de ordenação pode ter até 1 KB de comprimento. Para receber um conjunto de mensagens ordenadas em uma região, é necessário publicar todas as mensagens com a mesma chave de ordenação na mesma região. Alguns exemplos de chaves de ordenação são IDs de clientes e a chave primária de uma linha em um banco de dados.

    A capacidade de publicação em cada chave de ordenação é limitada a 1 MBps. A capacidade em todas as chaves de ordenação em um tópico é limitada à cota disponível em uma região de publicação. Esse limite pode ser aumentado para muitas unidades de GBps.

    Uma chave de ordenação não é equivalente a uma partição em um sistema de mensagens baseado em partição, já que as chaves de ordenação têm uma cardinalidade muito maior do que as partições.

  • Ativar a ordenação de mensagens: essa é uma configuração de assinatura. Quando uma assinatura tem a ordenação de mensagens ativada, os clientes assinantes recebem mensagens publicadas na mesma região com a mesma chave de ordenação na ordem em que foram recebidas pelo serviço. É necessário ativar essa configuração na assinatura.

    Suponha que você tenha duas assinaturas A e B anexadas ao mesmo tópico T. A assinatura A está configurada com a ordenação de mensagens ativada, e a assinatura B está configurada sem a ordenação de mensagens ativada. Nessa arquitetura, as assinaturas A e B recebem o mesmo conjunto de mensagens do tópico T. Se você publicar mensagens com chaves de ordenação na mesma região, a assinatura A receberá as mensagens na ordem em que foram publicadas. Já a assinatura B recebe as mensagens sem nenhuma ordenação esperada.

Em geral, se a solução exigir que os clientes publisher enviem mensagens ordenadas e não ordenadas, crie tópicos separados, um para mensagens ordenadas e outro para mensagens não ordenadas.

Considerações ao usar mensagens ordenadas

A lista a seguir contém informações importantes sobre o comportamento das mensagens ordenadas no Pub/Sub:

  • Ordenação dentro da chave: as mensagens publicadas com a mesma chave de ordenação precisam ser recebidas em ordem. Suponha que, para a chave de ordenação A, você publique as mensagens 1, 2 e 3. Com a ordenação ativada, espera-se que 1 seja entregue antes de 2 e 2 antes de 3.

  • Ordenação entre chaves: as mensagens publicadas com chaves de ordenação diferentes não precisam ser recebidas em ordem. Suponha que você tenha as chaves de ordenação A e B. Para a chave de ordenação A, as mensagens 1 e 2 são publicadas em ordem. Para a chave de ordenação B, as mensagens 3 e 4 são publicadas em ordem. No entanto, a mensagem 1 pode chegar antes ou depois da mensagem 4.

  • Reentrega de mensagens: o Pub/Sub entrega cada mensagem pelo menos uma vez, para que o serviço do Pub/Sub possa reenviar as mensagens. As reentregas de uma mensagem acionam a reentrega de todas as mensagens subsequentes para essa chave, mesmo as confirmadas. Suponha que um cliente assinante receba as mensagens 1, 2 e 3 para uma chave de ordenação específica. Se a mensagem 2 for reentregue (porque o prazo de confirmação expirou ou a confirmação de melhor esforço não persistiu no Pub/Sub), a mensagem 3 também será reentregue. Se a ordenação de mensagens e um tópico de mensagens inativas estiverem ativados em uma assinatura, esse comportamento poderá não ser verdadeiro, já que o Pub/Sub encaminha mensagens para tópicos de mensagens inativas com base no melhor esforço.

  • Atrasos de confirmação e tópicos de mensagens inativas: as mensagens não confirmadas para uma determinada chave de ordenação podem atrasar a entrega de mensagens para outras chaves de ordenação, especialmente durante reinicializações do servidor ou mudanças de tráfego. Para manter a ordem em todos esses eventos, garanta a confirmação oportuna de todas as mensagens. Se a confirmação oportuna não for possível, considere usar um tópico de mensagens inativas para evitar a retenção indefinida de mensagens. Esteja ciente de que a ordem pode não ser preservada quando as mensagens são gravadas em um tópico de mensagens inativas.

  • Afinidade de mensagens (clientes streamingPull): as mensagens para a mesma chave são geralmente entregues ao mesmo cliente assinante streamingPull. A afinidade é esperada quando as mensagens estão pendentes para uma chave de ordenação para um cliente assinante específico. Se não houver mensagens pendentes, a afinidade poderá mudar para balanceamento de carga ou desconexões de clientes.

    Para garantir um processamento tranquilo, mesmo com possíveis mudanças de afinidade, é fundamental projetar o aplicativo streamingPull de forma que ele possa processar mensagens em qualquer cliente para uma determinada chave de ordenação.

  • Integração com o Dataflow: não ative a ordenação de mensagens para assinaturas ao configurar o Dataflow com o Pub/Sub. O Dataflow tem seu próprio mecanismo para ordenação total de mensagens, garantindo a ordem cronológica em todas as mensagens como parte das operações de janela. Esse método de ordenação difere da abordagem baseada em chave de ordenação do Pub/Sub. O uso de chaves de ordenação com o Dataflow pode reduzir o desempenho do pipeline.

  • Escalonamento automático: a entrega ordenada do Pub/Sub é escalonada para bilhões de chaves de ordenação. Um número maior de chaves de ordenação permite mais entrega paralela aos assinantes, já que a ordenação se aplica a todas as mensagens com a mesma chave de ordenação.

  • Compensações de desempenho: a entrega ordenada tem algumas compensações. Em comparação com a entrega não ordenada, a entrega ordenada diminui a disponibilidade de publicação e aumenta a latência de entrega de mensagens de ponta a ponta. No caso de entrega ordenada, o failover exige coordenação para garantir que as mensagens sejam gravadas e lidas na ordem correta.

  • Chave ativa: ao usar a ordenação de mensagens, todas as mensagens com a mesma chave de ordenação são enviadas ao cliente assinante na ordem em que são recebidas pelo serviço. O callback do usuário não é executado até que o callback seja concluído para a mensagem anterior. A capacidade de processamento máxima de mensagens que compartilham a mesma chave de ordenação ao entregar aos assinantes não é limitada pelo Pub/Sub , mas pela velocidade de processamento do cliente assinante. Uma chave ativa ocorre quando um backlog é criado em uma chave de ordenação individual porque o número de mensagens produzidas por segundo excede o número de mensagens que o assinante pode processar por segundo. Para atenuar as chaves ativas, use as chaves mais granulares possíveis e minimize o tempo de processamento por mensagem. Também é possível monitorar a métrica subscription/oldest_unacked_message_age para um valor crescente, o que pode indicar uma chave ativa.

Para mais informações sobre como usar a ordenação de mensagens, consulte os seguintes tópicos de práticas recomendadas:

Comportamento do cliente assinante para ordenação de mensagens

Os clientes assinantes recebem mensagens na ordem em que foram publicadas em uma região específica. O Pub/Sub oferece suporte a diferentes maneiras de receber mensagens, como clientes assinantes conectados a assinaturas de pull e push. As bibliotecas de cliente usam streamingPull (com exceção do PHP).

Para saber mais sobre esses tipos de assinatura, consulte Escolher um tipo de assinatura.

As seções a seguir discutem o que significa receber mensagens em ordem para cada tipo de cliente assinante.

Clientes assinantes de StreamingPull

Ao usar as bibliotecas de cliente com streamingPull, é necessário especificar um callback do usuário que é executado sempre que uma mensagem é recebida por um cliente assinante. Com as bibliotecas de cliente, para qualquer chave de ordenação, o callback é executado até a conclusão das mensagens na ordem correta. Se as mensagens forem confirmadas nesse callback, todos os cálculos em uma mensagem ocorrerão em ordem. No entanto, se o callback do usuário programar outro trabalho assíncrono em mensagens, o cliente assinante precisará garantir que o trabalho assíncrono seja feito em ordem. Uma opção é adicionar mensagens a uma fila de trabalho local que é processada em ordem.

Clientes assinantes de pull

Para clientes assinantes conectados a assinaturas de pull, a ordenação de mensagens do Pub/Sub oferece suporte ao seguinte:

  • Todas as mensagens de uma chave de ordenação no PullResponse estão na ordem correta na lista.

  • Apenas um lote de mensagens pode estar pendente para uma chave de ordenação por vez.

O requisito de que apenas um lote de mensagens possa estar pendente por vez é necessário para manter a entrega ordenada, já que o serviço do Pub/Sub não pode garantir o sucesso ou a latência da resposta enviada para uma solicitação de envio do assinante.

Clientes assinantes de push

As restrições de push são ainda mais rigorosas do que as de pull. Para uma assinatura por push, o Pub/Sub oferece suporte a apenas uma mensagem pendente para cada chave de ordenação por vez. Cada mensagem é enviada a um endpoint push como uma solicitação separada. Assim, o envio das solicitações em paralelo teria o mesmo problema que a entrega de vários lotes de mensagens para a mesma chave de ordenação para assinantes de pull simultaneamente. As assinaturas push podem não ser uma boa opção para tópicos em que as mensagens são publicadas com frequência com a mesma chave de ordenação ou em que a latência é extremamente importante.

Clientes assinantes de exportação

As assinaturas de exportação oferecem suporte a mensagens ordenadas. Para assinaturas do BigQuery, as mensagens com a mesma chave de ordenação são gravadas na tabela do BigQuery em ordem. Para assinaturas do Cloud Storage, as mensagens com a mesma chave de ordenação podem não ser gravadas no mesmo arquivo. Quando estão no mesmo arquivo, as mensagens de uma chave de ordenação estão em ordem. Quando distribuídas por vários arquivos, as mensagens posteriores de uma chave de ordenação podem aparecer em um arquivo com um nome que tem um carimbo de data/hora anterior ao carimbo de data/hora no nome do arquivo com as mensagens anteriores.

Ativar ordenação de mensagens

Para receber as mensagens em ordem, defina a propriedade de ordenação das mensagens na assinatura que recebe as mensagens. O recebimento de mensagens pode aumentar a latência. Não é possível mudar a propriedade de ordenação de mensagens depois de criar uma assinatura.

É possível definir a propriedade de ordenação de mensagens ao criar uma assinatura usando o Google Cloud console, a Google Cloud CLI ou a API Pub/Sub.

Console

Para criar uma assinatura com a propriedade de ordenação de mensagens, siga estas etapas:

  1. No Google Cloud console do, acesse a página Assinaturas.

Acessar "Assinaturas"

  1. Clique em Criar assinatura.

  2. Insira um ID de assinatura.

  3. Escolha um tópico do qual você quer receber mensagens.

  4. Na seção Ordem das mensagens, selecione Ordenar mensagens com uma chave de ordem.

  5. Clique em Criar.

gcloud

Para criar uma assinatura com a propriedade de ordenação de mensagens, use o gcloud pubsub subscriptions create comando e a --enable-message-ordering sinalização:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Substitua SUBSCRIPTION_ID pelo ID da assinatura.

Se a solicitação for bem-sucedida, a linha de comando exibirá uma confirmação:

Created subscription [SUBSCRIPTION_ID].

REST

Para criar uma assinatura com a propriedade de ordenação de mensagens, envie uma solicitação PUT como a seguinte:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Substitua:

  • PROJECT_ID: o ID do projeto com o tópico.
  • SUBSCRIPTION_ID: o ID da assinatura.

No corpo da solicitação, especifique o seguinte:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Substitua TOPIC_ID pelo ID do tópico a ser anexado à assinatura.

Se a solicitação for bem-sucedida, a resposta será a assinatura no formato JSON:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C# .


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

O exemplo a seguir usa a versão principal da biblioteca de cliente do Go Pub/Sub (v2). Se você ainda estiver usando a biblioteca v1, consulte o guia de migração para a v2. Para conferir uma lista de exemplos de código da v1, consulte os exemplos de código obsoletos.

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithOrdering(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:                  subscription,
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

Antes de tentar esse exemplo, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

O exemplo a seguir usa a biblioteca de cliente do Ruby Pub/Sub v3. Se você ainda estiver usando a biblioteca v2, consulte o guia de migração para a v3. Para conferir uma lista de exemplos de código do Ruby v2, consulte os exemplos de código obsoletos.

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

A seguir