您可以更新 Google Cloud Managed Service for Apache Kafka 消费者群组,以修改主题分区列表的偏移量。这样,您就可以控制群组中的消费者接收哪些消息。
如需更新消费群组,您可以使用 Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。 Google Cloud 控制台不支持修改使用方群组。
准备工作
如需更新消费群组,请先确保该群组未在主动消费消息。如果某个消费群组从未消费过任何消息,或者在上次提交的偏移量过期后经过了 offsets.retention.minutes,Kafka 会自动删除该消费群组。
在更新消费群组之前,请按以下步骤操作:
向您的消费者群组正在从中读取消息的主题发送一些消息。
启动您的使用方群组以处理一些消息。
停止所有消费者消费消息。如需停止消费者,请按 Control+C。
如需详细了解如何发送和使用消息,请参阅使用 Kafka 命令行工具生成和使用消息。
更新消费群组所需的角色和权限
如需获得修改消费群组所需的权限,请让您的管理员为您授予项目的 Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含修改消费群组所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需修改消费群组,您需要具备以下权限:
-
更新了消费群体:
managedkafka.consumerGroups.update
如需详细了解“Managed Kafka Consumer Group Editor”角色,请参阅 Managed Service for Apache Kafka 预定义角色。
向服务代理授予读取权限
为了更新消费群组偏移量,服务代理需要对主题和消费群组资源具有 READ 操作权限。此访问权限通过 Apache Kafka ACL 进行配置。
如果您未在集群中为消费者群组及其主题配置任何 Apache Kafka ACL,则服务代理对这些资源具有环境访问权限。您可以跳过此部分。
如果为集群中的使用方群组及其主题配置了 Apache Kafka ACL,则服务代理需要对这两个资源具有明确的读取操作 ACL 访问权限。为此,请添加 ACL 条目,以授予服务代理对相关消费群组和主题的 READ 操作权限。请按照以下步骤操作:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init 运行
gcloud managed-kafka acls add-acl-entry命令:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
替换以下内容:
CONSUMER_GROUP_ACL_ID(必需):您要向其添加消费者群组 ACL 条目的 Managed Service for Apache Kafka ACL 资源的唯一 ID。如需将访问权限应用于所有使用方群组,请使用 `allConsumerGroups`。如需应用于特定使用方群组,请使用 `consumerGroup/CONSUMER_GROUP_NAME`。TOPIC_ACL_ID(必需):您要向其添加主题 ACL 条目的 Managed Service for Apache Kafka ACL 资源的唯一 ID。如需将访问权限应用于所有主题,请使用 `allTopics`。如需应用于特定主题,请使用 `topic/TOPIC_NAME`。CLUSTER_ID(必需):包含 ACL 资源的集群的 ID。LOCATION(必需):集群所在的区域。请参阅支持的地区。PROJECT_NUMBER(必需):集群所在项目的项目编号。用于为 ACL 条目构建服务代理的正文名称。
如需详细了解如何添加 ACL 条目,请参阅添加 ACL 条目。
更新消费群组
确保您已完成准备工作部分中的步骤。
如需更新消费群组,请按以下步骤操作:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud managed-kafka consumer-groups update命令:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
替换以下内容:
-
CLUSTER_ID:集群的 ID 或名称。
-
LOCATION:集群的位置。
-
CONSUMER_GROUP_ID:消费群组的 ID 或名称。
-
TOPICS_FILE:此设置用于指定包含要为消费者群组更新的主题配置的文件位置。该文件可以是 JSON 或 YAML 格式。它可以是文件路径,也可以直接包含 JSON 或 YAML 内容。
主题文件使用 JSON 结构来表示
ConsumerGroup主题映射,格式为{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}。对于每个主题,ConsumerPartitionMetadata会提供每个分区的偏移量和元数据。如需将名为
topic1的主题中单个分区(分区 0)的偏移量设置为 10,JSON 配置应如下所示:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}以下是
topics.json文件内容的示例:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH:在 JSON 或 YAML 文件中指定主题时,请包含完整的主题路径,该路径可通过运行
gcloud managed-kafak topics describe命令获取,格式为projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic。。
-