פתרון בעיות כלליות

מידע על שלבים לפתרון בעיות שיכולים לעזור לכם אם נתקלתם בבעיות בשימוש ב-Pub/Sub.

אי אפשר ליצור נושא

מוודאים שיש לכם את ההרשאות הנדרשות. כדי ליצור נושא Pub/Sub, צריך את התפקיד עריכה ב-Pub/Sub (roles/pubsub.editor) בניהול זהויות והרשאות גישה (IAM) בפרויקט. אם לא הוקצה לכם התפקיד הזה, פנו לאדמין. למידע נוסף על פתרון בעיות שקשורות לנושאים, אפשר לעיין בדפים הבאים:

אי אפשר ליצור מינוי

חשוב לוודא שביצעתם את הפעולות הבאות:

  • מוודאים שיש לכם את ההרשאות הנדרשות. כדי ליצור מינוי ל-Pub/Sub, צריך את תפקיד ה-IAM‏ Pub/Sub Editor (roles/pubsub.editor) בפרויקט. אם לא הוקצה לכם התפקיד הזה, פנו לאדמין.

  • ציינתם שם למינוי.

  • ציינתם את השם של נושא קיים שאתם רוצים לצרף אליו את המינוי.

  • אם יוצרים מינוי לקבלת עדכונים בדחיפה, צריך לציין https:// באותיות קטנות (לא http:// או HTTPS://) כפרוטוקול של כתובת ה-URL לקבלת העדכונים בשדה pushEndpoint.

מידע נוסף על פתרון בעיות שקשורות למינויים זמין בדפים הבאים:

פתרון בעיות שקשורות להרשאות

ההרשאות ב-Pub/Sub קובעות אילו משתמשים וחשבונות שירות יכולים לבצע פעולות במשאבי Pub/Sub. הגדרות הרשאות שגויות עלולות לגרום לשגיאות של דחיית הרשאה ולהפריע לזרימת ההודעות. יומני הביקורת מספקים תיעוד מפורט של כל שינויי ההרשאות, וכך מאפשרים לכם לזהות את מקור הבעיות האלה.

כדי לפתור בעיות בהרשאות Pub/Sub באמצעות יומני ביקורת:

  1. קבלת ההרשאות הנדרשות כדי להציג את Logs Explorer.

    מידע נוסף מופיע במאמר לפני שמתחילים.

  2. נכנסים לדף Logs Explorer במסוף Google Cloud .

    כניסה לדף Logs Explorer

  3. בוחרים פרויקט, תיקייה או ארגון קיימים ב- Google Cloud .

  4. ריכזנו כאן רשימה של מסננים שבהם אפשר להשתמש כדי למצוא יומנים רלוונטיים:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": אפשר להשתמש בשאילתה הזו כנקודת התחלה כשמנסים לפתור בעיות שקשורות לשינויים בהגדרות של נושאים או מינויים, או לבקרת גישה. אפשר לשלב אותו עם מסננים אחרים כדי לצמצם את החיפוש עוד יותר.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": משתמשים בשאילתה הזו כשחושדים שבעיה מסוימת נגרמת בגלל הרשאות שגויות או חסרות. הוא עוזר לעקוב אחרי מי ביצע שינויים במדיניות IAM ואילו שינויים בוצעו. המידע הזה יכול לעזור לפתור בעיות כמו משתמשים שלא מצליחים לפרסם בנושאים או להירשם למינויים, אפליקציות שלא מקבלות גישה למשאבי Pub/Sub או שינויים לא צפויים בבקרת הגישה.

    • protoPayload.status.code=7: משתמשים בשאילתה הזו כשנתקלים בשגיאות שקשורות באופן מפורש להרשאות. כך תוכלו לזהות אילו פעולות נכשלות ומי מנסה לבצע אותן. אפשר לשלב את השאילתה הזו עם השאילתות הקודמות כדי לזהות את המשאב הספציפי ואת השינוי במדיניות IAM שעלולים לגרום לדחיית ההרשאה.

  5. מנתחים את היומנים כדי לזהות גורמים כמו חותמת הזמן של האירוע, חשבון המשתמש שביצע את השינוי וסוגי השינויים שבוצעו.

  6. על סמך המידע שנאסף מיומני הביקורת, אפשר לבצע פעולות מתקנות.

פתרון בעיות בהרשאות ב-Terraform

