使用架构注册表生成 Avro 消息

了解如何开发使用架构注册表预览版)生成 Apache Avro 消息的 Java 生产者应用。应用将消息写入 Managed Service for Apache Kafka 集群。

准备工作

在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。

如何创建集群

控制台

  1. 前往 Managed Service for Apache Kafka > 集群页面。

    转到“集群”

  2. 点击 创建
  3. 集群名称字段中,输入集群的名称。
  4. 区域列表中,为集群选择一个位置。
  5. 对于网络配置,请配置集群可访问的子网:
    1. 项目部分,选择您的项目。
    2. 对于网络,选择 VPC 网络。
    3. 对于子网,请选择相应子网。
    4. 点击完成
  6. 点击创建

点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active

gcloud

如需创建 Kafka 集群,请运行 managed-kafka clusters create 命令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

替换以下内容:

  • KAFKA_CLUSTER:Kafka 集群的名称
  • REGION:集群的位置
  • PROJECT_ID:您的项目 ID
  • SUBNET_NAME:您要在其中创建集群的子网,例如 default

如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置

该命令会异步运行,并返回一个操作 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如需跟踪创建操作的进度,请使用 gcloud managed-kafka operations describe 命令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

当集群准备就绪时,此命令的输出内容会包含 state: ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作

所需的角色

如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

设置客户端虚拟机

在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:

  • Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。

  • 子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网

  • 访问权限范围。为虚拟机分配 https://www.googleapis.com/auth/cloud-platform 访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。

以下步骤展示了如何设置这些选项。

控制台

  1. 在 Google Cloud 控制台中,前往创建实例页面。

    创建实例

  2. 机器配置窗格中,执行以下操作:

    1. 名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例

    2. 区域列表中,选择与 Kafka 集群相同的区域。

    3. 可用区列表中,选择一个可用区。

  3. 在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:

    1. 前往网络接口部分。

    2. 如需展开默认网络接口,请点击 箭头。

    3. 网络字段中,选择 VPC 网络。

    4. 子网列表中,选择子网。

    5. 点击完成

  4. 在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:

    1. 访问权限范围下,选择针对每个 API 设置访问权限

    2. 在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用

  5. 点击创建以创建虚拟机。

gcloud

如需创建虚拟机实例,请使用 gcloud compute instances create 命令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • REGION:创建 Kafka 集群的区域,例如 us-central1
  • SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
  • ZONE:创建集群的区域中的一个可用区,例如 us-central1-c

如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例

授予 IAM 角色

Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:

  • Managed Kafka Client (roles/managedkafka.client)
  • Schema Registry Admin (roles/managedkafka.schemaRegistryAdmin)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号

  3. 点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Schema Registry AdminService Account Token CreatorService Account OpenID Token Creator 角色重复执行此步骤。

  4. 点击保存

gcloud

如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

替换以下内容:

  • PROJECT_ID:您的项目 ID

  • PROJECT_NUMBER:您的项目编号

如需获取项目编号,请运行 gcloud projects describe 命令:

gcloud projects describe PROJECT_ID

如需了解详情,请参阅查找项目名称、编号和 ID

连接到虚拟机

使用 SSH 连接到虚拟机实例。

控制台

  1. 转到虚拟机实例页面。

    进入“虚拟机实例”

  2. 在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH

gcloud

如需连接到虚拟机,请使用 gcloud compute ssh 命令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • ZONE:您在其中创建虚拟机的可用区

首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接

设置 Apache Maven 项目

在 SSH 会话中,运行以下命令以设置 Maven 项目。

  1. 使用以下命令安装 Java 和 Maven:

    sudo apt-get install maven openjdk-17-jdk
    
  2. 设置 Apache Maven 项目。

    使用以下命令在名为 demo 的目录中创建软件包 com.google.example

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

定义架构及其 Java 实现

