Schemaversion übernehmen

In diesem Dokument erfahren Sie, wie Sie eine Schemaversion für Pub/Sub-Themen festschreiben.

Hinweis

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die Pub/Sub Editor (roles/pubsub.editor) IAM-Rolle für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Festschreiben einer Schemaversion und zum Verwalten von Schemas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Festschreiben einer Schemaversion und zum Verwalten von Schemas erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um eine Schemaversion festzuschreiben und Schemas zu verwalten:

  • Schema erstellen: pubsub.schemas.create
  • Schema an Thema anhängen: pubsub.schemas.attach
  • Schemaversion festschreiben: pubsub.schemas.commit
  • Schema oder Schemaversion löschen: pubsub.schemas.delete
  • Schema oder Schemaversionen abrufen: pubsub.schemas.get
  • Schemas auflisten: pubsub.schemas.list
  • Schemaversionen auflisten: pubsub.schemas.listRevisions
  • Schema zurücksetzen: pubsub.schemas.rollback
  • Nachricht validieren: pubsub.schemas.validate
  • IAM-Richtlinie für ein Schema abrufen: pubsub.schemas.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Schema: pubsub.schemas.setIamPolicy

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können Rollen und Berechtigungen für Prinzipale wie Nutzer, Gruppen, Domains oder Dienstkonten gewähren. Sie können ein Schema in einem Projekt erstellen und es an ein Thema anhängen, das sich in einem anderen Projekt befindet. Prüfen Sie, ob Sie die erforderlichen Berechtigungen für jedes Projekt haben.

Schema überarbeiten

Sie können eine Schemaversion über die Google Cloud Console, die gcloud CLI, die Pub/Sub API, oder die Cloud-Clientbibliotheken festschreiben.

Hier sind einige Richtlinien zum Festschreiben einer Schemaversion:

  • Sie können ein Schema innerhalb bestimmter Einschränkungen überarbeiten:

    • Bei Protocol Buffer-Schemas können Sie optionale Felder hinzufügen oder entfernen. Andere Felder können nicht hinzugefügt oder gelöscht werden. Außerdem können Sie keine vorhandenen Felder bearbeiten.

    • Informationen zu Regeln für die Schemaauflösung für Avro-Schemas finden Sie in der Avro-Dokumentation. Eine neue Version muss den Regeln so folgen, als wäre sie sowohl das Leserschema als auch das Schreiberschema.

    • Ein Schema kann maximal 20 Versionen gleichzeitig haben. Wenn Sie das Limit überschreiten, löschen Sie eine Schemaversion, bevor Sie eine weitere erstellen.

  • Jeder Version ist eine eindeutige Versions-ID zugeordnet. Die Versions-ID ist eine automatisch generierte achtstellige UUID.

  • Wenn Sie den Versionsbereich oder die Version eines Schemas aktualisieren, das für die Themenvalidierung verwendet wird, kann es einige Minuten dauern, bis die Änderungen wirksam werden.

Console

So erstellen Sie eine Schemaversion:

  1. Rufen Sie in der Google Cloud Console die Pub/Sub-Schemas Seite auf.

    Zu „Schemas“

  2. Klicken Sie auf die Schema-ID eines vorhandenen Schemas.

    Die Seite Schemadetails für das Schema wird geöffnet.

  3. Klicken Sie auf Version erstellen.

    Die Seite Schemaversion erstellen wird geöffnet.

  4. Nehmen Sie die gewünschten Änderungen vor.

    Für das in Schema erstellen erstellte Beispielschema in Avro können Sie beispielsweise so ein zusätzliches optionales Feld namens Price hinzufügen:

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         }
       ]
     }
    
  5. Klicken Sie auf Definition validieren , um zu prüfen, ob die Schemadefinition korrekt ist.

  6. Sie können auch die Nachrichten für das Schema validieren.

    1. Klicken Sie auf Testnachricht , um eine Beispielnachricht zu testen.

    2. Wählen Sie im Fenster Testnachricht einen Typ für die Nachrichtenkodierung aus.

    3. Geben Sie im Feld Nachrichtentext eine Testnachricht ein.

      Hier ist beispielsweise eine Beispielnachricht für das Testschema. Wählen Sie in diesem Beispiel JSON als Nachrichtenkodierung aus.

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. Klicken Sie auf Testen.

  7. Klicken Sie auf Festschreiben , um das Schema zu speichern.

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Hierbei gilt:

  • SCHEMA_TYPE ist entweder avro oder protocol-buffer.
  • SCHEMA_DEFINITION ist ein string, der die Definition von dem Schema enthält, die gemäß dem ausgewählten Schematyp formatiert ist.

Sie können die Schemadefinition auch in einer Datei angeben:

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

Hierbei gilt:

  • SCHEMA_TYPE ist entweder avro oder protocol-buffer.
  • SCHEMA_DEFINITION_FILE ist ein string, der den Pfad zur Datei mit der Definition des Schemas enthält, die gemäß dem ausgewählten Schematyp formatiert ist.

REST

Senden Sie zum Festschreiben einer Schemaversion eine POST-Anfrage wie die folgende:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Geben Sie im Anfragetext die folgenden Felder an:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

Hierbei gilt:

  • SCHEMA_TYPE ist entweder AVRO oder PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION ist ein String, der die Definition von dem Schema enthält, formatiert gemäß dem ausgewählten Schematyp.
  • SCHEMA_NAME ist der Name eines vorhandenen Schemas.

Der Antworttext sollte eine JSON-Darstellung einer Schemaressource enthalten. Beispiel:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

Hierbei gilt:

  • REVISION_ID ist die vom Server generierte ID für die Version.
  • REVISION_CREATE_TIME ist der ISO 8601-Zeitstempel, zu dem die Version erstellt wurde.

Go

Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (Version 2) verwendet. Wenn Sie noch die Version 1 verwenden, finden Sie im Migrationsleitfaden Informationen zur Migration zu Version 2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Veraltete Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitAvroSchema commits a new Avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	schema := &pubsubpb.Schema{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_AVRO,
		Definition: string(avscSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("error calling CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	schema := &pubsubpb.Schema{
		// TODO(hongalex): check if name is necessary here
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_PROTOCOL_BUFFER,
		Definition: string(protoSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Node.js in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId, avscFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId, protoFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Node.js in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId: string, avscFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId: string, protoFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Nachdem Sie eine Schemaversion festgeschrieben haben, können Sie die Details der neuen Version auf der Seite Schemas aufrufen.

Nächste Schritte