כשמשתמשים ב-Pub/Sub עם Terraform, צריך להעניק במפורש את התפקידים הנדרשים בקוד Terraform. לדוגמה, כדי לפרסם, חשבון השירות של האפליקציה צריך את התפקיד roles/pubsub.publisher. אם התפקיד הזה לא מוגדר במפורש בקוד Terraform, יכול להיות שגרסה עתידית של terraform apply תסיר אותו. המצב הזה קורה לעיתים קרובות במהלך עדכונים לא קשורים, וגורם לכך שאפליקציה מהימנה נכשלת פתאום עם שגיאות PERMISSION_DENIED. הגדרה מפורשת של התפקיד בקוד מונעת רגרסיות מקריות כאלה.

המינוי נמחק

יש שתי דרכים עיקריות למחוק מינויים ל-Pub/Sub:

  • משתמש או חשבון שירות עם הרשאות מספיקות מוחקים את המינוי בכוונה.

  • מינוי נמחק אוטומטית אחרי תקופה של חוסר פעילות, שהיא 31 ימים כברירת מחדל. מידע נוסף על מדיניות תפוגת המינוי זמין במאמר בנושא תקופת התפוגה.

כדי לפתור בעיות שקשורות למינוי שנמחק, פועלים לפי השלבים הבאים:

  1. במסוף Google Cloud , עוברים לדף Pub/Sub subscriptions (מינויים ל-Pub/Sub) ומוודאים שהמינוי כבר לא מופיע ברשימה. מידע נוסף על הצגת רשימת המינויים זמין במאמר הצגת רשימת המינויים.

  2. בודקים את יומני הביקורת. עוברים אל Logs Explorer. אפשר להשתמש במסנן protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" כדי למצוא מינויים שנמחקו. בודקים את היומנים כדי לדעת אם מישהו מחק את המינוי או שהוא נמחק בגלל חוסר פעילות. ‫InternalExpireInactiveSubscription מציין שמינוי נמחק בגלל חוסר פעילות. מידע נוסף על שימוש ביומני ביקורת לפתרון בעיות מופיע במאמר פתרון בעיות ב-Pub/Sub באמצעות יומני ביקורת.

שגיאה מסוג 403 (Forbidden)

בדרך כלל, שגיאה 403 מציינת שאין לכם את ההרשאות המתאימות לביצוע פעולה. לדוגמה, יכול להיות שתקבלו שגיאת 403 User not authorized כשאתם מנסים לפרסם בנושא או לשלוף ממינוי.

אם השגיאה הזו מופיעה, צריך לבצע את הפעולות הבאות:

  • צריך לוודא שהפעלתם את Pub/Sub API במסוףGoogle Cloud .
  • חשוב לוודא שלגורם המבצע את הבקשה יש את ההרשאות הנדרשות במשאבי Pub/Sub API הרלוונטיים, במיוחד אם משתמשים ב-Pub/Sub API לתקשורת בין פרויקטים.

  • אם אתם משתמשים ב-Dataflow, ודאו שלחשבון השירות של Dataflow‏ {PROJECT_NUMBER}@cloudservices.gserviceaccount.comולחשבון השירות של Compute Engine‏ {PROJECT_NUMBER}-compute@developer.gserviceaccount.comיש את ההרשאות הנדרשות במשאב ה-API הרלוונטי של Pub/Sub. מידע נוסף זמין במאמר בנושא אבטחה והרשאות ב-Dataflow.

  • אם אתם משתמשים ב-App Engine, בדקו בדף ההרשאות של הפרויקט אם חשבון שירות של App Engine מופיע כעורך Pub/Sub. אם לא, מוסיפים את חשבון השירות של App Engine כעורך ב-Pub/Sub. בדרך כלל, חשבון השירות של App Engine הוא מהצורה <project-id>@appspot.gserviceaccount.com.

  • אפשר להשתמש ביומני ביקורת כדי לפתור בעיות שקשורות להרשאות.

קודי שגיאה נפוצים אחרים

רשימה של קודי שגיאה נפוצים אחרים שקשורים ל-Pub/Sub API והתיאורים שלהם מופיעה במאמר קודי שגיאה.

זמני קצוב לתפוגה של חיבורים, חביון או שגיאות ברשת

