使用架构注册表生成 Avro 消息
了解如何开发使用架构注册表(预览版)生成 Apache Avro 消息的 Java 生产者应用。应用将消息写入 Managed Service for Apache Kafka 集群。
准备工作
在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。
如何创建集群
控制台
- 前往 Managed Service for Apache Kafka > 集群页面。
- 点击 创建。
- 在集群名称字段中,输入集群的名称。
- 在区域列表中,为集群选择一个位置。
-
对于网络配置,请配置集群可访问的子网:
- 在项目部分,选择您的项目。
- 对于网络,选择 VPC 网络。
- 对于子网,请选择相应子网。
- 点击完成。
- 点击创建。
点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active。
gcloud
如需创建 Kafka 集群,请运行 managed-kafka clusters
create 命令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
替换以下内容:
KAFKA_CLUSTER:Kafka 集群的名称REGION:集群的位置PROJECT_ID:您的项目 IDSUBNET_NAME:您要在其中创建集群的子网,例如default
如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置。
该命令会异步运行,并返回一个操作 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如需跟踪创建操作的进度,请使用 gcloud managed-kafka
operations describe 命令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
当集群准备就绪时,此命令的输出内容会包含 state:
ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作。
所需的角色
如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:
-
Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) -
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
Role Viewer (
roles/iam.roleViewer) -
Service Account User (
roles/iam.serviceAccountUser)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
设置客户端虚拟机
在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:
Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。
子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网。
访问权限范围。为虚拟机分配
https://www.googleapis.com/auth/cloud-platform访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。
以下步骤展示了如何设置这些选项。
控制台
在 Google Cloud 控制台中,前往创建实例页面。
在机器配置窗格中,执行以下操作:
在名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例。
在区域列表中,选择与 Kafka 集群相同的区域。
在可用区列表中,选择一个可用区。
在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:
前往网络接口部分。
如需展开默认网络接口,请点击 箭头。
在网络字段中,选择 VPC 网络。
在子网列表中,选择子网。
点击完成。
在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:
在访问权限范围下,选择针对每个 API 设置访问权限。
在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用。
点击创建以创建虚拟机。
gcloud
如需创建虚拟机实例,请使用 gcloud compute instances create 命令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- REGION:创建 Kafka 集群的区域,例如
us-central1 - SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
- ZONE:创建集群的区域中的一个可用区,例如
us-central1-c
如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例。
授予 IAM 角色
向 Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client) - Schema Registry Admin (
roles/managedkafka.schemaRegistryAdmin) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号。
点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Schema Registry Admin、Service Account Token Creator 和 Service Account OpenID Token Creator 角色重复执行此步骤。
点击保存。
gcloud
如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
替换以下内容:
PROJECT_ID:您的项目 ID
PROJECT_NUMBER:您的项目编号
如需获取项目编号,请运行 gcloud projects describe 命令:
gcloud projects describe PROJECT_ID
如需了解详情,请参阅查找项目名称、编号和 ID。
连接到虚拟机
使用 SSH 连接到虚拟机实例。
控制台
转到虚拟机实例页面。
在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH。
gcloud
如需连接到虚拟机,请使用 gcloud compute ssh 命令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- ZONE:您在其中创建虚拟机的可用区
首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接。
设置 Apache Maven 项目
在 SSH 会话中,运行以下命令以设置 Maven 项目。
使用以下命令安装 Java 和 Maven:
sudo apt-get install maven openjdk-17-jdk设置 Apache Maven 项目。
使用以下命令在名为
demo的目录中创建软件包com.google.example。mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
定义架构及其 Java 实现
在此示例中,消息表示具有名称和可选 ID 的“用户”。这对应于具有两个字段的 Avro 架构:一个必需字段 name(类型为 string)和一个可选整数 id.。如需在 Java 程序中使用此架构,您还需要生成与此架构对应的对象的 Java 实现。
切换到包含
cd demo的项目目录。在代码中创建用于存储架构文件的文件夹:
mkdir -p src/main/avro通过将以下代码粘贴到名为
src/main/avro/User.avsc的文件中来创建 Avro 架构定义:{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }通过将以下内容添加到
pom.xml.的build节点,配置 Maven 项目以使用 Avro Java 代码生成插件。请注意,pom.xml可能在pluginManagement节点内有其他plugins节点。在此步骤中,请勿更改pluginManagement节点。plugins节点需要与pluginManagement处于同一级别。<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>通过将以下内容添加到
pom.xml的project/dependencies节点的末尾,添加 Avro 作为依赖项。请注意,pom.xml在dependencyManagement标记内已有一个dependencies节点。 在此步骤中,请勿更改dependencyManagement节点。<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>生成 Java 源代码
mvn generate-sources运行以下命令,检查是否已创建实现源文件。来源是 Java 类文件,该文件实现了
User对象的构造函数、访问器、序列化程序和反序列化程序。您将在生产者代码中使用此类。cat src/main/java/com/google/example/User.java
如需详细了解 Apache Avro,请参阅 Apache Avro 使用入门指南。
创建生产者客户端
本部分将逐步介绍如何编写、构建和运行生产者客户端。
实现提供方
提供方使用 KafkaAvroSerializer.java 对消息进行编码并管理其架构。序列化程序会自动连接到架构注册表,在某个主题下注册架构,检索其 ID,然后使用 Avro 序列化消息。您仍需配置提供方和序列化程序。
通过将以下代码粘贴到名为
src/main/java/com/google/example/UserProducer.java的新文件中来创建生产者客户端类package com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }在
src/main/java/com/google/example/SendCallback.java中定义回调类:package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }如需编译此代码,您需要
org.apache.kafka.clients软件包和序列化程序代码。序列化程序 Maven 制品通过自定义代码库分发。将以下节点添加到pom.xml的project节点,以配置此代码库:<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>将以下内容添加到
pom.xml文件中的dependencies节点:<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>为确保所有依赖项都已正确解析,请编译客户端:
mvn compile
创建架构注册表
如需创建架构注册表,请运行以下命令:
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
替换以下内容:
REGISTRY_ID:新架构注册表的唯一标识符。这构成了注册表的资源名称的一部分。名称必须以字母开头,只能包含字母
(a-z, A-Z)、数字(0-9)和下划线(_),且长度不得超过 63 个字符。REGION:将要创建架构注册表的 Google Cloud 区域。此位置必须与使用相应注册表的 Kafka 集群的区域一致。
您创建的架构定义尚未上传到注册表。生产者客户端在首次运行时会执行以下步骤。
配置并运行提供方
此时,由于生产者未完全配置,因此不会运行。如需配置提供方,请同时提供 Kafka 和架构注册表配置。
在
pom.xml所在的同一目录中创建一个名为client.properties的文件,并向其中添加以下内容:bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider通过将以下内容插入到
pom.xml的dependencies节点(kafka-avro-serializer依赖项上方)中,将 Kafka 和架构注册表身份验证处理程序依赖项添加到您的 Maven 项目:<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>如果您想查看自定义架构注册表身份验证处理程序身份验证处理程序的实现,请查看 GcpBearerAuthCredentialProvider 类。
编译并运行提供方客户端:
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer如果一切顺利,您会看到
SendCallback类生成的输出Produced a message successfully。
检查输出
检查
User架构是否已在从主题和架构名称派生的主题名称下注册:SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOST此命令的输出应如下所示:
["newUsers-value"]检查在代码库中注册的架构是否与
User相同:curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1该命令的输出应如下所示:
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
控制台
gcloud
如需删除虚拟机,请使用
gcloud compute instances delete命令。gcloud compute instances delete VM_NAME --zone=ZONE如需删除架构注册表,请使用
/sdk/gcloud/reference/managed-kafka/schema-registries/delete命令。gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGION如需删除 Kafka 集群,请使用
gcloud managed-kafka clusters delete命令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
后续步骤
- 架构注册表概览。
- Managed Service for Apache Kafka 概览。
- 向 Managed Kafka API 进行身份验证。
- 使用 Kafka Connect 将 Avro 消息写入 BigQuery