创建主题

本文档介绍了如何在架构注册表中创建主题。主题是架构不同版本的逻辑容器。首次创建主题时,您还会为该主题创建第一个架构版本。

您可以通过以下方式之一创建主题:

  • 隐式(默认):许多生产者和消费者客户端的默认行为是在客户端连接时自动创建不存在的架构。引用该架构的主题和版本也会自动创建。这很方便,但如果多个客户端同时创建版本,可能会导致数据不一致。

  • 显式(推荐):在这种方法中,每个架构都必须在注册表中创建,然后生产者或消费者客户端才能使用它。您可以使用 Google Cloud 控制台或 Managed Kafka API 来执行此操作。

此行为必须在客户端设置中进行配置。如需了解详情,请参阅序列化程序或反序列化程序客户端库文档。

准备工作

所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目或架构注册表的 Managed Kafka Schema Registry Editor (roles/managedkafka.schemaRegistryEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建主账号所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建正文需要以下权限:

  • 在父上下文或默认上下文中授予此权限: managedkafka.versions.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

更高级别的 Managed Kafka Schema Registry Admin (roles/managedkafka.schemaRegistryAdmin) 角色还包含创建和管理主题版本的权限。

如需详细了解 Managed Service for Apache Kafka 可用的预定义角色,请参阅访问权限控制文档

创建主题或架构的第一个版本

创建主题时,您也会创建其第一个版本。第一个版本会创建新架构或引用现有架构。

控制台

  1. 在 Google Cloud 控制台中,前往架构注册表页面。

    前往架构注册表

  2. 点击要创建主题的架构注册表的名称。

  3. 点击 创建主题

  4. 主题名称中,为主题输入一个唯一的名称。

    名称必须以字母开头,并且只能包含字母、数字和以下特殊字符:短划线 (-)、英文句点 (.)、下划线 (_)、波形符 (~)、百分号 (%) 或加号 (+)。主题的名称设定后不可更改。

    如需详细了解如何选择主题名称,请参阅主题命名策略

  5. 对于情境,请选择一个情境或创建一个新情境。上下文的作用类似于命名空间,用于整理主题和架构,并在不同群组之间提供隔离。

    • 如需使用现有上下文,请从上下文列表中选择相应上下文。 默认情境显示为 (default context)

    • 如需创建新情境,请执行以下步骤:

      1. 上下文列表中选择创建上下文

      2. 上下文名称字段中,输入上下文的名称。

        名称必须以字母开头,且只能包含字母、数字和以下特殊字符:短划线 (-)、英文句点 (.)、下划线 (_)、波形符 (~)、百分号 (%) 或加号 (+)。上下文的名称设定后不可更改。

      3. 点击保存

  6. 对于架构类型,请选择 Avro协议缓冲区

  7. 架构定义字段中,输入架构定义。架构的格式必须与架构类型一致。请勿在架构字段名称中包含敏感信息,例如个人身份信息 (PII) 或安全数据。

  8. 如果您的架构使用或依赖于架构注册表中其他架构中定义的数据结构,请执行以下步骤:

    1. 点击 Add Schema reference
    2. 参考名称字段中,输入所引用架构的参考名称。
    3. 主题列表中,选择包含所引用架构的主题。
    4. 版本列表中,选择所引用架构的版本号。
    5. 点击确定

    针对每个引用的架构重复执行上述步骤。

  9. 点击创建

REST

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

以下 REST API 示例会创建主题的第一个版本。

如需在默认情境中创建正文,请使用 projects.locations.schemaRegistries.subjects.versions.create 方法向指定 URI 发出 POST 请求:

POST https://managedkafka.googleapis.com/v1main/projects/PROJECT_ID/locations/LOCATION/schemaRegistries/REGISTRY_ID/subjects/SUBJECT_ID/versions
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json

或者,如果使用特定情境,请使用 projects.locations.schemaRegistries.contexts.subjects.versions.create 方法将情境包含在主题集合 URI 中:

POST https://managedkafka.googleapis.com/v1main/projects/PROJECT_ID/locations/LOCATION/schemaRegistries/REGISTRY_ID/contexts/CONTEXT_ID/subjects/SUBJECT_ID/versions
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json

替换以下内容:

  • PROJECT_ID(必需):您的 Google Cloud项目 ID。

  • LOCATION(必需):架构注册表所在的 Google Cloud 区域。

  • REGISTRY_ID(必需):目标架构注册表的 ID。

  • CONTEXT_ID(可选):包含主题的上下文的 ID。如果您想明确指定默认上下文,请使用 .;否则,请省略 /contexts/CONTEXT_ID 以隐式使用默认上下文。

    此名称必须以字母开头,且只能包含字母、数字和以下特殊字符:短划线 -、英文句点 .、下划线 _、波形符 ~、百分号 % 或加号 +。上下文的名称是不可变的。

  • SUBJECT_ID(必需):新主题的 ID,用于创建第一个版本。

    此名称必须以字母开头,且只能包含字母、数字和以下特殊字符:短划线 -、英文句点 .、下划线 _、波形符 ~、百分号 % 或加号 +。主题的名称是不可变的。

请求正文

在请求正文中添加一个 JSON 对象,用于指定架构详细信息:

{
  "schema": "YOUR_SCHEMA_DEFINITION_STRING",
  "schema_type": "AVRO" | "PROTOBUF", // Optional, defaults to AVRO
  "references": [ // Optional
    {
      "name": "REFERENCE_NAME",
      "subject": "REFERENCED_SUBJECT_ID",
      "version": REFERENCED_VERSION_NUMBER
    }
    // ... more references
  ]
  // "version": VERSION_NUMBER, // Optional: Usually omitted, let service assign next
  // "id": SCHEMA_ID, // Optional: Usually omitted, let service assign or reuse
}

替换以下内容:

  • YOUR_SCHEMA_DEFINITION_STRING(必需):包含实际架构定义载荷的字符串。

    请勿在架构字段名称中包含敏感信息,例如个人身份信息 (PII) 或安全数据。

  • schemaType(可选):架构的类型。可以是 AVROPROTOBUF。如果省略,则默认为 AVRO

  • references(可选):一个对象数组,用于定义此架构引用的任何架构。

    • REFERENCE_NAME:用于在此架构的定义中引用其他架构的名称。
    • REFERENCED_SUBJECT_ID:所引用架构的主题 ID。
    • REFERENCED_VERSION_NUMBER:所引用主题的架构的特定版本号。
  • versionIdschemaId:可选字段,通常由服务处理。对于主题的第一个版本,versionId 将为“1”。

如果请求成功,并且架构有效,且通过了兼容性检查(如果已配置),则 API 会返回 200 OK 状态代码。响应正文包含所创建版本使用的架构 ID,该 ID 与版本 ID 不同。

如需了解详情,请参阅 REST API 文档

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。