יכול להיות שתיתקלו בכשלים לסירוגין או בכשלים קבועים כשאתם מנסים להתחבר לשירותים של Pub/Sub באמצעות אפליקציות לקוח. Google Cloudהבעיות האלה יכולות להתבטא בצורות הבאות:

  • עיכובים משמעותיים בפרסום הודעות, שעלולים לגרום להצטברות של הודעות שממתינות לעיבוד באפליקציה.
  • שגיאות שקשורות לזמן קצוב לתפוגה, כמו gRPC DEADLINE_EXCEEDED, code = DeadlineExceeded או java.net.SocketTimeoutException.
  • כשלים בקלט/פלט של הרשת, כמו שגיאות UNAVAILABLE: io exception או Connection refused כשמנסים לגשת לשירותים כמו pubsub.googleapis.com או oauth2.googleapis.com.

בעיות הקישוריות האלה יכולות לקרות גם בלי לבצע שינויים בהגדרות של Pub/Sub או בקוד האפליקציה. המצב הזה קורה לעיתים קרובות כשחומות אש מקומיות או חומות אש של VPC משתמשות ברשימות היתרים של כתובות IP שמוגדרות מראש עבור Google APIs. שירותי Google, כולל Pub/Sub והתלויות שלו כמו שירותי אימות, משתמשים בטווח דינמי של כתובות IP. אם חומת האש לא לוקחת בחשבון כתובות IP חדשות, היא עלולה לחסום תנועה לכתובות ה-IP החדשות, ולגרום לבעיות בחיבור ובאימות.

כדי להבטיח קישוריות יציבה, מומלץ להימנע מכללים של חומת אש שמבוססים על כתובות IP סטטיות בשירותי Google. במקום זאת:

  • מגדירים את חומת האש כך שתאפשר תעבורת נתונים באמצעות טווחי כתובות ה-IP שפורסמו על ידי Google עבור דומיינים שמוגדרים כברירת מחדל, במקום כתובות שמוגדרות בהארדקוד. כדי ללמוד איך להשיג את הטווחים האלה ולעדכן באופן אוטומטי את הכללים בחומת האש, אפשר לעיין במאמר כתובות IP לדומיינים שמוגדרים כברירת מחדל.
  • מפעילים גישה פרטית ל-Google, שמאפשרת למופעים ברשת ה-VPC שלכם להגיע לשירותים ולממשקי ה-API של Google בלי לעבור דרך האינטרנט הציבורי, וכך מפשטת את ניהול חומת האש.

‫JWT לא חוקי: הטוקן חייב להיות טוקן לטווח קצר

אם אתם מקבלים שגיאה כמו Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe כשהאפליקציה שלכם יוצרת אינטראקציה עם Pub/Sub API, בדרך כלל זה מצביע על בעיה בתזמון של פרטי הכניסה לאימות.

השגיאה הזו מתרחשת במהלך האימות של JSON Web Token ‏ (JWT) שמשמש לאימות של בקשות API. סיבה נפוצה היא הבדל משמעותי בשעה (הטיה בזמן) בין מכונת הלקוח שבה מופעלת ספריית Pub/Sub לבין שרתי האימות של Google. ל-JWT יש חלון תוקף מוגבל, ולכן פערים בשעון יכולים לגרום למערכת להתייחס אליהם כאל JWT שתוקף השימוש בהם פג או שעדיין לא תקף.

כדי לפתור את הבעיה, צריך לסנכרן את השעון של מכונת הלקוח:

  • מוודאים שהתאריך, השעה ואזור הזמן במחשב נכונים.

  • משתמשים בשירות Network Time Protocol ‏ (NTP) כדי לשמור על סנכרון של זמן המערכת, ומוודאים שהשירות פועל ומוגדר בצורה נכונה.

שימוש מופרז בפעולות ניהול

אם אתם מגלים שאתם משתמשים ביותר מדי מהמכסת הפעולות האדמיניסטרטיביות, יכול להיות שתצטרכו לשנות את מבנה הקוד. לדוגמה, נניח שיש לנו את הפסאודו קוד הבא. בדוגמה הזו, נעשה שימוש בפעולה אדמיניסטרטיבית (GET) כדי לבדוק אם יש מינוי לפני שהמערכת מנסה להשתמש במשאבים שלו. גם GET וגם CREATE הן פעולות של אדמין:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

דפוס יעיל יותר הוא לנסות לצרוך הודעות מהמינוי (בהנחה שאפשר להיות בטוחים באופן סביר בשם המינוי). בגישה האופטימית הזו, אתם מקבלים או יוצרים את המינוי רק אם יש שגיאה. דוגמה:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

אפשר להשתמש בדוגמאות הקוד הבאות כדי להטמיע את התבנית הזו בשפה הרצויה:

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.ts

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של ה-API בשפת Python של Pub/Sub.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub C++ API.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);