使用架构注册表生成 Avro 消息
了解如何开发 Java 生产者应用,该应用使用 架构注册表 (预览版)生成 Apache Avro 消息。该应用会将消息写入 Managed Service for Apache Kafka 集群。
准备工作
在开始本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。
如何创建集群
控制台
- 前往 Managed Service for Apache Kafka > 集群 页面。
- 点击 创建。
- 在集群名称 字段中,输入集群的名称。
- 在区域 列表中,为集群选择一个位置。
-
对于网络配置,请配置集群可访问的子网:
- 对于项目,选择您的项目。
- 对于网络,选择 VPC 网络。
- 对于子网 ,选择子网。
- 点击完成 。
- 点击创建 。
点击创建 后,集群状态为 Creating。当集群
准备就绪后,状态为 Active。
gcloud
如需创建 Kafka 集群,请运行
managed-kafka clusters
create 命令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
替换以下内容:
KAFKA_CLUSTER:Kafka 集群的名称REGION:集群的位置PROJECT_ID:您的项目 IDSUBNET_NAME:您要在其中创建集群的子网,例如default
如需了解受支持的位置,请参阅 Managed Service for Apache Kafka 位置。
该命令以异步方式运行,并返回操作 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如需跟踪创建操作的进度,请使用
gcloud managed-kafka
operations describe 命令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
当集群准备就绪后,此命令的输出会包含条目 state:
ACTIVE。如需了解详情,请参阅
监控
集群创建操作。
所需的角色
如需获得创建和配置客户端虚拟机所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
- Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) - Project IAM Admin (
roles/resourcemanager.projectIamAdmin) - Role Viewer (
roles/iam.roleViewer) - Service Account User (
roles/iam.serviceAccountUser)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
设置客户端虚拟机
在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:
区域 。请在与 Kafka 集群相同的区域中创建虚拟机。
子网 。请在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅 查看集群的子网。
访问权限范围 。将
https://www.googleapis.com/auth/cloud-platform访问权限范围分配给 虚拟机。此范围授权虚拟机向 Managed Kafka API 发送请求。
以下步骤展示了如何设置这些选项。
控制台
在 Google Cloud 控制台中,前往创建实例 页面。
在机器配置 窗格中,执行以下操作:
在名称字段中,指定实例的名称。如需了解更多 信息,请参阅 资源命名惯例。
在区域 列表中,选择与您的 Kafka 集群相同的区域。
在可用区 列表中,选择一个可用区。
在导航菜单中,点击网络 。在显示的网络窗格 中,执行以下操作:
前往网络接口 部分。
如需展开默认网络接口,请点击 箭头。
在网络 字段中,选择 VPC 网络。
在子网 列表中,选择子网。
点击完成 。
在导航菜单中,点击安全 。在显示的安全窗格 中,执行以下操作:
对于访问权限范围 ,选择针对每个 API 设置访问权限 。
在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用 。
点击创建 以创建虚拟机。
gcloud
如需创建虚拟机实例,请使用
gcloud compute instances create
命令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- REGION:您在其中创建 Kafka
集群的区域,例如
us-central1 - SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
- ZONE:您在其中创建集群的区域中的可用区,例如
us-central1-c
如需详细了解如何创建虚拟机,请参阅 在特定子网中创建虚拟机实例。
授予 IAM 角色
向 Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client) - Schema Registry Admin (
roles/managedkafka.schemaRegistryAdmin) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
找到Compute Engine 默认服务账号 对应的行,然后点击 修改主账号 。
点击添加其他角色 ,然后选择角色 Managed Kafka Client 。 对 Schema Registry Admin、 Service Account Token Creator 和 Service Account OpenID Token Creator 角色重复此步骤。
点击保存 。
gcloud
如需授予 IAM 角色,请使用
gcloud projects add-iam-policy-binding
命令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
替换以下内容:
PROJECT_ID:您的项目 ID
PROJECT_NUMBER:您的项目编号
如需获取项目编号,请运行
gcloud projects describe 命令:
gcloud projects describe PROJECT_ID
如需了解详情,请参阅 查找项目名称、编号和 ID。
连接到虚拟机
使用 SSH 连接到虚拟机实例。
控制台
转到虚拟机实例页面。
在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH 。
gcloud
如需连接到虚拟机,请使用
gcloud compute ssh 命令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- ZONE:您在其中创建虚拟机的可用区
首次使用 SSH 时,可能需要进行额外的配置。如需了解更多 信息,请参阅关于 SSH 连接。
设置 Apache Maven 项目
在 SSH 会话中,运行以下命令来设置 Maven 项目。
使用以下命令安装 Java 和 Maven:
sudo apt-get install maven openjdk-17-jdk设置 Apache Maven 项目。
使用以下命令在名为
demo的目录中创建软件包com.google.example。mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
定义架构及其 Java 实现
在此示例中,消息表示具有名称和可选 ID 的“用户”。这对应于具有两个字段的 Avro 架构:
类型为 string 的必填字段 name 和可选整数 id.
如需在 Java 程序中使用此架构,您还需要生成与此架构对应的对象的
Java 实现。
使用
cd demo切换到项目目录。创建用于在代码中存储架构文件的文件夹:
mkdir -p src/main/avro通过将以下代码粘贴到名为
src/main/avro/User.avsc的文件中来创建 Avro 架构定义:{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }通过将以下内容添加到
build的pom.xml.节点,将 Maven 项目配置为使用 Avro Java 代码生成插件 。请注意,pom.xml的pluginManagement节点内可能还有其他plugins节点。在此步骤中,请勿更改pluginManagement节点。plugins节点需要与pluginManagement处于同一级别。<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>通过将以下内容添加到
pom.xml的project/dependencies节点的末尾,将 Avro 添加为依赖项。请注意,pom.xml的dependencyManagement标记内已有一个dependencies节点。 在此步骤中,请勿更改dependencyManagement节点。<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>生成 Java 源代码
mvn generate-sources运行以下命令,检查是否已创建实现源文件。该源是一个 Java 类文件,用于为
User对象实现构造函数、访问器、序列化程序和反序列化程序。您将在生成方代码中使用此类。cat src/main/java/com/google/example/User.java
如需详细了解 Apache Avro,请参阅 Apache Avro 使用入门指南。
创建生成方客户端
本部分将详细介绍编写、构建和运行生成方客户端的步骤。
实现生成方
生成方使用 KafkaAvroSerializer.java 对消息进行编码并管理其架构。序列化程序会自动连接到架构注册表,在主题下注册架构,检索其 ID,然后使用 Avro 序列化消息。您仍然需要配置生成方和序列化程序。
通过将以下代码粘贴到名为
src/main/java/com/google/example/UserProducer.java的新文件中来创建生成方客户端类package com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }在
src/main/java/com/google/example/SendCallback.java中定义回调类:package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }如需编译此代码,您需要
org.apache.kafka.clients软件包和序列化程序代码。序列化程序 Maven 工件通过自定义代码库分发。将以下节点添加到pom.xml的project节点,以配置此代码库:<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>将以下内容添加到
pom.xml文件中的dependencies节点:<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>如需确保所有依赖项都得到正确解析,请编译客户端:
mvn compile
创建架构注册表
如需创建架构注册表,请运行以下命令:
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
替换以下内容:
REGISTRY_ID:新架构注册表的唯一标识符 。这构成了注册表资源名称的一部分。名称必须以字母开头,只能包含字母
(a-z, A-Z)、数字(0-9)和下划线(_),且不得超过 63 个字符。REGION: Google Cloud 要在其中创建架构注册表的区域。此位置必须与使用此注册表的 Kafka 集群的区域一致。
您创建的架构定义尚未上传到注册表。生成方客户端会在首次运行时执行此操作,具体步骤如下。
配置并运行生成方
此时,生成方不会运行,因为它尚未完全配置。 如需配置生成方,请提供 Kafka 和架构注册表配置。
在与
pom.xml相同的目录中创建一个名为client.properties的文件,并向其中添加以下内容:bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider通过将以下内容插入到
kafka-avro-serializer依赖项的 上方 的pom.xml的dependencies节点中,将 Kafka 和架构注册表身份验证处理程序依赖项添加到 Maven 项目:<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>如果您想查看自定义架构注册表身份验证处理程序的实现,请查看 GcpBearerAuthCredentialProvider 类。
编译并运行生成方客户端:
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer如果一切顺利,您会看到
SendCallback类生成的输出Produced a message successfully。
检查输出
检查
User架构是否已在从主题名称和架构名称派生的主题名称下注册:SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOST此命令的输出应如下所示:
["newUsers-value"]检查在代码库中注册的架构是否与
User相同:curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1该命令的输出应如下所示:
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
控制台
gcloud
如需删除虚拟机,请使用
gcloud compute instances delete命令。gcloud compute instances delete VM_NAME --zone=ZONE如需删除架构注册表,请使用
/sdk/gcloud/reference/managed-kafka/schema-registries/delete命令。gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGION如需删除 Kafka 集群,请使用
gcloud managed-kafka clusters delete命令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
后续步骤
- 架构注册表概览。
- Managed Service for Apache Kafka 概览。
- 向 Managed Kafka API 进行身份验证。
- 使用 Kafka Connect 将 Avro 消息写入 BigQuery