使用 Kafka 命令行工具生成和使用消息
了解如何使用 Kafka 命令行工具连接到 Managed Service for Apache Kafka 集群、生成消息和使用消息。
准备工作
在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。
如何创建集群
控制台
- 前往 Managed Service for Apache Kafka > 集群页面。
- 点击 创建。
- 在集群名称字段中,输入集群的名称。
- 在区域列表中,为集群选择一个位置。
-
对于网络配置,请配置集群可访问的子网:
- 在项目部分,选择您的项目。
- 对于网络,选择 VPC 网络。
- 在子网字段中,选择相应子网。
- 点击完成。
- 点击创建。
点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active。
gcloud
如需创建 Kafka 集群,请运行 managed-kafka clusters
create 命令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
替换以下内容:
KAFKA_CLUSTER:Kafka 集群的名称REGION:集群的位置PROJECT_ID:您的项目 IDSUBNET_NAME:您要在其中创建集群的子网,例如default
如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置。
该命令会异步运行,并返回一个操作 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如需跟踪创建操作的进度,请使用 gcloud managed-kafka
operations describe 命令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
当集群准备就绪时,此命令的输出内容会包含 state:
ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作。
所需的角色
如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:
- Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) - Project IAM Admin (
roles/resourcemanager.projectIamAdmin) - Role Viewer (
roles/iam.roleViewer) - Service Account User (
roles/iam.serviceAccountUser)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
创建客户端虚拟机
在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:
Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。
子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网。
访问权限范围。为虚拟机分配
https://www.googleapis.com/auth/cloud-platform访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。
以下步骤展示了如何设置这些选项。
控制台
在 Google Cloud 控制台中,前往创建实例页面。
在机器配置窗格中,执行以下操作:
在名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例。
在区域列表中,选择与 Kafka 集群相同的区域。
在可用区列表中,选择一个可用区。
在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:
前往网络接口部分。
如需展开默认网络接口,请点击 箭头。
在网络字段中,选择 VPC 网络。
在子网列表中,选择子网。
点击完成。
在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:
在访问权限范围下,选择针对每个 API 设置访问权限。
在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用。
点击创建以创建虚拟机。
gcloud
如需创建虚拟机实例,请使用 gcloud compute instances create 命令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- REGION:创建 Kafka 集群的区域,例如
us-central1 - SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
- ZONE:创建集群的区域中的一个可用区,例如
us-central1-c
如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例。
授予 IAM 角色
向 Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
找到 Compute Engine 默认服务账号对应的行,然后点击 修改主账号。
点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token Creator 和 Service Account OpenID Token Creator 角色重复此步骤。
点击保存。
gcloud
如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
替换以下内容:
PROJECT_ID:您的项目 ID
PROJECT_NUMBER:您的项目编号
如需获取项目编号,请运行 gcloud projects describe 命令:
gcloud projects describe PROJECT_ID
如需了解详情,请参阅查找项目名称、编号和 ID。
连接到虚拟机
使用 SSH 连接到虚拟机实例。
控制台
转到虚拟机实例页面。
在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH。
gcloud
如需连接到虚拟机,请使用 gcloud compute ssh 命令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- ZONE:您在其中创建虚拟机的可用区
首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接。
安装 Kafka 命令行工具
在 SSH 会话中,运行以下命令以安装 Kafka 命令行工具。
安装 Java(运行 Kafka 命令行工具需要用到)和
wget(有助于下载依赖项)。以下命令假定您使用的是 Debian Linux 环境。sudo apt-get install default-jre wget安装 Kafka 命令行工具。
wget -O kafka_2.13-3.7.2.tgz https://archive.apache.org/dist/kafka/3.7.2/kafka_2.13-3.7.2.tgz tar xfz kafka_2.13-3.7.2.tgz设置以下环境变量:
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2 export PATH=$PATH:$KAFKA_HOME/bin export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
设置身份验证
在 SSH 会话中,执行以下步骤来设置 Managed Service for Apache Kafka 身份验证库。
下载库并在本地安装。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip sudo apt-get install unzip unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/此命令会将该库安装到 Kafka 安装目录的
lib目录中。Kafka 命令行工具会在此目录中查找 Java 依赖项。使用文本编辑器创建一个名为
client.properties的文件,然后粘贴以下内容:security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;保存文件。此文件使用以下设置配置 Kafka 客户端:
使用 SASL_SSL 与 Kafka 集群进行安全通信。
使用 OAuth 2.0 不记名令牌进行身份验证。
使用库提供的
GcpLoginCallbackHandler类作为登录回调处理程序来获取 OAuth 2.0 令牌。
生成和使用消息
在 SSH 会话中,运行以下命令以生成和使用 Kafka 消息。
将引导地址设置为环境变量。
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092替换以下内容:
CLUSTER_ID:您的集群的名称REGION:您在其中创建集群的可用区PROJECT_ID:您的项目 ID
如需了解详情,请参阅获取引导地址。
列出集群中的主题。
kafka-topics.sh --list \ --bootstrap-server $BOOTSTRAP \ --command-config client.properties撰写主题消息。
echo "hello world" | kafka-console-producer.sh \ --topic KAFKA_TOPIC_NAME \ --bootstrap-server $BOOTSTRAP \ --producer.config client.properties将 KAFKA_TOPIC_NAME 替换为主题名称。
从主题中消费消息。
kafka-console-consumer.sh \ --topic KAFKA_TOPIC_NAME \ --from-beginning \ --bootstrap-server $BOOTSTRAP \ --consumer.config client.properties如需停止使用消息,请按 Ctrl+C。
运行生产者性能测试。
kafka-producer-perf-test.sh \ --topic KAFKA_TOPIC_NAME \ --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \ --producer-props bootstrap.servers=$BOOTSTRAP \ --producer.config client.properties
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
控制台
gcloud
如需删除虚拟机,请使用
gcloud compute instances delete命令。gcloud compute instances delete VM_NAME --zone=ZONE如需删除 Kafka 集群,请使用
gcloud managed-kafka clusters delete命令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async