Nachrichten in Pub/Sub mit einer Clientbibliothek veröffentlichen und empfangen

Mit dem Pub/Sub-Dienst können Anwendungen Nachrichten verlässlich, schnell und asynchron austauschen. Die Ereignisabfolge ist wie folgt:

  1. Ein Datenersteller veröffentlicht eine Nachricht zu einem Pub/Sub-Thema.
  2. Ein Abonnentenclient erstellt ein Abo für dieses Thema und ruft Nachrichten aus dem Abo ab.

Sie können eine Pub/Sub-Umgebung mit einer der folgenden Methoden einrichten: Google Cloud Console, Cloud Shell, Clientbibliotheken oder REST APIs. Auf dieser Seite erfahren Sie, wie Sie Nachrichten mit Pub/Sub mithilfe von Clientbibliotheken veröffentlichen.

Pub/Sub bietet eine automatisch generierte Clientbibliothek auf hoher und niedriger Ebene. Standardmäßig empfehlen wir, wie in dieser Kurzanleitung, die Clientbibliothek auf hoher Ebene.


Eine detaillierte Anleitung dazu finden Sie direkt in der Google Cloud Console. Klicken Sie dazu einfach auf Anleitung:

Anleitung


Hinweis

  1. Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.

  3. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  4. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  5. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  6. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  7. Pub/Sub API aktivieren:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable pubsub.googleapis.com
  8. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  9. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.
  10. Installieren Sie die Google Cloud CLI.

  11. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  12. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  13. Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

  14. Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

  15. Pub/Sub API aktivieren:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable pubsub.googleapis.com
  16. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

  17. Weisen Sie Ihrem Nutzerkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • USER_IDENTIFIER: Die Kennung für Ihr Nutzerkonto . Beispiel: myemail@example.com
    • ROLE: Die IAM-Rolle, die Sie Ihrem Nutzerkonto zuweisen.

Clientbibliotheken installieren

Die folgenden Beispiele zeigen, wie Sie die Clientbibliotheken installieren:

Python

Weitere Informationen zur Einrichtung der Python-Entwicklungsumgebung finden Sie im Einrichtungshandbuch für die Python-Entwicklungsumgebung.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

C++

Weitere Informationen zur Installation der C++-Bibliothek finden Sie unter GitHub-README.

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get cloud.google.com/go/pubsub

Java

Wenn Sie Maven verwenden, fügen Sie Ihrer Datei pom.xml den folgenden Code hinzu. Weitere Informationen zu BOMs finden Sie unter The Google Cloud Platform Libraries BOM.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>26.76.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

</dependencies>

Wenn Sie Gradle verwenden, fügen Sie den Abhängigkeiten Folgendes hinzu:

implementation platform('com.google.cloud:libraries-bom:26.78.0')

implementation 'com.google.cloud:google-cloud-pubsub'

Wenn Sie sbt nutzen, fügen Sie den Abhängigkeiten Folgendes hinzu:

libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.150.0"

Wenn Sie Visual Studio Code oder IntelliJ verwenden, können Sie Ihrem Projekt mithilfe der folgenden IDE-Plug-ins Clientbibliotheken hinzufügen:

Diese Plug-ins bieten zusätzliche Funktionen wie die Schlüsselverwaltung für Dienstkonten. Einzelheiten finden Sie in der Dokumentation der einzelnen Plug-ins.

Node.js

npm install @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Ruby

gem install google-cloud-pubsub

Thema und Abo erstellen

Nachdem Sie ein Thema erstellt haben, können Sie es abonnieren oder Nachrichten für das Thema veröffentlichen.

Verwenden Sie den folgenden gcloud pubsub topics create-Befehl, um ein Thema mit dem Namen my-topic zu erstellen. Ändern Sie den Namen des Themas nicht, da er im restlichen Teil des Tutorials verwendet wird.

gcloud pubsub topics create my-topic

Verwenden Sie den Befehl gcloud pubsub subscriptions create, um ein Abo zu erstellen. Nur Nachrichten, die nach dem Erstellen des Abos für das Thema veröffentlicht wurden, sind für Abonnentenanwendungen verfügbar.

gcloud pubsub subscriptions create my-sub --topic my-topic

Nachrichten veröffentlichen

Bevor Sie die folgenden Beispiele ausführen, müssen Sie alle erforderlichen Werte, die im Code markiert sind, einkommentieren und eingeben. Dies ist erforderlich, um das Beispiel mit Ihrem Projekt und den Pub/Sub-Ressourcen zu verknüpfen, die Sie zuvor erstellt haben.

Verwenden Sie my-topic als Themen-ID.

Python

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

C++

#include "google/cloud/pubsub/publisher.h"
#include <iostream>

int main(int argc, char* argv[]) try {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <project-id> <topic-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];

  // Create a namespace alias to make the code easier to read.
  namespace pubsub = ::google::cloud::pubsub;

  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(pubsub::Topic(project_id, topic_id)));
  auto id =
      publisher
          .Publish(pubsub::MessageBuilder{}.SetData("Hello World!").Build())
          .get();
  if (!id) throw std::move(id).status();
  std::cout << "Hello World published with id=" << *id << "\n";

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

C#


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publish(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);
  }

  public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = await topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = await topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(
      `Received error while publishing: ${(error as Error).message}`,
    );
    process.exitCode = 1;
  }
}

PHP

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Ruby

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id

publisher.publish "This is a test message."

puts "Message published."

Nachrichten empfangen

Richten Sie einen Abonnenten ein, um die soeben veröffentlichten Nachrichten abzurufen. Jeder Abonnent muss die einzelnen Nachrichten innerhalb eines konfigurierbaren Zeitfensters bestätigen. Unbestätigte Nachrichten werden noch einmal zugestellt. Beachten Sie, dass Pub/Sub eine Nachricht mehrmals liefert, um sicherzustellen, dass alle Nachrichten mindestens einmal zu einem Abonnenten gelangen.

Bevor Sie die folgenden Beispiele ausführen, müssen Sie alle erforderlichen Werte, die im Code markiert sind, einkommentieren und eingeben. Dies ist erforderlich, um das Beispiel mit Ihrem Projekt und den Pub/Sub-Ressourcen zu verknüpfen, die Sie zuvor erstellt haben.

Verwenden Sie my-sub für Ihre Abo-ID.

Weitere Beispiele, die zeigen, wie Nachrichten abgerufen werden, finden Sie unter Codebeispiele für Clientbibliotheken.

Python

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

C++

#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/subscriber.h"
#include <iostream>

int main(int argc, char* argv[]) try {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <project-id> <subscription-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const subscription_id = argv[2];

  auto constexpr kWaitTimeout = std::chrono::seconds(30);

  // Create a namespace alias to make the code easier to read.
  namespace pubsub = ::google::cloud::pubsub;

  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(project_id, subscription_id)));

  auto session =
      subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
      });

  std::cout << "Waiting for messages on " + subscription_id + "...\n";

  // Blocks until the timeout is reached.
  auto result = session.wait_for(kWaitTimeout);
  if (result == std::future_status::timeout) {
    std::cout << "timeout reached, ending session\n";
    session.cancel();
  }

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

C#


using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Wie ist es gelaufen?

Bereinigen (optional)

  1. Damit Ihrem Google Cloud Konto die in diesem Leitfaden verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie das Thema und das Abo über die Befehlszeile löschen.
      gcloud pubsub subscriptions delete my-sub
      gcloud pubsub topics delete my-topic
  2. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  3. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte