更新 Google Cloud Managed Service for Apache Kafka 主題

建立主題後,您可以編輯主題設定,更新下列屬性:分區數量和主題設定 (預設不會使用叢集層級已設定的屬性)。您只能增加分區數量,無法減少。

如要更新單一主題,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫、Managed Kafka API 或開放原始碼 Apache Kafka API。

編輯主題所需的角色和權限

如要取得編輯主題所需的權限,請要求管理員授予您專案的 Managed Kafka 主題編輯者(roles/managedkafka.topicEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備編輯主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要編輯主題,必須具備下列權限:

  • 更新主題: managedkafka.topics.update

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解這個角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

編輯主題

如要編輯主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

    系統會列出您在專案中建立的叢集。

  2. 按一下要編輯主題所屬的叢集。

    「叢集詳細資料」頁面隨即開啟。在叢集詳細資料頁面中,主題會列在「資源」分頁中。

  3. 按一下要編輯的主題。

    「主題詳細資料」頁面隨即開啟。

  4. 如要編輯,請按一下「編輯」

  5. 完成變更後,請按一下「儲存」

gcloud

  1. 在 Google Cloud 控制台中啟用 Cloud Shell。

    啟用 Cloud Shell

    Google Cloud 主控台底部會開啟一個 Cloud Shell 工作階段,並顯示指令列提示。Cloud Shell 是已安裝 Google Cloud CLI 的殼層環境,並已針對您目前的專案設定好相關值。工作階段可能要幾秒鐘的時間才能初始化。

  2. 執行 gcloud managed-kafka topics update 指令:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    這項指令會修改指定 Managed Service for Apache Kafka 叢集中現有主題的設定。您可以使用這項指令增加分割區數量,並更新主題層級的設定。

    更改下列內容:

    • TOPIC_ID:主題 ID。
    • CLUSTER_ID:包含主題的叢集 ID。
    • LOCATION_ID:叢集位置。
    • PARTITIONS:(選用) 主題的更新分割區數量。您只能增加分區數量,無法減少。
    • CONFIGS:(選用) 要更新的設定清單。請以半形逗號分隔鍵/值組合。例如:retention.ms=3600000,retention.bytes=10000000

REST

使用任何要求資料之前,請先修改下列項目的值:

  • PROJECT_ID:您的 Google Cloud 專案 ID
  • LOCATION:叢集位置
  • CLUSTER_ID:叢集 ID
  • TOPIC_ID:主題的 ID
  • UPDATE_MASK:要更新的欄位,以逗號分隔的完整名稱清單表示。範例:partitionCount
  • PARTITION_COUNT:主題的更新分區數量

HTTP 方法和網址:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

JSON 要求主體:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

請展開以下其中一個選項,以傳送要求:

您應該會收到如下的 JSON 回覆:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

設定訊息保留功能

Kafka 會將訊息儲存在記錄區段檔案中。根據預設,Kafka 會在保留期限過後,或分區超過資料大小門檻時,刪除區隔檔案。如要變更這項行為,請啟用記錄壓縮。如果啟用記錄壓縮功能,Kafka 只會保留每個鍵的最新值。

Google Cloud Managed Service for Apache Kafka 使用分層儲存空間,也就是說,完成的記錄區段會儲存在遠端,而非本機儲存空間。如要進一步瞭解分層儲存空間,請參閱 Apache Kafka 說明文件中的「分層儲存空間」。

設定保留值

如果未啟用記錄壓縮功能,則下列設定會控管 Kafka 儲存記錄區段檔案的方式:

  • retention.ms:儲存區隔檔案的時間長度上限 (以毫秒為單位)。
  • retention.bytes:每個分割區可儲存的位元組數量上限。如果分區中的資料超過這個值,Kafka 就會捨棄較舊的區隔檔案。

如要更新這些設定,請使用 gcloud CLI 或 Kafka CLI:

gcloud

如要設定訊息保留時間,請執行 gcloud managed-kafka topics update 指令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

更改下列內容:

  • TOPIC_ID:主題 ID。
  • CLUSTER_ID:包含主題的叢集 ID。
  • LOCATION_ID:叢集位置。
  • RETENTION_PERIOD:儲存區段檔案的時間上限 (以毫秒為單位)。
  • MAX_BYTES:每個分割區可儲存的位元組數量上限。

Kafka CLI

執行這項指令前,請先在 Compute Engine VM 上安裝 Kafka 指令列工具。VM 必須能夠連上與 Managed Service for Apache Kafka 叢集連線的子網路。請按照「 使用 Kafka 指令列工具產生及取用訊息」中的操作說明進行操作。

執行 kafka-configs.sh 指令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

更改下列內容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 叢集的啟動位址
  • TOPIC_ID:主題 ID。
  • RETENTION_PERIOD:儲存區段檔案的時間上限 (以毫秒為單位)。
  • MAX_BYTES:每個分割區可儲存的位元組數量上限。

啟用記錄壓縮功能

如果啟用記錄壓縮功能,Kafka 只會儲存每個鍵的最新訊息。記錄壓縮功能預設為停用。如要為主題啟用記錄壓縮功能,請將 cleanup.policy 設定設為 "compact",如下所示:

gcloud

執行 gcloud managed-kafka topics update 指令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

更改下列內容:

  • TOPIC_ID:主題 ID。
  • CLUSTER_ID:包含主題的叢集 ID。
  • LOCATION_ID:叢集位置。

Kafka CLI

執行這項指令前,請先在 Compute Engine VM 上安裝 Kafka 指令列工具。VM 必須能夠連上與 Managed Service for Apache Kafka 叢集連線的子網路。請按照「 使用 Kafka 指令列工具產生及取用訊息」中的操作說明進行操作。

執行 kafka-configs.sh 指令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

更改下列內容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 叢集的啟動位址
  • TOPIC_ID:主題 ID。

限制

  • 您無法覆寫遠端儲存空間的主題設定,例如 remote.storage.enable

  • 您無法覆寫記錄區段檔案的主題設定,例如 segment.bytes

  • 為主題啟用記錄壓縮功能時,系統會隱含停用該主題的分層儲存空間。主題的所有記錄檔都會儲存在本機。

後續步驟