Update a Google Cloud Managed Service for Apache Kafka consumer group

You can update a Google Cloud Managed Service for Apache Kafka consumer group to modify the offsets for its list of topic partitions. This lets you control which messages the consumers in the group receive.

To update a consumer group, you can use the Google Cloud CLI, the client libraries, the Managed Kafka API, or the open source Apache Kafka API. The Google Cloud console does not support updating consumer groups.

Before you begin

Before you update a consumer group, ensure that the group is not actively consuming messages. Kafka automatically deletes a consumer group if the group has never consumed any messages, or if its last committed offset expired more than offsets.retention.minutes ago.

Before you update a consumer group, follow these steps:

  1. Send some messages to the topic that your consumer group reads from.

  2. Start your consumer group so that it processes some of the messages.

  3. Stop all consumers from consuming messages. To stop a consumer, press Control+C.

For more information about how to produce and consume messages, see Produce and consume messages with the Kafka command-line tools.

Required roles and permissions to update a consumer group

To get the permissions that you need to update a consumer group, ask your administrator to grant you the Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to update a consumer group. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to update a consumer group:

  • Update a consumer group: managedkafka.consumerGroups.update

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Consumer Group Editor role, see the Managed Service for Apache Kafka predefined roles.

Grant read access to the service agent

To update consumer group offsets, the service agent requires READ operation access on both the topic and the consumer group resources. This access is configured through Apache Kafka ACLs.

If you have not configured any Apache Kafka ACLs for the consumer group and its topics in the cluster, the service agent has access to these resources by default, and you can skip this section.

If Apache Kafka ACLs are configured for the consumer group and its topics in the cluster, the service agent requires explicit READ operation ACL access to both resources. To grant it, add ACL entries that give the service agent READ operation access on the relevant consumer group and topics. Follow these steps:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Run the gcloud managed-kafka acls add-acl-entry command:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ \
      --permission-type=ALLOW \
      --host='*'

    gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ \
      --permission-type=ALLOW \
      --host='*'

    Replace the following:

    • CONSUMER_GROUP_ACL_ID (required): The unique ID of the Managed Service for Apache Kafka ACL resource to add the consumer group ACL entry to. To apply the access to all consumer groups, use `allConsumerGroups`. To apply it to a specific consumer group, use `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (required): The unique ID of the Managed Service for Apache Kafka ACL resource to add the topic ACL entry to. To apply the access to all topics, use `allTopics`. To apply it to a specific topic, use `topic/TOPIC_NAME`.
    • CLUSTER_ID (required): The ID of the cluster that contains the ACL resources.
    • LOCATION (required): The region where the cluster is located. See the supported locations.
    • PROJECT_NUMBER (required): The project number of the project that contains the cluster. It is used to construct the service agent's principal name for the ACL entries.

For more information about how to add an ACL entry, see Add an ACL entry.
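The ACL IDs and the service agent principal in the commands above follow simple naming patterns. The following Python sketch composes them from a project number and optional resource names; the project number and names shown are illustrative placeholders, and the principal string simply mirrors the form used in the commands above.

```python
from typing import Optional

# Sketch: compose the arguments used by `gcloud managed-kafka acls add-acl-entry`.
# The project number and resource names below are illustrative placeholders.

def service_agent_principal(project_number: int) -> str:
    # Mirrors the principal format shown in the commands above.
    return (
        f"User:__AUTH_TOKEN__service-{project_number}"
        "@gcp-sa-managedkafka.iam.gserviceaccount.com"
    )

def consumer_group_acl_id(group: Optional[str] = None) -> str:
    # Without a group name, the ACL entry applies to all consumer groups.
    return f"consumerGroup/{group}" if group else "allConsumerGroups"

def topic_acl_id(topic: Optional[str] = None) -> str:
    # Without a topic name, the ACL entry applies to all topics.
    return f"topic/{topic}" if topic else "allTopics"

print(service_agent_principal(123456789))
print(consumer_group_acl_id("my-consumer-group"))  # consumerGroup/my-consumer-group
print(topic_acl_id())                              # allTopics
```

Passing no name yields the `allConsumerGroups` or `allTopics` wildcard resource, matching the ID patterns described above.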

Update a consumer group

Ensure that you have completed the steps in the Before you begin section.

To update a consumer group, follow these steps:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka consumer-groups update command:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster.

    • LOCATION: The location of the cluster.

    • CONSUMER_GROUP_ID: The ID or name of the consumer group.

    • TOPICS_FILE: The location of the file that contains the topic configuration to update for the consumer group. The file can be in JSON or YAML format, and the value can be a file path or the JSON or YAML content itself.

      The topics file uses a JSON structure to represent the ConsumerGroup topic mapping, in the format { topicName1: {ConsumerPartitionMetadata}, topicName2: {ConsumerPartitionMetadata}}. For each topic, the ConsumerPartitionMetadata provides the offset and metadata for each partition.

      To set the offset of a single partition (partition 0) of a topic named topic1 to 10, the JSON configuration looks like the following: {"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      The following is an example of the contents of a topics.json file:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/topics/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: When specifying topics in the JSON or YAML file, include the full topic path, which you can get by running the gcloud managed-kafka topics describe command. The path has the format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID.
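Because each topic key in the topics file must be a full resource path, it can be convenient to generate the file content programmatically. The following Python sketch builds the same structure as the topics.json example above; the project number, location, cluster, and topic names are placeholders.

```python
import json

# Sketch: generate topics-file content for
# `gcloud managed-kafka consumer-groups update --topics-file=...`.
# Project number, location, cluster, and topic names are placeholders.

def topic_path(project_number: str, location: str, cluster: str, topic: str) -> str:
    # Full topic resource path expected as a key in the topics file.
    return (
        f"projects/{project_number}/locations/{location}"
        f"/clusters/{cluster}/topics/{topic}"
    )

def topics_file_content(offsets_by_topic: dict) -> str:
    # offsets_by_topic maps a full topic path to {partition: offset}.
    doc = {
        path: {
            "partitions": {
                str(partition): {"offset": str(offset), "metadata": ""}
                for partition, offset in partitions.items()
            }
        }
        for path, partitions in offsets_by_topic.items()
    }
    return json.dumps(doc, indent=2)

path = topic_path("123456789", "us-central1", "my-cluster", "my-topic")
print(topics_file_content({path: {0: 10, 1: 20}}))
```

You can write the printed JSON to a file such as topics.json and pass its path to the `--topics-file` flag.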

Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.