Memperbarui topik Google Cloud Managed Service for Apache Kafka

Setelah topik dibuat, Anda dapat mengedit konfigurasi topik untuk memperbarui properti berikut: jumlah partisi dan konfigurasi topik yang tidak ditetapkan secara default ke properti yang sudah ditetapkan di tingkat cluster. Anda hanya dapat menambah jumlah partisi, Anda tidak dapat menguranginya.

Untuk memperbarui satu topik, Anda dapat menggunakan Google Cloud konsol, Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source.

Peran dan izin yang diperlukan untuk mengedit topik

Untuk mendapatkan izin yang diperlukan guna mengedit topik, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Topic Editor(roles/managedkafka.topicEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk mengedit topik. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mengedit topik:

  • Perbarui topik: managedkafka.topics.update

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Mengedit topik

Untuk mengedit topik, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

    Cluster yang Anda buat dalam project akan dicantumkan.

  2. Klik cluster tempat topik yang ingin Anda edit berada.

    Halaman Cluster details akan terbuka. Di halaman detail cluster, untuk tab Resources, topik akan dicantumkan.

  3. Klik topik yang ingin Anda edit.

    Halaman Topic details akan terbuka.

  4. Untuk melakukan pengeditan, klik Edit.

  5. Klik Simpan setelah melakukan perubahan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka topics update:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Perintah ini mengubah konfigurasi topik yang ada di cluster Managed Service untuk Apache Kafka yang ditentukan. Anda dapat menggunakan perintah ini untuk meningkatkan jumlah partisi dan memperbarui setelan konfigurasi tingkat topik.

    Ganti kode berikut:

    • TOPIC_ID: ID topik.
    • CLUSTER_ID: ID cluster yang berisi topik.
    • LOCATION_ID: Lokasi cluster.
    • PARTITIONS: Opsional: Jumlah partisi yang diperbarui untuk topik. Anda hanya dapat menambah jumlah partisi, dan tidak dapat menguranginya.
    • CONFIGS: Opsional: Daftar setelan konfigurasi yang akan diperbarui. Tentukan sebagai daftar pasangan nilai kunci yang dipisahkan koma. Contoh, retention.ms=3600000,retention.bytes=10000000.
  3. REST

    Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

    • PROJECT_ID: Project ID Google Cloud Anda
    • LOCATION: lokasi cluster
    • CLUSTER_ID: ID cluster
    • TOPIC_ID: ID topik
    • UPDATE_MASK: kolom yang akan diupdate, sebagai daftar nama yang sepenuhnya memenuhi syarat yang dipisahkan koma. Contoh: partitionCount
    • PARTITION_COUNT: jumlah partisi yang diperbarui untuk topik

    Metode HTTP dan URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    Meminta isi JSON:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

    Anda akan melihat respons JSON seperti berikut:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

Mengonfigurasi retensi pesan

Kafka menyimpan pesan dalam file segmen log. Secara default, Kafka menghapus file segmen setelah periode retensi atau saat partisi melebihi batas ukuran data. Anda dapat mengubah perilaku ini dengan mengaktifkan pemadatan log. Jika pemadatan log diaktifkan, Kafka hanya menyimpan nilai terbaru untuk setiap kunci.

Google Cloud Managed Service for Apache Kafka menggunakan tiered storage, yang berarti bahwa segmen log yang telah selesai disimpan dari jarak jauh, bukan di penyimpanan lokal. Untuk mempelajari lebih lanjut penyimpanan bertingkat, lihat Penyimpanan Bertingkat dalam dokumentasi Apache Kafka.

Menetapkan nilai retensi

Jika pemadatan log tidak diaktifkan, setelan berikut akan mengontrol cara Kafka menyimpan file segmen log:

  • retention.ms: Durasi maksimum untuk menyimpan file segmen, dalam milidetik.
  • retention.bytes: Jumlah maksimum byte yang akan disimpan per partisi. Jika data dalam partisi melebihi nilai ini, Kafka akan menghapus file segmen yang lebih lama.

Untuk memperbarui setelan ini, gunakan gcloud CLI atau Kafka CLI:

gcloud

Untuk menetapkan retensi pesan, jalankan perintah gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ganti kode berikut:

  • TOPIC_ID: ID topik.
  • CLUSTER_ID: ID cluster yang berisi topik.
  • LOCATION_ID: Lokasi cluster.
  • RETENTION_PERIOD: Jumlah waktu maksimum untuk menyimpan file segmen, dalam milidetik.
  • MAX_BYTES: Jumlah maksimum byte yang akan disimpan, per partisi.

Kafka CLI

Sebelum menjalankan perintah ini, instal alat command line Kafka di VM Compute Engine. VM harus dapat menjangkau subnet yang terhubung ke cluster Managed Service untuk Apache Kafka. Ikuti petunjuk di Membuat dan menggunakan pesan dengan alat command line Kafka.

Jalankan perintah kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ganti kode berikut:

  • BOOTSTRAP_ADDRESS: Alamat bootstrap cluster Managed Service untuk Apache Kafka.
  • TOPIC_ID: ID topik.
  • RETENTION_PERIOD: Jumlah waktu maksimum untuk menyimpan file segmen, dalam milidetik.
  • MAX_BYTES: Jumlah maksimum byte yang akan disimpan, per partisi.

Mengaktifkan pemadatan log

Jika pemadatan log diaktifkan, Kafka hanya menyimpan pesan terbaru untuk setiap kunci. Pemadatan log dinonaktifkan secara default. Untuk mengaktifkan pemadatan log untuk topik, tetapkan konfigurasi cleanup.policy ke "compact", seperti berikut:

gcloud

Jalankan perintah gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Ganti kode berikut:

  • TOPIC_ID: ID topik.
  • CLUSTER_ID: ID cluster yang berisi topik.
  • LOCATION_ID: Lokasi cluster.

Kafka CLI

Sebelum menjalankan perintah ini, instal alat command line Kafka di VM Compute Engine. VM harus dapat menjangkau subnet yang terhubung ke cluster Managed Service untuk Apache Kafka. Ikuti petunjuk di Membuat dan menggunakan pesan dengan alat command line Kafka.

Jalankan perintah kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Ganti kode berikut:

  • BOOTSTRAP_ADDRESS: Alamat bootstrap cluster Managed Service untuk Apache Kafka.
  • TOPIC_ID: ID topik.

Batasan

  • Anda tidak dapat mengganti konfigurasi topik untuk penyimpanan jarak jauh, seperti remote.storage.enable.

  • Anda tidak dapat mengganti konfigurasi topik untuk file segmen log, seperti segment.bytes.

  • Mengaktifkan pemadatan log untuk topik secara implisit menonaktifkan penyimpanan bertingkat untuk topik tersebut. Semua file log untuk topik disimpan secara lokal.

Apa langkah selanjutnya?