开发 Java 生产者应用

了解如何开发一个 Java 生产者应用,该应用通过使用应用默认凭据 (ADC) 向 Managed Service for Apache Kafka 集群进行身份验证。借助 ADC,在 Google Cloud 上运行的应用可以自动查找并使用正确的凭据向 Google Cloud 服务进行身份验证。

准备工作

在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。

如何创建集群

控制台

  1. 前往 Managed Service for Apache Kafka > 集群页面。

    转到“集群”

  2. 点击 创建
  3. 集群名称字段中,输入集群的名称。
  4. 区域列表中,为集群选择一个位置。
  5. 对于网络配置,请配置集群可访问的子网:
    1. 项目部分,选择您的项目。
    2. 对于网络,选择 VPC 网络。
    3. 对于子网,请选择相应子网。
    4. 点击完成
  6. 点击创建

点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active

gcloud

如需创建 Kafka 集群,请运行 managed-kafka clusters create 命令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

替换以下内容:

  • KAFKA_CLUSTER:Kafka 集群的名称
  • REGION:集群的位置
  • PROJECT_ID:您的项目 ID
  • SUBNET_NAME:您要在其中创建集群的子网,例如 default

如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置

该命令会异步运行,并返回一个操作 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如需跟踪创建操作的进度,请使用 gcloud managed-kafka operations describe 命令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

当集群准备就绪时,此命令的输出内容会包含 state: ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作

所需的角色

如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建客户端虚拟机

在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:

  • Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。

  • 子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网

  • 访问权限范围。为虚拟机分配 https://www.googleapis.com/auth/cloud-platform 访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。

以下步骤展示了如何设置这些选项。

控制台

  1. 在 Google Cloud 控制台中,前往创建实例页面。

    创建实例

  2. 机器配置窗格中,执行以下操作:

    1. 名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例

    2. 区域列表中,选择与 Kafka 集群相同的区域。

    3. 可用区列表中,选择一个可用区。

  3. 在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:

    1. 前往网络接口部分。

    2. 如需展开默认网络接口,请点击 箭头。

    3. 网络字段中,选择 VPC 网络。

    4. 子网列表中,选择子网。

    5. 点击完成

  4. 在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:

    1. 访问权限范围下,选择针对每个 API 设置访问权限

    2. 在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用

  5. 点击创建以创建虚拟机。

gcloud

如需创建虚拟机实例,请使用 gcloud compute instances create 命令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • REGION:创建 Kafka 集群的区域,例如 us-central1
  • SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
  • ZONE:创建集群的区域中的一个可用区,例如 us-central1-c

如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例

授予 IAM 角色

Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:

  • Managed Kafka Client (roles/managedkafka.client)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号

  3. 点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token CreatorService Account OpenID Token Creator 角色重复此步骤。

  4. 点击保存

gcloud

如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

替换以下内容:

  • PROJECT_ID:您的项目 ID

  • PROJECT_NUMBER:您的项目编号

如需获取项目编号,请运行 gcloud projects describe 命令:

gcloud projects describe PROJECT_ID

如需了解详情,请参阅查找项目名称、编号和 ID

连接到虚拟机

使用 SSH 连接到虚拟机实例。

控制台

  1. 转到虚拟机实例页面。

    进入“虚拟机实例”

  2. 在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH

gcloud

如需连接到虚拟机,请使用 gcloud compute ssh 命令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • ZONE:您在其中创建虚拟机的可用区

首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接

设置 Apache Maven 项目

在 SSH 会话中,运行以下命令以设置 Maven 项目。

  1. 使用以下命令安装 Java 和 Maven:sudo apt-get install maven openjdk-17-jdk

  2. 设置 Apache Maven 项目。

    此命令将在名为 demo 的目录中创建一个软件包 com.google.example

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    
  3. 切换到包含 cd demo 的项目目录。

创建 Java 生产者应用

本部分将引导您创建一个向 Kafka 主题生成消息的 Java 应用。使用 Maven 编写并编译 Java 代码,在 kafka-client.properties 文件中配置必要的参数,然后运行应用以发送消息。

编写 Producer 代码

src/main/java/com/google/example/App.java 中的代码替换为以下代码:

  package com.google.example;

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;
  import org.apache.kafka.clients.producer.Callback;


  class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
  }

  public class App {
      public static void main(String[] args) throws Exception {
          Properties p = new Properties();
          p.load(new java.io.FileReader("kafka-client.properties"));

          KafkaProducer producer = new KafkaProducer(p);
          ProducerRecord message = new ProducerRecord("topicName", "key", "value");
          SendCallback callback = new SendCallback();
          producer.send(message,callback);
          producer.close();
      }
  }

编译应用

如需编译此应用,您需要与 Kafka 客户端相关的软件包(一般)以及特定于 Google Cloud的身份验证逻辑。

  1. 在演示版项目目录中,您会找到包含此项目 Maven 配置的 pom.xml。将以下行添加到 pom.xml.<dependencies> 部分

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.hosted.kafka</groupId>
        <artifactId>managed-kafka-auth-login-handler</artifactId>
        <version>1.0.5</version>
    </dependency>
    
  2. 使用 mvn compile 编译应用。

配置并运行应用

  1. 生产者需要名为 kafka-client.properties 的文件中的客户端配置参数。在演示项目目录(包含 pom.xml 的目录)中创建此文件,其中包含以下内容:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  2. 您现在可以运行应用了:

    mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

控制台

  1. 删除虚拟机实例。

    1. 转到虚拟机实例页面。

      进入“虚拟机实例”

    2. 选择虚拟机,然后点击删除

  2. 删除 Kafka 集群。

    1. 前往 Managed Service for Apache Kafka > 集群页面。

      转到“集群”

    2. 选择 Kafka 集群,然后点击删除

gcloud

  1. 如需删除虚拟机,请使用 gcloud compute instances delete 命令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如需删除 Kafka 集群,请使用 gcloud managed-kafka clusters delete 命令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。