开发 Java 生产者应用
了解如何开发一个 Java 生产者应用,该应用通过使用应用默认凭据 (ADC) 向 Managed Service for Apache Kafka 集群进行身份验证。借助 ADC,在 Google Cloud 上运行的应用可以自动查找并使用正确的凭据向 Google Cloud 服务进行身份验证。
准备工作
在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。
如何创建集群
控制台
- 前往 Managed Service for Apache Kafka > 集群页面。
- 点击 创建。
- 在集群名称字段中,输入集群的名称。
- 在区域列表中,为集群选择一个位置。
-
对于网络配置,请配置集群可访问的子网:
- 在项目部分,选择您的项目。
- 对于网络,选择 VPC 网络。
- 对于子网,请选择相应子网。
- 点击完成。
- 点击创建。
点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active。
gcloud
如需创建 Kafka 集群,请运行 managed-kafka clusters
create 命令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
替换以下内容:
KAFKA_CLUSTER:Kafka 集群的名称REGION:集群的位置PROJECT_ID:您的项目 IDSUBNET_NAME:您要在其中创建集群的子网,例如default
如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置。
该命令会异步运行,并返回一个操作 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如需跟踪创建操作的进度,请使用 gcloud managed-kafka
operations describe 命令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
当集群准备就绪时,此命令的输出内容会包含 state:
ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作。
所需的角色
如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:
-
Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) -
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
Role Viewer (
roles/iam.roleViewer) -
Service Account User (
roles/iam.serviceAccountUser)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
创建客户端虚拟机
在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:
Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。
子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网。
访问权限范围。为虚拟机分配
https://www.googleapis.com/auth/cloud-platform访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。
以下步骤展示了如何设置这些选项。
控制台
在 Google Cloud 控制台中,前往创建实例页面。
在机器配置窗格中,执行以下操作:
在名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例。
在区域列表中,选择与 Kafka 集群相同的区域。
在可用区列表中,选择一个可用区。
在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:
前往网络接口部分。
如需展开默认网络接口,请点击 箭头。
在网络字段中,选择 VPC 网络。
在子网列表中,选择子网。
点击完成。
在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:
在访问权限范围下,选择针对每个 API 设置访问权限。
在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用。
点击创建以创建虚拟机。
gcloud
如需创建虚拟机实例,请使用 gcloud compute instances create 命令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- REGION:创建 Kafka 集群的区域,例如
us-central1 - SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
- ZONE:创建集群的区域中的一个可用区,例如
us-central1-c
如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例。
授予 IAM 角色
向 Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号。
点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token Creator 和 Service Account OpenID Token Creator 角色重复此步骤。
点击保存。
gcloud
如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
替换以下内容:
PROJECT_ID:您的项目 ID
PROJECT_NUMBER:您的项目编号
如需获取项目编号,请运行 gcloud projects describe 命令:
gcloud projects describe PROJECT_ID
如需了解详情,请参阅查找项目名称、编号和 ID。
连接到虚拟机
使用 SSH 连接到虚拟机实例。
控制台
转到虚拟机实例页面。
在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH。
gcloud
如需连接到虚拟机,请使用 gcloud compute ssh 命令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- ZONE:您在其中创建虚拟机的可用区
首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接。
设置 Apache Maven 项目
在 SSH 会话中,运行以下命令以设置 Maven 项目。
使用以下命令安装 Java 和 Maven:
sudo apt-get install maven openjdk-17-jdk。设置 Apache Maven 项目。
此命令将在名为
demo的目录中创建一个软件包com.google.example。mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false切换到包含
cd demo的项目目录。
创建 Java 生产者应用
本部分将引导您创建一个向 Kafka 主题生成消息的 Java 应用。使用 Maven 编写并编译 Java 代码,在 kafka-client.properties 文件中配置必要的参数,然后运行应用以发送消息。
编写 Producer 代码
将 src/main/java/com/google/example/App.java 中的代码替换为以下代码:
package com.google.example;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
class SendCallback implements Callback {
public void onCompletion(RecordMetadata m, Exception e){
if (e == null){
System.out.println("Produced a message successfully.");
} else {
System.out.println(e.getMessage());
}
}
}
public class App {
public static void main(String[] args) throws Exception {
Properties p = new Properties();
p.load(new java.io.FileReader("kafka-client.properties"));
KafkaProducer producer = new KafkaProducer(p);
ProducerRecord message = new ProducerRecord("topicName", "key", "value");
SendCallback callback = new SendCallback();
producer.send(message,callback);
producer.close();
}
}
编译应用
如需编译此应用,您需要与 Kafka 客户端相关的软件包(一般)以及特定于 Google Cloud的身份验证逻辑。
在演示版项目目录中,您会找到包含此项目 Maven 配置的
pom.xml。将以下行添加到pom.xml.的<dependencies>部分<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency> <dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.5</version> </dependency>使用
mvn compile编译应用。
配置并运行应用
生产者需要名为
kafka-client.properties的文件中的客户端配置参数。在演示项目目录(包含pom.xml的目录)中创建此文件,其中包含以下内容:bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 value.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;您现在可以运行应用了:
mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
控制台
gcloud
如需删除虚拟机,请使用
gcloud compute instances delete命令。gcloud compute instances delete VM_NAME --zone=ZONE如需删除 Kafka 集群,请使用
gcloud managed-kafka clusters delete命令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async