שיוך סכימה לנושא

במאמר הזה מוסבר איך לשייך סכימות לנושאים ב-Pub/Sub.

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות לשיוך ולניהול של סכימות, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עריכה ב-Pub/Sub (roles/pubsub.editor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

התפקיד המוגדר מראש הזה מכיל את ההרשאות שנדרשות לשיוך סכימות ולניהול שלהן. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לשייך סכימות ולנהל אותן, נדרשות ההרשאות הבאות:

  • יצירת סכימה: pubsub.schemas.create
  • צירוף סכימה לנושא: pubsub.schemas.attach
  • ביצוע Commit של גרסת סכימה: pubsub.schemas.commit
  • מחיקת סכימה או עדכון של סכימה: pubsub.schemas.delete
  • קבלת סכימה או עדכונים לסכימה: pubsub.schemas.get
  • רשימת סכימות: pubsub.schemas.list
  • רשימת שינויים בסכימה: pubsub.schemas.listRevisions
  • החזרה של סכימה למצב קודם: pubsub.schemas.rollback
  • כדי לאמת הודעה: pubsub.schemas.validate
  • קבלת מדיניות IAM עבור סכימה: pubsub.schemas.getIamPolicy
  • מגדירים את מדיניות IAM לסכימה: pubsub.schemas.setIamPolicy

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

אתם יכולים להעניק תפקידים והרשאות לחשבונות ראשיים כמו משתמשים, קבוצות, דומיינים או חשבונות שירות. אפשר ליצור סכימה בפרויקט אחד ולצרף אותה לנושא שנמצא בפרויקט אחר. מוודאים שיש לכם את ההרשאות הנדרשות לכל פרויקט.

הנחיות לשיוך סכימה לנושא

אפשר לשייך סכימה לנושא כשיוצרים או עורכים נושא. אלה ההנחיות לשיוך סכימה לנושא:

  • אפשר לשייך סכימה לנושא אחד או יותר.

    אחרי שמשייכים סכימה לנושא, כל הודעה שהנושא מקבל מהמפרסמים חייבת להיות בהתאם לסכימה הזו.

  • כשמשייכים סכימה לנושא, צריך לציין גם את הקידוד של ההודעות שיפורסמו כ-BINARY או כ-JSON. אם משתמשים ב-JSON עם סכימת Avro, חשוב לשים לב לכללי הקידוד של איחודים.

  • אם סכימה שמשויכת לנושא כוללת עדכונים, ההודעות צריכות להתאים לקידוד ולעבור אימות מול עדכון בטווח הזמין. אם הם לא מאומתים, פרסום ההודעה נכשל.

    המערכת מנסה להשתמש בגרסאות לפי סדר כרונולוגי הפוך שמבוסס על זמן היצירה. כדי ליצור גרסה חדשה של הסכימה, אפשר לעיין במאמר בנושא אישור גרסה חדשה של הסכימה.

לוגיקת אימות של סכימת הודעות

כשמשייכים סכימה לנושא, ואם לסכימה יש עדכונים, אפשר לציין טווח משנה של עדכונים לשימוש. אם לא מציינים טווח, המערכת משתמשת בטווח כולו לצורך אימות.

אם לא מציינים תיקון כהתיקון הראשון שמותר, המערכת משתמשת בתיקון הקיים הכי ישן של הסכימה לצורך אימות. אם לא מציינים תיקון כהתיקון האחרון שמותר, המערכת משתמשת בתיקון הקיים הכי חדש של הסכימה.

ניקח לדוגמה סכימה S שמצורפת לנושא T.

לסכימה S יש את מזהי הגרסאות A,B, C ו-D שנוצרו לפי הסדר, כאשר A היא הגרסה הראשונה או הכי ישנה. אף אחת מהסכימות לא זהה לסכימה אחרת, או שהיא לא מהווה חזרה לגרסה קודמת של סכימה קיימת.

  • אם מגדירים רק את השדה First revision allowed כ-B, הודעות שתואמות רק לסכימה A יידחו, אבל הודעות שתואמות לסכימות B,‏ C ו-D יתקבלו.

  • אם מגדירים רק את השדה Last revision allowed כ-C, הודעות שתואמות לסכימות A, B ו-C יתקבלו, והודעות שתואמות רק לסכימה D יידחו.

  • אם מגדירים את שני השדות First revision allowed (הגרסה הראשונה שאפשרה) כ-B ואת Last revision allowed (הגרסה האחרונה שאפשרה) כ-C, המערכת תקבל הודעות שתואמות לסכימות B ו-C.

  • אפשר גם להגדיר את הגרסה הראשונה והאחרונה לאותו מזהה גרסה. במקרה כזה, רק הודעות שתואמות לגרסה הזו יתקבלו.

יצירה ושיוך של סכימה כשיוצרים נושא

אפשר ליצור נושא עם סכימה באמצעות מסוף Google Cloud , ה-CLI של gcloud,‏ Pub/Sub API או ספריות הלקוח ב-Cloud.

המסוף

  1. נכנסים לדף Pub/Sub topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על יצירת נושא.

  3. בשדה Topic ID (מזהה הנושא), מזינים מזהה לנושא.

    הנחיות למתן שמות לנושאים

  4. מסמנים את התיבה שימוש בסכימה.

    משאירים את הגדרות ברירת המחדל בשאר השדות.

    אתם יכולים ליצור סכימה או להשתמש בסכימה קיימת.

  5. אם אתם יוצרים סכימה, פועלים לפי השלבים הבאים: `

    1. בקטע Select a Pub/Sub schema, בוחרים באפשרות Create a new schema.

    הדף Create schema (יצירת סכימה) מוצג בכרטיסייה משנית.

    פועלים לפי השלבים במאמר בנושא יצירת סכימה.

    1. חוזרים לכרטיסייה יצירת נושא ולוחצים על רענון.

    2. מחפשים את הסכימה בשדה Select a Pub/Sub schema.

    3. בוחרים את קידוד ההודעה כ-JSON או כ-Binary.

    לסכימה שיצרתם יש מזהה של גרסה. אתם יכולים ליצור גרסאות נוספות של הסכימה כמו שמוסבר במאמר בנושא אישור גרסה של סכימה.

  6. אם אתם משייכים סכימה שכבר יצרתם, פועלים לפי השלבים הבאים:

    1. בקטע Select a Pub/Sub schema, בוחרים סכימה קיימת.

    2. בוחרים את קידוד ההודעה כ-JSON או כ-Binary.

  7. אופציונלי: אם לסכימה שנבחרה יש עדכונים, בתפריטים הנפתחים של העדכון הראשון שאפשר להשתמש בו והעדכון האחרון שאפשר להשתמש בו בוחרים את העדכונים הרצויים בשדה טווח עדכונים.

אתם יכולים לציין את שני השדות, לציין רק אחד מהם או להשאיר את הגדרות ברירת המחדל בהתאם לדרישות שלכם.

  1. משאירים את הגדרות ברירת המחדל בשאר השדות.

  2. לוחצים על יצירה כדי לשמור את הנושא ולהקצות אותו לסכימה שנבחרה.

gcloud

כדי ליצור נושא שמוקצה לו סכימה שנוצרה קודם, מריצים את הפקודה gcloud pubsub topics create:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

כאשר:

  • TOPIC_ID הוא המזהה של הנושא שאתם יוצרים.
  • ENCODING_TYPE הוא הקידוד של ההודעות שאומתו מול הסכימה. הערך הזה צריך להיות JSON או BINARY.
  • SCHEMA_ID הוא המזהה של סכימה קיימת.
  • FIRST_REVISION_ID הוא המזהה של הגרסה הכי ישנה שרוצים לאמת.
  • LAST_REVISION_ID הוא המזהה של הגרסה האחרונה שצריך לאמת.

המאפיינים --first-revision-id ו---last-revision-id הם אופציונליים.

אפשר גם להקצות סכימה מפרויקט אחר של Google Cloud :

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

כאשר:

  • SCHEMA_PROJECT הוא מזהה הפרויקט של Google Cloud הפרויקט של הסכימה.
  • TOPIC_PROJECT הוא מזהה הפרויקט של Google Cloud הפרויקט שבו נמצא הנושא.

REST

כדי ליצור נושא, משתמשים בשיטה projects.topics.create:

בקשה:

הבקשה צריכה להיות מאומתת באמצעות אסימון גישה בכותרת Authorization. כדי לקבל אסימון גישה ל-Application Default Credentials הנוכחיים: gcloud auth application-default print-access-token.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

גוף הבקשה:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

כאשר:

  • PROJECT_ID הוא מזהה הפרויקט.
  • מספר הנושא שלך הוא TOPIC_ID.
  • SCHEMA_NAME הוא שם הסכימה שצריך לאמת מולה את ההודעות שמתפרסמות. הפורמט הוא: projects/PROJECT_ID/schemas/SCHEMA_ID.
  • ENCODING_TYPE הוא הקידוד של ההודעות שאומתו מול הסכימה. הערך חייב להיות JSON או BINARY.
  • FIRST_REVISION_ID הוא המזהה של הגרסה הכי ישנה שרוצים לבצע אימות מולה.
  • LAST_REVISION_ID הוא המזהה של הגרסה האחרונה שצריך לאמת.

המאפיינים firstRevisionId ו-lastRevisionId הם אופציונליים.

תשובה:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

אם לא מציינים את firstRevisionId ואת lastRevisionId בבקשה, הם לא מופיעים.

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה בנושא Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  google::pubsub::v1::Topic request;
  request.set_name(pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.mutable_schema_settings()->set_schema(
      pubsub::Schema(std::move(project_id), std::move(schema_id)).FullName());
  request.mutable_schema_settings()->set_encoding(
      encoding == "JSON" ? google::pubsub::v1::JSON
                         : google::pubsub::v1::BINARY);
  auto topic = client.CreateTopic(request);

  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C# ‎ במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaId, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            TopicName = topicName,
            SchemaSettings = new SchemaSettings
            {
                SchemaAsSchemaName = SchemaName.FromProjectSchema(projectId, schemaId),
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	topic := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		SchemaSettings: &pubsubpb.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionId: firstRevisionID,
			LastRevisionId:  lastRevisionID,
			// Alternative encoding is pubsubpb.Encoding_JSON
			Encoding: pubsubpb.Encoding_BINARY,
		},
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %w", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType,
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId: string,
  schemaNameOrId: string,
  encodingType: 'BINARY' | 'JSON',
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של PHP במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף זמין במאמר מאמרי העזרה של ה-API של Pub/Sub PHP.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub Python API.

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

בדוגמה הבאה נעשה שימוש בספריית הלקוח של Ruby Pub/Sub בגרסה 3. אם אתם עדיין משתמשים בספרייה v2, כדאי לעיין במדריך להעברה לגרסה v3. כדי לראות רשימה של דוגמאות קוד של Ruby v2, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Ruby במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Ruby API.

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :BINARY

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 schema_settings: {
                                   schema: pubsub.schema_path(schema_id),
                                   encoding: message_encoding
                                 }

puts "Topic #{topic.name} created."

עריכת סכימה שמשויכת לנושא

אפשר לערוך נושא כדי לצרף סכימה, להסיר סכימה או לעדכן את טווח הגרסאות שמשמש לאימות הודעות. באופן כללי, אם אתם מתכננים לבצע שינויים בסכימה שבשימוש, אתם יכולים לשמור גרסה חדשה ולעדכן את טווח הגרסאות שמשמשות לנושא.

אפשר לערוך סכימה שמשויכת לנושא באמצעותGoogle Cloud המסוף, ה-CLI של gcloud,‏ Pub/Sub API או ספריות הלקוח ב-Cloud.

המסוף

  1. נכנסים לדף Pub/Sub topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על Topic ID של נושא.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. אפשר לבצע את השינויים הבאים בסכימה.

    יכול להיות שיחלפו כמה דקות עד שהשינויים יתעדכנו.

    • אם רוצים להסיר את הסכימה מהנושא, בדף עריכת נושא מבטלים את הסימון של התיבה שימוש בסכימה.

    • אם רוצים לשנות את הסכימה, בקטע סכימה, בוחרים את שם הסכימה.

    מעדכנים את שאר השדות לפי הצורך.

    • אם רוצים לעדכן את טווח הגרסאות, בתפריטים הנפתחים הגרסה הראשונה שמותרת והגרסה האחרונה שמותרת בוחרים את הגרסאות הרצויות בטווח הגרסאות.

    אתם יכולים לציין את שני השדות, לציין רק אחד מהם או להשאיר את הגדרות ברירת המחדל בהתאם לדרישות שלכם.

  5. לוחצים על עדכון כדי לשמור את השינויים.

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

כאשר:

  • TOPIC_ID הוא המזהה של הנושא שאתם יוצרים.
  • ENCODING_TYPE הוא הקידוד של ההודעות שאומתו מול הסכימה. הערך הזה צריך להיות JSON או BINARY.
  • SCHEMA_NAME הוא השם של סכימה קיימת.
  • FIRST_REVISION_ID הוא המזהה של הגרסה הכי ישנה שרוצים לבצע אימות מולה.
  • LAST_REVISION_ID הוא המזהה של הגרסה האחרונה שצריך לאמת.

המאפיינים --first-revision-id ו---last-revision-id הם אופציונליים.

REST

כדי לעדכן נושא, משתמשים בשיטה projects.topics.patch:

בקשה:

הבקשה צריכה להיות מאומתת באמצעות אסימון גישה בכותרת Authorization. כדי לקבל אסימון גישה ל-Application Default Credentials הנוכחיים: gcloud auth application-default print-access-token.

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

גוף הבקשה:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

כאשר:

  • PROJECT_ID הוא מזהה הפרויקט.
  • מספר הנושא שלך הוא TOPIC_ID.
  • SCHEMA_NAME הוא שם הסכימה שצריך לאמת מולה את ההודעות שמתפרסמות. הפורמט הוא: projects/PROJECT_ID/schemas/SCHEMA_ID.
  • ENCODING_TYPE הוא הקידוד של ההודעות שאומתו מול הסכימה. הערך חייב להיות JSON או BINARY.
  • FIRST_REVISION_ID הוא המזהה של הגרסה הכי ישנה שרוצים לבצע אימות מולה.
  • LAST_REVISION_ID הוא המזהה של הגרסה האחרונה שצריך לאמת.

המאפיינים firstRevisionId ו-lastRevisionId הם אופציונליים.

תשובה:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

אחרי העדכון, גם firstRevisionId וגם lastRevisionId לא מוגדרים.

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה בנושא Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& first_revision_id,
   std::string const& last_revision_id) {
  google::pubsub::v1::UpdateTopicRequest request;
  auto* request_topic = request.mutable_topic();
  request_topic->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  request_topic->mutable_schema_settings()->set_first_revision_id(
      first_revision_id);
  request_topic->mutable_schema_settings()->set_last_revision_id(
      last_revision_id);
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.first_revision_id";
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.last_revision_id";
  auto topic = client.UpdateTopic(request);

  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic" // an existing topic that has schema settings attached to it.
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings
	// with the same field mask.
	req := &pubsubpb.UpdateTopicRequest{
		Topic: &pubsubpb.Topic{
			Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
			SchemaSettings: &pubsubpb.SchemaSettings{
				FirstRevisionId: firstRevisionID,
				LastRevisionId:  lastRevisionID,
			},
		},
		// Construct a field mask to indicate which field to update in the topic.
		// Fields are specified relative to the topic
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"schema_settings.first_revision_id", "schema_settings.last_revision_id"},
		},
	}
	gotTopicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, req)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub Python API.

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")

המאמרים הבאים