刪除 Google Cloud Managed Service for Apache Kafka 消費者群組

刪除消費者群組會從 Managed Service for Apache Kafka 叢集中移除該群組。

如要刪除消費者群組,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫、Managed Kafka API 或開放原始碼 Apache Kafka API。

刪除消費者群組所需的角色和權限

如要取得刪除消費者群組所需的權限,請要求管理員授予您專案的代管 Kafka 消費者群組編輯者 (roles/managedkafka.consumerGroupEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備刪除消費者群組所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要刪除消費者群組,您必須具備下列權限:

  • 刪除消費者群組: managedkafka.consumerGroups.delete

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Consumer Group Editor 角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

刪除用戶群組

刪除用戶群組後,系統會永久遺失儲存的消費者偏移。您也無法在控制台中查看消費群組的記錄和指標。不過,與消費者群組相關聯的指標和記錄會保留下來,並可使用 Logs Explorer 存取。 刪除用戶群組也不會刪除該群組已取用的訊息。這些訊息仍會顯示在原本相關聯的主題中。

如要刪除消費者群組,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Cluster」(叢集) 頁面。

    前往「Clusters」(叢集) 頁面

  2. 在叢集清單中,按一下要刪除的消費者群組所屬叢集。

    「叢集詳細資料」頁面隨即開啟。

  3. 按一下要刪除的消費者群組。
  4. 在「Consumer group details」(消費者群組詳細資料) 頁面中,按一下「Delete」(刪除)
  5. 確認作業。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka consumer-groups delete 指令:

    gcloud managed-kafka consumer-groups delete CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION

    這個指令會從 Managed Service for Apache Kafka 叢集永久移除消費者群組。

    更改下列內容:

    • CONSUMER_GROUP_ID:消費者群組的 ID 或名稱。

    • CLUSTER_ID:叢集的 ID 或名稱。

    • LOCATION:叢集位置。

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func deleteConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    	req := &managedkafkapb.DeleteConsumerGroupRequest{
    		Name: consumerGroupPath,
    	}
    	if err := client.DeleteConsumerGroup(ctx, req); err != nil {
    		return fmt.Errorf("client.DeleteConsumerGroup got err: %w", err)
    	}
    	fmt.Fprint(w, "Deleted consumer group\n")
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import java.io.IOException;
    
    public class DeleteConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String consumerGroupId = "my-consumer-group";
        deleteConsumerGroup(projectId, region, clusterId, consumerGroupId);
      }
    
      public static void deleteConsumerGroup(
          String projectId, String region, String clusterId, String consumerGroupId) throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          // This operation is being handled synchronously.
          managedKafkaClient.deleteConsumerGroup(
              ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId));
          System.out.println("Deleted consumer group");
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.getConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group_path = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    request = managedkafka_v1.DeleteConsumerGroupRequest(
        name=consumer_group_path,
    )
    
    try:
        client.delete_consumer_group(request=request)
        print("Deleted consumer group")
    except NotFound as e:
        print(f"Failed to delete consumer group {consumer_group_id} with error: {e.message}")
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。