Update a Google Cloud Managed Service for Apache Kafka consumer group

You can update a Google Cloud Managed Service for Apache Kafka consumer group to modify the offsets for a list of topic partitions. Doing so lets you control which messages the consumers in the group receive.

To update a consumer group, you can use the Google Cloud CLI, the client libraries, the Managed Kafka API, or the open source Apache Kafka API. The Google Cloud console doesn't support editing consumer groups.

Before you begin

Before you update a consumer group, confirm that the group isn't actively consuming messages. Note that Kafka automatically deletes a consumer group if the group has never consumed any messages, or if its last committed offsets have expired after offsets.retention.minutes.
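
One way to confirm that the group is idle is to describe it: a group with active consumers reports the state Stable, while an idle group whose offsets are still retained reports Empty. The following is a minimal sketch using the open source kafka-python admin client; the bootstrap address and SASL credentials are placeholders for your cluster's connection settings, not values defined in this guide.

    from kafka.admin import KafkaAdminClient

    # Placeholders: supply your cluster's bootstrap address and SASL credentials.
    admin = KafkaAdminClient(
        bootstrap_servers="BOOTSTRAP_ADDRESS:9092",
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username="USERNAME",
        sasl_plain_password="PASSWORD",
    )

    # "Stable" means consumers are still active; "Empty" means the group is idle.
    for group in admin.describe_consumer_groups(["my-consumer-group"]):
        print(group.group, group.state)
    admin.close()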

Before you update the consumer group, follow these steps:

  1. Send some messages to the topic that the consumer group reads from.

  2. Start the consumer group and process a few messages.

  3. Stop all consumers from consuming messages. To stop a consumer, press Control+C.

For more information about how to send and receive messages, see "Produce and consume messages with the Kafka command-line tools".
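
If you want to script the preceding steps instead of using the command-line tools, the following is a minimal sketch with the open source kafka-python client. The bootstrap address, credentials, topic name, and group name are placeholders, not values defined in this guide.

    from kafka import KafkaConsumer, KafkaProducer

    # Placeholders: supply your cluster's bootstrap address and SASL credentials.
    common = dict(
        bootstrap_servers="BOOTSTRAP_ADDRESS:9092",
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username="USERNAME",
        sasl_plain_password="PASSWORD",
    )

    # Step 1: send a few messages to the topic that the group reads from.
    producer = KafkaProducer(**common)
    for i in range(5):
        producer.send("my-topic", f"message-{i}".encode())
    producer.flush()
    producer.close()

    # Step 2: start the consumer group and process a few messages.
    consumer = KafkaConsumer(
        "my-topic",
        group_id="my-consumer-group",
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        consumer_timeout_ms=10000,  # stop iterating when no new messages arrive
        **common,
    )
    for record in consumer:
        print(record.topic, record.partition, record.offset)

    # Step 3: stop consuming. Closing the consumer commits the final offsets,
    # which is the scripted equivalent of pressing Control+C.
    consumer.close()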

Required roles and permissions to update a consumer group

To get the permissions that you need to edit a consumer group, ask your administrator to grant you the Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) IAM role on your project. For more information about granting roles, see "Manage access to projects, folders, and organizations".

This predefined role contains the permissions required to edit a consumer group. To see the exact permissions that are required, expand the "Required permissions" section:

Required permissions

The following permissions are required to edit a consumer group:

  • Update a consumer group: managedkafka.consumerGroups.update

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Consumer Group Editor role, see "Managed Service for Apache Kafka predefined roles".

Grant the service agent READ access

To update consumer group offsets, the service agent must have access to the READ operation on the topic and consumer group resources. This access is configured by using Apache Kafka ACLs.

If you haven't configured any Apache Kafka ACLs for the consumer group and its topics within the cluster, the service agent can already access these resources, and you can skip this section.

If Apache Kafka ACLs are configured for the consumer group and its topics within the cluster, the service agent must have ACL access to the READ operation on both resources. To grant this access, add ACL entries that give the service agent READ operation access to the relevant consumer group and topics. Follow these steps:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Run the gcloud managed-kafka acls add-acl-entry command:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ \
      --permission-type=ALLOW \
      --host='*'

    gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ \
      --permission-type=ALLOW \
      --host='*'

    Replace the following:

    • CONSUMER_GROUP_ACL_ID (required): The unique ID of the Managed Service for Apache Kafka ACL resource in which you're adding the ACL entry for the consumer group. To apply the access to all consumer groups, use `allConsumerGroups`. To apply it to a specific consumer group, use `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (required): The unique ID of the Managed Service for Apache Kafka ACL resource in which you're adding the ACL entry for the topic. To apply the access to all topics, use `allTopics`. To apply it to a specific topic, use `topic/TOPIC_NAME`.
    • CLUSTER_ID (required): The ID of the cluster that contains the ACL resource.
    • LOCATION (required): The region where the cluster is located. See "Supported locations".
    • PROJECT_NUMBER (required): The project number of the project that contains the cluster. It is used to construct the principal name of the service agent in the ACL entry.

For more information about how to add ACL entries, see "Add ACL entries".
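
If you prefer the client library over the gcloud CLI for this step, the following is a minimal Python sketch that mirrors the two commands above, assuming your installed version of google-cloud-managedkafka includes the ACL surface (the add_acl_entry method and the AclEntry type). The project number and resource IDs are placeholders.

    from google.cloud import managedkafka_v1

    project_id = "my-project-id"
    project_number = "123456789012"  # placeholder project number
    location = "us-central1"
    cluster_id = "my-cluster"

    # The service agent principal, constructed as in the commands above.
    principal = (
        f"User:__AUTH_TOKEN__service-{project_number}"
        "@gcp-sa-managedkafka.iam.gserviceaccount.com"
    )

    client = managedkafka_v1.ManagedKafkaClient()

    # Grant READ on all consumer groups and all topics. Use ACL IDs such as
    # "consumerGroup/NAME" or "topic/NAME" to scope the access more narrowly.
    for acl_id in ("allConsumerGroups", "allTopics"):
        request = managedkafka_v1.AddAclEntryRequest(
            acl=client.acl_path(project_id, location, cluster_id, acl_id),
            acl_entry=managedkafka_v1.AclEntry(
                principal=principal,
                operation="READ",
                permission_type="ALLOW",
                host="*",
            ),
        )
        response = client.add_acl_entry(request=request)
        print("Added ACL entry:", response)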

Update a consumer group

Make sure that you have completed the steps in the "Before you begin" section.

To update a consumer group, follow these steps:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud managed-kafka consumer-groups update command:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Replace the following:

    • CLUSTER_ID: The ID or name of the cluster.

    • LOCATION: The location of the cluster.

    • CONSUMER_GROUP_ID: The ID or name of the consumer group.

    • TOPICS_FILE: The location of the file that contains the topic configurations to update for the consumer group. The file can be in JSON or YAML format. You can provide a path to a file, or include the JSON or YAML content directly.

      The topics file uses the JSON structure that represents the ConsumerGroup topics map, in the form { topicName1: {ConsumerPartitionMetadata}, topicName2: {ConsumerPartitionMetadata} }. For each topic, ConsumerPartitionMetadata provides the offset and metadata of each partition.

      For example, to set the offset of a single partition (partition 0) in a topic named topic1 to 10, the JSON configuration looks like this: {"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      The following is an example of the contents of a topics.json file. A short script that generates a file like this is sketched after the list of variables.

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: When you specify topics in the JSON or YAML file, include the full topic path, which you can get by running the gcloud managed-kafka topics describe command. The path has the format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_NAME.
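
    If you prefer to generate the topics file programmatically, the following is a minimal Python sketch that writes a topics.json file in the structure shown above. The topic path, partition numbers, and offsets are placeholder values.

      import json

      # Placeholder full topic path, in the format described for TOPIC_PATH.
      topic_path = (
          "projects/PROJECT_NUMBER/locations/LOCATION/"
          "clusters/CLUSTER_NAME/topics/TOPIC_NAME"
      )
      partition_offsets = {1: 10, 2: 20}  # partition number -> desired offset

      topics = {
          topic_path: {
              "partitions": {
                  partition: {"offset": offset, "metadata": ""}
                  for partition, offset in partition_offsets.items()
              }
          }
      }

      # json.dump serializes the integer partition keys as strings,
      # matching the example file above.
      with open("topics.json", "w") as f:
          json.dump(topics, f, indent=2)

    You can then pass the generated file to the update command with --topics-file=topics.json.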

Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.