Membuat topik Google Cloud Managed Service for Apache Kafka

Di Managed Service for Apache Kafka, pesan diatur dalam topik. Topik terdiri dari partisi. Partisi adalah urutan kumpulan data yang berurutan dan tidak dapat diubah yang dimiliki oleh satu broker dalam cluster Kafka. Anda harus membuat topik untuk memublikasikan atau menggunakan pesan.

Untuk membuat topik, Anda dapat menggunakan konsol, Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source. Google Cloud

Sebelum memulai

Anda harus membuat cluster terlebih dahulu sebelum membuat topik. Pastikan Anda telah menyiapkan hal berikut:

Peran dan izin yang diperlukan untuk membuat topik

Untuk mendapatkan izin yang diperlukan guna membuat topik, minta administrator untuk memberi Anda peran IAM Managed Kafka Topic Editor (roles/managedkafka.topicEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat topik. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat topik:

  • Buat topik: managedkafka.topics.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Peran Managed Kafka Topic Editor juga berisi peran Managed Kafka Viewer. Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Properti topik Managed Service untuk Apache Kafka

Saat membuat atau memperbarui topik Managed Service for Apache Kafka, Anda harus menentukan properti berikut.

Nama topik

Nama topik Managed Service untuk Apache Kafka yang Anda buat. Untuk mengetahui panduan cara memberi nama topik, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama topik tidak dapat diubah.

Jumlah partisi

Jumlah partisi dalam topik. Anda dapat mengedit topik untuk menambah jumlah partisi topik, tetapi tidak dapat menguranginya. Meningkatkan jumlah partisi untuk topik yang menggunakan kunci dapat mengubah cara pesan didistribusikan.

Faktor replikasi

Jumlah replika untuk setiap partisi. Jika Anda tidak menentukan nilai, faktor replikasi default cluster akan digunakan.

Faktor replikasi yang lebih tinggi dapat meningkatkan konsistensi data jika terjadi kegagalan broker, karena data direplikasi ke beberapa broker. Untuk lingkungan produksi, sebaiknya gunakan faktor replikasi 3 atau lebih tinggi. Jumlah replika yang lebih tinggi akan meningkatkan biaya penyimpanan lokal dan transfer data untuk topik. Namun, hal ini tidak meningkatkan biaya penyimpanan persisten. Faktor replikasi tidak boleh melebihi jumlah broker yang tersedia.

Parameter lainnya

Anda juga dapat menyetel parameter konfigurasi tingkat topik Apache Kafka lainnya. Setelan ini ditentukan sebagai pasangan key=value yang menggantikan setelan default cluster.

Konfigurasi terkait topik memiliki default server dan penggantian per topik opsional. Formatnya adalah daftar pasangan KEY=VALUE yang dipisahkan koma, dengan KEY adalah nama properti konfigurasi topik Kafka, dan VALUE adalah setelan yang diperlukan.Pasangan nilai kunci ini membantu Anda mengganti setelan default kluster. Contohnya mencakup flush.ms=10 dan compression.type=producer.

Untuk mengetahui daftar semua konfigurasi tingkat topik yang didukung, lihat Konfigurasi tingkat topik dalam dokumentasi Apache Kafka.

Membuat topik

Sebelum membuat topik, tinjau properti topik.

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

  2. Klik cluster yang topiknya ingin Anda buat.

    Halaman Cluster details akan terbuka.

  3. Di halaman detail cluster, klik Create Topic.

    Halaman Create Kafka topic akan terbuka.

  4. Untuk Topic name, masukkan string.

  5. Untuk Jumlah partisi, masukkan jumlah partisi yang Anda inginkan atau pertahankan nilai default.

  6. Untuk Faktor replikasi, masukkan faktor replikasi yang Anda inginkan atau pertahankan nilai default.

  7. (Opsional) Untuk mengubah konfigurasi topik, tambahkan sebagai pasangan nilai kunci yang dipisahkan koma di kolom Konfigurasi.

  8. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Ganti kode berikut:

    • TOPIC_ID: Nama topik.
    • CLUSTER: Nama cluster tempat Anda ingin membuat topik.
    • LOCATION: Region cluster.
    • PARTITIONS: Jumlah partisi untuk topik.
    • REPLICATION_FACTOR: Faktor replikasi untuk topik.
    • CONFIGS: Parameter opsional tingkat topik. Tentukan sebagai pasangan nilai kunci yang dipisahkan koma. Contoh, compression.type=producer.
  3. Kafka CLI

    Sebelum menjalankan perintah ini, instal alat command line Kafka di VM Compute Engine. VM harus dapat menjangkau subnet yang terhubung ke cluster Managed Service untuk Apache Kafka. Ikuti petunjuk di Membuat dan menggunakan pesan dengan alat command line Kafka.

    Jalankan perintah kafka-topics.sh sebagai berikut:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Ganti kode berikut:

    • BOOTSTRAP_ADDRESS: Alamat bootstrap cluster Managed Service untuk Apache Kafka.

    • TOPIC_ID: Nama topik.

    • PARTITIONS: Jumlah partisi untuk topik.

    • REPLICATION_FACTOR: Faktor replikasi untuk topik.

    REST

    Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

    • PROJECT_ID: Project ID Google Cloud Anda
    • LOCATION: lokasi cluster
    • CLUSTER_ID: ID cluster
    • TOPIC_ID: ID topik
    • PARTITION_COUNT: jumlah partisi untuk topik
    • REPLICATION_FACTOR: jumlah replika setiap partisi

    Metode HTTP dan URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    Meminta isi JSON:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

    Anda akan melihat respons JSON yang mirip seperti berikut:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Anda dapat menggunakan resource Terraform untuk membuat topik.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

Apa langkah selanjutnya?