使用 Kafka 命令行工具生成和使用消息

了解如何使用 Kafka 命令行工具连接到 Managed Service for Apache Kafka 集群、生成消息和使用消息。

准备工作

在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。

如何创建集群

控制台

  1. 前往 Managed Service for Apache Kafka > 集群页面。

    转到“集群”

  2. 点击 创建
  3. 集群名称字段中,输入集群的名称。
  4. 区域列表中,为集群选择一个位置。
  5. 对于网络配置,请配置集群可访问的子网:
    1. 项目部分,选择您的项目。
    2. 对于网络,选择 VPC 网络。
    3. 子网字段中,选择相应子网。
    4. 点击完成
  6. 点击创建

点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active

gcloud

如需创建 Kafka 集群,请运行 managed-kafka clusters create 命令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

替换以下内容:

  • KAFKA_CLUSTER:Kafka 集群的名称
  • REGION:集群的位置
  • PROJECT_ID:您的项目 ID
  • SUBNET_NAME:您要在其中创建集群的子网,例如 default

如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置

该命令会异步运行,并返回一个操作 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如需跟踪创建操作的进度,请使用 gcloud managed-kafka operations describe 命令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

当集群准备就绪时,此命令的输出内容会包含 state: ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作

所需的角色

如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建客户端虚拟机

在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:

  • Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。

  • 子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网

  • 访问权限范围。为虚拟机分配 https://www.googleapis.com/auth/cloud-platform 访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。

以下步骤展示了如何设置这些选项。

控制台

  1. 在 Google Cloud 控制台中,前往创建实例页面。

    创建实例

  2. 机器配置窗格中,执行以下操作:

    1. 名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例

    2. 区域列表中,选择与 Kafka 集群相同的区域。

    3. 可用区列表中,选择一个可用区。

  3. 在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:

    1. 前往网络接口部分。

    2. 如需展开默认网络接口,请点击 箭头。

    3. 网络字段中,选择 VPC 网络。

    4. 子网列表中,选择子网。

    5. 点击完成

  4. 在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:

    1. 访问权限范围下,选择针对每个 API 设置访问权限

    2. 在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用

  5. 点击创建以创建虚拟机。

gcloud

如需创建虚拟机实例,请使用 gcloud compute instances create 命令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • REGION:创建 Kafka 集群的区域,例如 us-central1
  • SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
  • ZONE:创建集群的区域中的一个可用区,例如 us-central1-c

如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例

授予 IAM 角色

Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:

  • Managed Kafka Client (roles/managedkafka.client)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 找到 Compute Engine 默认服务账号对应的行,然后点击 修改主账号

  3. 点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token CreatorService Account OpenID Token Creator 角色重复此步骤。

  4. 点击保存

gcloud

如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

替换以下内容:

  • PROJECT_ID:您的项目 ID

  • PROJECT_NUMBER:您的项目编号

如需获取项目编号,请运行 gcloud projects describe 命令:

gcloud projects describe PROJECT_ID

如需了解详情,请参阅查找项目名称、编号和 ID

连接到虚拟机

使用 SSH 连接到虚拟机实例。

控制台

  1. 转到虚拟机实例页面。

    进入“虚拟机实例”

  2. 在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH

gcloud

如需连接到虚拟机,请使用 gcloud compute ssh 命令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • ZONE:您在其中创建虚拟机的可用区

首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接

安装 Kafka 命令行工具

在 SSH 会话中,运行以下命令以安装 Kafka 命令行工具。

  1. 安装 Java(运行 Kafka 命令行工具需要用到)和 wget(有助于下载依赖项)。以下命令假定您使用的是 Debian Linux 环境。

    sudo apt-get install default-jre wget
    
  2. 安装 Kafka 命令行工具。

    wget -O kafka_2.13-3.7.2.tgz https://archive.apache.org/dist/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. 设置以下环境变量:

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
    

设置身份验证

在 SSH 会话中,执行以下步骤来设置 Managed Service for Apache Kafka 身份验证库。

  1. 下载库并在本地安装。

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    

    此命令会将该库安装到 Kafka 安装目录的 lib 目录中。Kafka 命令行工具会在此目录中查找 Java 依赖项。

  2. 使用文本编辑器创建一个名为 client.properties 的文件,然后粘贴以下内容:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

    保存文件。此文件使用以下设置配置 Kafka 客户端:

    • 使用 SASL_SSL 与 Kafka 集群进行安全通信。

    • 使用 OAuth 2.0 不记名令牌进行身份验证。

    • 使用库提供的 GcpLoginCallbackHandler 类作为登录回调处理程序来获取 OAuth 2.0 令牌。

生成和使用消息

在 SSH 会话中,运行以下命令以生成和使用 Kafka 消息。

  1. 将引导地址设置为环境变量。

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

    替换以下内容:

    • CLUSTER_ID:您的集群的名称
    • REGION:您在其中创建集群的可用区
    • PROJECT_ID:您的项目 ID

    如需了解详情,请参阅获取引导地址

  2. 列出集群中的主题。

    kafka-topics.sh --list \
      --bootstrap-server $BOOTSTRAP \
      --command-config client.properties
    
  3. 撰写主题消息。

    echo "hello world" | kafka-console-producer.sh \
      --topic KAFKA_TOPIC_NAME \
      --bootstrap-server $BOOTSTRAP \
      --producer.config client.properties
    

    KAFKA_TOPIC_NAME 替换为主题名称。

  4. 从主题中消费消息。

    kafka-console-consumer.sh \
      --topic KAFKA_TOPIC_NAME \
      --from-beginning \
      --bootstrap-server $BOOTSTRAP \
      --consumer.config client.properties
    

    如需停止使用消息,请按 Ctrl+C

  5. 运行生产者性能测试。

    kafka-producer-perf-test.sh \
      --topic KAFKA_TOPIC_NAME \
      --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \
      --producer-props bootstrap.servers=$BOOTSTRAP \
      --producer.config client.properties
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

控制台

  1. 删除虚拟机实例。

    1. 转到虚拟机实例页面。

      进入“虚拟机实例”

    2. 选择虚拟机,然后点击删除

  2. 删除 Kafka 集群。

    1. 前往 Managed Service for Apache Kafka > 集群页面。

      转到“集群”

    2. 选择 Kafka 集群,然后点击删除

gcloud

  1. 如需删除虚拟机,请使用 gcloud compute instances delete 命令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如需删除 Kafka 集群,请使用 gcloud managed-kafka clusters delete 命令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。