אישור שינוי בסכימה

במאמר הזה מוסבר איך לבצע שינוי בסכימה של נושאי Pub/Sub.

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות לביצוע שינוי בסכימה ולניהול סכימות, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עריכה ב-Pub/Sub (roles/pubsub.editor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

התפקיד המוגדר מראש הזה כולל את ההרשאות שנדרשות כדי לבצע שינוי בסכימה ולנהל סכימות. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לבצע שינוי בסכימה ולנהל סכימות, נדרשות ההרשאות הבאות:

  • יצירת סכימה: pubsub.schemas.create
  • צירוף סכימה לנושא: pubsub.schemas.attach
  • ביצוע Commit של גרסת סכימה: pubsub.schemas.commit
  • מחיקת סכימה או עדכון של סכימה: pubsub.schemas.delete
  • קבלת סכימה או עדכונים לסכימה: pubsub.schemas.get
  • רשימת סכימות: pubsub.schemas.list
  • רשימת שינויים בסכימה: pubsub.schemas.listRevisions
  • החזרה של סכימה למצב קודם: pubsub.schemas.rollback
  • כדי לאמת הודעה: pubsub.schemas.validate
  • קבלת מדיניות IAM עבור סכימה: pubsub.schemas.getIamPolicy
  • מגדירים את מדיניות IAM לסכימה: pubsub.schemas.setIamPolicy

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

אתם יכולים להעניק תפקידים והרשאות לחשבונות ראשיים כמו משתמשים, קבוצות, דומיינים או חשבונות שירות. אפשר ליצור סכימה בפרויקט אחד ולצרף אותה לנושא שנמצא בפרויקט אחר. מוודאים שיש לכם את ההרשאות הנדרשות לכל פרויקט.

שינוי סכימה

אפשר לבצע commit לשינוי בסכימה באמצעותGoogle Cloud המסוף, ה-CLI של gcloud,‏ Pub/Sub API או ספריות הלקוח ב-Cloud.

ריכזנו כאן כמה הנחיות לביצוע שינוי בסכימה:

  • אפשר לשנות סכימה במסגרת מגבלות מסוימות:

    • בסכימות של Protocol Buffer, אפשר להוסיף או להסיר שדות אופציונליים. אי אפשר להוסיף או למחוק שדות אחרים. בנוסף, אי אפשר לערוך שדות קיימים.

    • לגבי סכמות Avro, אפשר לעיין במסמכי התיעוד של Avro כדי לקרוא על הכללים בנושא פתרון סכמות. גרסה חדשה צריכה לפעול לפי הכללים כאילו היא גם סכימת הקורא וגם סכימת הכתיבה.

    • לסכימה יכולות להיות עד 20 גרסאות בו-זמנית. אם חורגים מהמגבלה, צריך למחוק גרסה של סכימה לפני שיוצרים גרסה נוספת.

  • לכל גרסה משויך מזהה גרסה ייחודי. מזהה הגרסה הוא מזהה ייחודי אוניברסלי (UUID) בן שמונה תווים שנוצר באופן אוטומטי.

  • כשמעדכנים את טווח הגרסאות או את הגרסה של סכימה שמשמשת לאימות נושאים, יכול להיות שיעברו כמה דקות עד שהשינויים ייכנסו לתוקף.

המסוף

כדי ליצור שינוי בסכימה, פועלים לפי השלבים הבאים:

  1. נכנסים לדף Pub/Sub schemas במסוף Google Cloud .

    לדף Schemas

  2. לוחצים על מזהה הסכימה של סכימה קיימת.

    ייפתח הדף Schema details של הסכימה.

  3. לוחצים על יצירת עדכון.

    ייפתח הדף Create schema revision.

  4. מבצעים את השינויים הנדרשים.

    לדוגמה, בסכימה לדוגמה ב-Avro שיצרתם במאמר יצירת סכימה, אתם יכולים להוסיף שדה אופציונלי נוסף בשם Price באופן הבא:

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         }
       ]
     }
    
  5. לוחצים על Validate definition (אימות ההגדרה) כדי לבדוק אם הגדרת הסכימה נכונה.

  6. אפשר גם לאמת את ההודעות לפי הסכימה.

    1. לוחצים על הודעת בדיקה כדי לבדוק הודעה לדוגמה.

    2. בחלון Test message (הודעת בדיקה), בוחרים סוג של Message encoding (קידוד הודעה).

    3. כותבים הודעת בדיקה בתיבת הטקסט גוף ההודעה.

      לדוגמה, הנה הודעה לדוגמה לסכימת הבדיקה. בדוגמה הזו, בוחרים באפשרות קידוד ההודעה בתור JSON.

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. לוחצים על בדיקה.

  7. לוחצים על Commit כדי לשמור את הסכימה.

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

כאשר:

  • SCHEMA_TYPE הוא avro או protocol-buffer.
  • SCHEMA_DEFINITION הוא string שמכיל את הגדרת הסכימה, בפורמט שמותאם לסוג הסכימה שנבחר.

אפשר גם לציין את הגדרת הסכימה בקובץ:

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

כאשר:

  • SCHEMA_TYPE הוא avro או protocol-buffer.
  • SCHEMA_DEFINITION_FILE הוא string שמכיל את הנתיב לקובץ עם הגדרת הסכימה, בפורמט שמתאים לסוג הסכימה שנבחר.

REST

כדי לבצע שינוי בסכימה, שולחים בקשת POST כמו הבקשה הבאה:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

מציינים את השדות הבאים בגוף הבקשה:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

כאשר:

  • SCHEMA_TYPE הוא AVRO או PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION היא מחרוזת שמכילה את ההגדרה של הסכימה, בפורמט שמתאים לסוג הסכימה שנבחר.
  • SCHEMA_NAME הוא השם של סכימה קיימת.

גוף התגובה צריך להכיל ייצוג JSON של משאב סכימה. לדוגמה:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

כאשר:

  • REVISION_ID הוא המזהה של הגרסה שנוצר בשרת.
  • REVISION_CREATE_TIME היא חותמת הזמן בפורמט ISO 8601 שבה נוצרה הגרסה.

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitAvroSchema commits a new Avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	schema := &pubsubpb.Schema{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_AVRO,
		Definition: string(avscSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("error calling CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	schema := &pubsubpb.Schema{
		// TODO(hongalex): check if name is necessary here
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_PROTOCOL_BUFFER,
		Definition: string(protoSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה בנושא Pub/Sub C++ API.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub Python API.

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId, avscFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId, protoFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId: string, avscFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId: string, protoFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

אחרי שמאשרים שינוי בסכימה, אפשר לראות את הפרטים של השינוי החדש בדף Schemas.

המאמרים הבאים