您可以更新 Google Cloud Managed Service for Apache Kafka 消費者群組,修改主題分割區清單的偏移。這樣一來,您就能控管群組中的消費者會收到哪些訊息。
如要更新消費者群組,可以使用 Google Cloud CLI、用戶端程式庫、Managed Kafka API 或開放原始碼 Apache Kafka API。 Google Cloud 控制台不支援編輯消費者群組。
事前準備
如要更新消費者群組,請先確認該群組未主動取用訊息。如果取用者群組從未取用任何訊息,或最後一次提交的偏移量在 offsets.retention.minutes 後過期,Kafka 就會自動刪除該群組。
更新消費者群組前,請先按照下列步驟操作:
從取用端群組讀取訊息的主題傳送一些訊息。
啟動取用端群組,處理幾則訊息。
停止所有消費者取用訊息。如要停止取用者,請按下 Control+C。
如要進一步瞭解如何傳送及接收訊息,請參閱「使用 Kafka 指令列工具產生及接收訊息」。
更新消費者群組所需的角色和權限
如要取得編輯消費者群組所需的權限,請要求管理員授予您專案的代管 Kafka 消費者群組編輯者 (roles/managedkafka.consumerGroupEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這個預先定義的角色具備編輯消費者群組所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要編輯消費者群組,您必須具備下列權限:
-
更新消費者群組:
managedkafka.consumerGroups.update
如要進一步瞭解 Managed Kafka Consumer Group Editor 角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。
授予服務代理「讀取」存取權
如要更新消費者群組偏移,服務代理必須具備主題和消費者群組資源的 READ 作業存取權。這項存取權是透過 Apache Kafka ACL 設定。
如果您尚未為叢集內的消費者群組及其主題設定任何 Apache Kafka ACL,服務代理程式就能存取這些資源。你可以略過這個部分。
如果叢集內已為消費者群組及其主題設定 Apache Kafka ACL,服務代理必須具備這兩項資源的 READ 作業 ACL 存取權。如要這麼做,請新增 ACL 項目,授予服務代理人相關消費者群組和主題的 READ 作業存取權。步驟如下:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init 執行
gcloud managed-kafka acls add-acl-entry指令:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
更改下列內容:
CONSUMER_GROUP_ACL_ID(必要):Managed Service for Apache Kafka ACL 資源的專屬 ID,您要在該資源中為消費者群組新增 ACL 項目。如要將存取權套用至所有用戶群組,請使用 `allConsumerGroups`。如要套用至特定用戶群組,請使用 `consumerGroup/CONSUMER_GROUP_NAME`。TOPIC_ACL_ID(必要):您要為主題新增 ACL 項目,該項目的 Managed Service for Apache Kafka ACL 資源專屬 ID。如要將存取權套用至所有主題,請使用 `allTopics`。如要套用至特定主題,請使用 `topic/TOPIC_NAME`。CLUSTER_ID(必要):包含 ACL 資源的叢集 ID。LOCATION(必要):叢集所在的區域。請參閱「支援的位置」。PROJECT_NUMBER(必要):叢集所在專案的專案編號。這項資訊用於建構 ACL 項目服務代理程式的主體名稱。
如要進一步瞭解如何新增 ACL 項目,請參閱「新增 ACL 項目」。
更新消費者群組
請確認您已完成「事前準備」一節中的步驟。
如要更新消費者群組,請按照下列步驟操作:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud managed-kafka consumer-groups update指令:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
更改下列內容:
-
CLUSTER_ID:叢集的 ID 或名稱。
-
LOCATION:叢集位置。
-
CONSUMER_GROUP_ID:消費者群組的 ID 或名稱。
-
TOPICS_FILE:這項設定會指定檔案位置,其中包含要為消費者群組更新的主題設定。檔案可為 JSON 或 YAML 格式。可以是檔案路徑,也可以直接加入 JSON 或 YAML 內容。
主題檔案會使用 JSON 結構,以
ConsumerGroup主題地圖的形式表示{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}。ConsumerPartitionMetadata會針對每個主題提供每個分區的位移和中繼資料。如要將主題 (名為
topic1) 中單一分割區 (分割區 0) 的位移設為 10,JSON 設定會如下所示:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}以下是
topics.json檔案內容的範例:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH:在 JSON 或 YAML 檔案中指定主題時,請加入完整主題路徑,可透過執行
gcloud managed-kafak topics describe指令取得,格式為projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic。。
-