透過同步提取和釋出管理功能訂閱

使用同步提取功能接收訊息,並修改確認期限。

深入探索

如需包含這個程式碼範例的詳細說明文件,請參閱下列文章:

程式碼範例

C#

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 C# 設定說明操作。詳情請參閱 Pub/Sub C# API 參考文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();

        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}

Java

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 Java 設定說明操作。詳情請參閱 Pub/Sub Java API 參考文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Ruby

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 Ruby 設定說明操作。詳情請參閱 Pub/Sub Ruby API 參考文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscriber.pull immediate: false, max: 1
# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running
# process, such as writing the message to a table, which may take longer than
# the default 10-second acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for " \
         "#{new_ack_deadline} seconds."
  end
end

後續步驟

如要搜尋及篩選其他 Google Cloud 產品的程式碼範例,請參閱Google Cloud 範例瀏覽工具