Membuat topik Google Cloud Managed Service for Apache Kafka

Di Managed Service untuk Apache Kafka, pesan diatur dalam topik. Topik terdiri dari partisi. Partisi adalah urutan kumpulan data yang berurutan dan tidak dapat diubah yang dimiliki oleh satu broker dalam cluster Kafka. Anda harus membuat topik untuk memublikasikan atau menggunakan pesan.

Untuk membuat topik, Anda dapat menggunakan Google Cloud konsol, Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source.

Sebelum memulai

Anda harus membuat cluster terlebih dahulu sebelum membuat topik. Pastikan Anda telah menyiapkan hal berikut:

Peran dan izin yang diperlukan untuk membuat topik

Untuk mendapatkan izin yang diperlukan guna membuat topik, minta administrator untuk memberi Anda peran IAM Managed Kafka Topic Editor (roles/managedkafka.topicEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat topik. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat topik:

  • Membuat topik: managedkafka.topics.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Properti topik Managed Service untuk Apache Kafka

Saat membuat atau memperbarui topik Managed Service untuk Apache Kafka, Anda harus menentukan properti berikut.

Nama topik

Nama topik Managed Service untuk Apache Kafka yang Anda buat. Untuk mengetahui panduan tentang cara memberi nama topik, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama topik tidak dapat diubah.

Jumlah partisi

Jumlah partisi dalam topik. Anda dapat mengedit topik untuk meningkatkan jumlah partisi untuk topik, tetapi Anda tidak dapat menguranginya. Meningkatkan jumlah partisi untuk topik yang menggunakan kunci dapat mengubah cara pesan didistribusikan.

Faktor replikasi

Jumlah replika untuk setiap partisi. Jika Anda tidak menentukan nilai, faktor replikasi default cluster akan digunakan.

Faktor replikasi yang lebih tinggi dapat meningkatkan konsistensi data jika terjadi kegagalan broker, karena data direplikasi ke beberapa broker. Untuk lingkungan produksi, sebaiknya gunakan faktor replikasi 3 atau lebih tinggi. Jumlah replika yang lebih tinggi akan meningkatkan biaya penyimpanan lokal dan transfer data untuk topik. Namun, hal ini tidak meningkatkan biaya penyimpanan persisten. Faktor replikasi tidak boleh melebihi jumlah broker yang tersedia.

Parameter lainnya

Anda juga dapat menetapkan parameter konfigurasi tingkat topik Apache Kafka lainnya. Parameter ini ditentukan sebagai pasangan key=value yang mengganti nilai default cluster.

Konfigurasi yang terkait dengan topik memiliki default server dan penggantian per topik opsional. Formatnya adalah daftar pasangan KEY=VALUE yang dipisahkan koma, dengan KEY adalah nama properti konfigurasi topik Kafka, dan VALUE adalah setelan yang diperlukan.Pasangan nilai kunci ini membantu Anda mengganti nilai default cluster. Contohnya mencakup flush.ms=10 dan compression.type=producer.

Untuk mengetahui daftar semua konfigurasi tingkat topik yang didukung, lihat Konfigurasi tingkat topik dalam dokumentasi Apache Kafka.

Membuat topik

Sebelum membuat topik, tinjau properti topik.

Konsol

  1. Di Google Cloud konsol, buka halaman Clusters.

    Buka Cluster

  2. Klik cluster yang ingin Anda buatkan topik.

    Halaman Cluster details akan terbuka.

  3. Di halaman detail cluster, klik Create Topic.

    Halaman Create Kafka topic akan terbuka.

  4. Untuk Topic name, masukkan string.

  5. Untuk Partition count, masukkan jumlah partisi yang Anda inginkan atau pertahankan nilai default.

  6. Untuk Replication factor, masukkan faktor replikasi yang Anda inginkan atau pertahankan nilai default.

  7. (Opsional) Untuk mengubah konfigurasi topik, tambahkan sebagai pasangan nilai kunci yang dipisahkan koma di kolom Configurations.

  8. Klik Create.

gcloud

  1. Di konsol, aktifkan Cloud Shell. Google Cloud

    Aktifkan Cloud Shell

    Di bagian bawah konsol Google Cloud , sesi Cloud Shell akan dimulai dan menampilkan prompt command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi pada sesi.

  2. Jalankan gcloud managed-kafka topics create perintah:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Ganti kode berikut:

    • TOPIC_ID: Nama topik.
    • CLUSTER: Nama cluster tempat Anda ingin membuat topik.
    • LOCATION: Region cluster.
    • PARTITIONS: Jumlah partisi untuk topik.
    • REPLICATION_FACTOR: Faktor replikasi untuk topik.
    • CONFIGS: Parameter opsional tingkat topik. Tentukan sebagai pasangan nilai kunci yang dipisahkan koma. Misalnya, compression.type=producer.

Kafka CLI

Sebelum menjalankan perintah ini, instal alat command line Kafka di VM Compute Engine. VM harus dapat menjangkau subnet yang terhubung ke cluster Managed Service untuk Apache Kafka Anda. Ikuti petunjuk di Membuat dan menggunakan pesan dengan alat command line Kafka.

Jalankan perintah kafka-topics.sh sebagai berikut:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Ganti kode berikut:

REST

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • PROJECT_ID: ID proyek Google Cloud Anda
  • LOCATION: lokasi cluster
  • CLUSTER_ID: ID cluster
  • TOPIC_ID: ID topik
  • PARTITION_COUNT: jumlah partisi untuk topik
  • REPLICATION_FACTOR: jumlah replika setiap partisi

Metode HTTP dan URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

Meminta isi JSON:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON yang mirip seperti berikut:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

Anda dapat menggunakan resource Terraform untuk membuat a topik.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Go API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Java API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Python API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

Apa langkah selanjutnya?