在此示例中,消息表示具有名称和可选 ID 的“用户”。这对应于具有两个字段的 Avro 架构:一个必需字段 name(类型为 string)和一个可选整数 id.。如需在 Java 程序中使用此架构,您还需要生成与此架构对应的对象的 Java 实现。

  1. 切换到包含 cd demo 的项目目录。

  2. 在代码中创建用于存储架构文件的文件夹:

    mkdir -p src/main/avro
    
  3. 通过将以下代码粘贴到名为 src/main/avro/User.avsc 的文件中来创建 Avro 架构定义:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. 通过将以下内容添加到 pom.xml.build 节点,配置 Maven 项目以使用 Avro Java 代码生成插件。请注意,pom.xml 可能在 pluginManagement 节点内有其他 plugins 节点。在此步骤中,请勿更改 pluginManagement 节点。plugins 节点需要与 pluginManagement 处于同一级别。

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. 通过将以下内容添加到 pom.xmlproject/dependencies 节点的末尾,添加 Avro 作为依赖项。请注意,pom.xmldependencyManagement 标记内已有一个 dependencies 节点。 在此步骤中,请勿更改 dependencyManagement 节点。

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. 生成 Java 源代码

      mvn generate-sources
    
  7. 运行以下命令,检查是否已创建实现源文件。来源是 Java 类文件,该文件实现了 User 对象的构造函数、访问器、序列化程序和反序列化程序。您将在生产者代码中使用此类。

    cat src/main/java/com/google/example/User.java
    

如需详细了解 Apache Avro,请参阅 Apache Avro 使用入门指南

创建生产者客户端

本部分将逐步介绍如何编写、构建和运行生产者客户端。

实现提供方

提供方使用 KafkaAvroSerializer.java 对消息进行编码并管理其架构。序列化程序会自动连接到架构注册表,在某个主题下注册架构,检索其 ID,然后使用 Avro 序列化消息。您仍需配置提供方和序列化程序。

  1. 通过将以下代码粘贴到名为 src/main/java/com/google/example/UserProducer.java 的新文件中来创建生产者客户端类

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. src/main/java/com/google/example/SendCallback.java 中定义回调类:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. 如需编译此代码,您需要 org.apache.kafka.clients 软件包和序列化程序代码。序列化程序 Maven 制品通过自定义代码库分发。将以下节点添加到 pom.xmlproject 节点,以配置此代码库:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. 将以下内容添加到 pom.xml 文件中的 dependencies 节点:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. 为确保所有依赖项都已正确解析,请编译客户端:

    mvn compile
    

创建架构注册表

如需创建架构注册表,请运行以下命令:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

替换以下内容:

  • REGISTRY_ID:新架构注册表的唯一标识符。这构成了注册表的资源名称的一部分。名称必须以字母开头,只能包含字母 (a-z, A-Z)、数字 (0-9) 和下划线 (_),且长度不得超过 63 个字符。

  • REGION:将要创建架构注册表的 Google Cloud 区域。此位置必须与使用相应注册表的 Kafka 集群的区域一致。

您创建的架构定义尚未上传到注册表。生产者客户端在首次运行时会执行以下步骤。

配置并运行提供方

此时,由于生产者未完全配置,因此不会运行。如需配置提供方,请同时提供 Kafka 和架构注册表配置。

  1. pom.xml 所在的同一目录中创建一个名为 client.properties 的文件,并向其中添加以下内容:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    通过将以下内容插入到 pom.xmldependencies 节点(kafka-avro-serializer 依赖项上方)中,将 Kafka 和架构注册表身份验证处理程序依赖项添加到您的 Maven 项目:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    如果您想查看自定义架构注册表身份验证处理程序身份验证处理程序的实现,请查看 GcpBearerAuthCredentialProvider 类。

  2. 编译并运行提供方客户端:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    如果一切顺利,您会看到 SendCallback 类生成的输出 Produced a message successfully

检查输出

  1. 检查 User 架构是否已在从主题和架构名称派生的主题名称下注册:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    此命令的输出应如下所示:

    ["newUsers-value"]
    
  2. 检查在代码库中注册的架构是否与 User 相同:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    该命令的输出应如下所示:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

控制台

  1. 删除虚拟机实例。

    1. 转到虚拟机实例页面。

      进入“虚拟机实例”

    2. 选择虚拟机,然后点击删除

  2. 删除架构注册表。

    1. 前往架构注册表页面。

      前往架构注册表

    2. 点击架构注册表的名称。

    3. 点击删除

  3. 删除 Kafka 集群。

    1. 前往 Managed Service for Apache Kafka > 集群页面。

      转到“集群”

    2. 选择 Kafka 集群,然后点击删除

gcloud

  1. 如需删除虚拟机,请使用 gcloud compute instances delete 命令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如需删除架构注册表,请使用 /sdk/gcloud/reference/managed-kafka/schema-registries/delete 命令。

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. 如需删除 Kafka 集群,请使用 gcloud managed-kafka clusters delete 命令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

后续步骤

Apache Kafka® 和 Apache Avro 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。
Confluent 是 Confluent, Inc. 的注册商标。