删除 Google Cloud Managed Service for Apache Kafka 消费者群组

删除消费群组会从 Managed Service for Apache Kafka 集群中移除相应消费群组。

如需删除消费群组,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。

删除使用方群组所需的角色和权限

如需获得删除消费群组所需的权限,请让您的管理员为您授予项目的 Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含删除消费群组所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能删除消费群组:

  • 删除使用方群组: managedkafka.consumerGroups.delete

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解“Managed Kafka Consumer Group Editor”角色,请参阅 Managed Service for Apache Kafka 预定义角色

删除使用方群组

删除使用方群组后,其存储的使用方偏移量会永久丢失。您也无法在控制台中查看消费群组的日志和指标。不过,与消费群组相关联的指标和日志会保留下来,并且可以使用日志浏览器进行访问。 删除使用方群组也不会删除其已使用的消息。这些消息仍可在其最初关联的主题中查看。

如需删除消费群组,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到集群

  2. 在集群列表中,点击要删除的消费群组所属的集群。

    系统会打开集群详情页面。

  3. 点击要删除的消费群组。
  4. 消费者群组详情页面中,点击删除
  5. 确认操作。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka consumer-groups delete 命令:

    gcloud managed-kafka consumer-groups delete CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION

    此命令会从您的 Managed Service for Apache Kafka 集群中永久移除某个消费者群组。

    替换以下内容:

    • CONSUMER_GROUP_ID:消费群组的 ID 或名称。

    • CLUSTER_ID:集群的 ID 或名称。

    • LOCATION:集群的位置。

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func deleteConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    	req := &managedkafkapb.DeleteConsumerGroupRequest{
    		Name: consumerGroupPath,
    	}
    	if err := client.DeleteConsumerGroup(ctx, req); err != nil {
    		return fmt.Errorf("client.DeleteConsumerGroup got err: %w", err)
    	}
    	fmt.Fprint(w, "Deleted consumer group\n")
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import java.io.IOException;
    
    public class DeleteConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String consumerGroupId = "my-consumer-group";
        deleteConsumerGroup(projectId, region, clusterId, consumerGroupId);
      }
    
      public static void deleteConsumerGroup(
          String projectId, String region, String clusterId, String consumerGroupId) throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          // This operation is being handled synchronously.
          managedKafkaClient.deleteConsumerGroup(
              ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId));
          System.out.println("Deleted consumer group");
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.getConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group_path = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    request = managedkafka_v1.DeleteConsumerGroupRequest(
        name=consumer_group_path,
    )
    
    try:
        client.delete_consumer_group(request=request)
        print("Deleted consumer group")
    except NotFound as e:
        print(f"Failed to delete consumer group {consumer_group_id} with error: {e.message}")
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。