S'abonner de manière optimiste sans créer d'abonnement au préalable

Réduit les opérations administratives en tentant d'abord de s'abonner, puis en créant un abonnement en cas d'erreur NotFound.

En savoir plus

Pour obtenir une documentation détaillée incluant cet exemple de code, consultez les articles suivants :

Exemple de code

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading;
using System.Threading.Tasks;

public class OptimisticSubscribeSample
{
    public async Task<int> OptimisticSubscribe(string projectId, string topicId, string subscriptionId)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        int messageCount = 0;
        try
        {
            Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            {
                string text = message.Data.ToStringUtf8();
                Console.WriteLine($"Message {message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
                return Task.FromResult(SubscriberClient.Reply.Ack);
            });
            // Run for 5 seconds.
            await Task.Delay(5000);
            await subscriber.StopAsync(CancellationToken.None);
            // Wait for the StartAsync call to finish
            await startTask;
        }
        // Catch exception when subscription is not found
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.NotFound)
        {
            Console.WriteLine($"Message: {e.Message}");
            Console.WriteLine($"StackTrace: {e.StackTrace}");

            SubscriberServiceApiClient subscriberServiceApiClient = SubscriberServiceApiClient.Create();
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
            Subscription subscription = subscriberServiceApiClient.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
            // Recursively call OptimisticSubscribe to restart the subscriber
            return await OptimisticSubscribe(projectId, topicId, subscriptionId);
        }
        return messageCount;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration pour PHP du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Optimistically subscribes to a topic
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionId  The ID of the subscription.
 */
function optimistic_subscribe(
    string $projectId,
    string $topicName,
    string $subscriptionId
): void {

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);

    try {
        $messages = $subscription->pull();
        foreach ($messages as $message) {
            printf('PubSub Message: %s' . PHP_EOL, $message->data());
            $subscription->acknowledge($message);
        }
    } catch (NotFoundException $e) { // Subscription is not found
        printf('Exception Message: %s' . PHP_EOL, $e->getMessage());
        printf('StackTrace: %s' . PHP_EOL, $e->getTraceAsString());
        // Create subscription and retry the pull. Any messages published before subscription creation would not be received.
        $pubsub->subscribe($subscriptionId, $topicName);
        optimistic_subscribe($projectId, $topicName, $subscriptionId);
    }
}

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide de Pub/Sub à l'aide de bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

# Propagate exception from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber = pubsub.subscriber subscription_id
  listener   = subscriber.listen do |received_message|
    puts "Received message: #{received_message.data}"
    received_message.acknowledge!
  end
  listener.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  listener.stop.wait!
rescue Google::Cloud::NotFoundError => e
  puts "Subscription #{subscription_id} does not exist."

  subscription = pubsub.subscription_admin.create_subscription \
    name: pubsub.subscription_path(subscription_id),
    topic: pubsub.topic_path(topic_id)

  puts "Subscription #{subscription_id} created."
end

Étape suivante

Pour rechercher et filtrer des exemples de code pour d'autres produits Google Cloud , consultez l'explorateur d'exemplesGoogle Cloud .