建立 Kafka 主題

建立 Kafka 主題

深入探索

如需包含這個程式碼範例的詳細說明文件,請參閱下列文章:

程式碼範例

Go

在試用這個範例之前,請先按照「使用用戶端程式庫的 Managed Service for Apache Kafka 快速入門導覽課程」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

在試用這個範例之前,請先按照「使用用戶端程式庫的 Managed Service for Apache Kafka 快速入門導覽課程」中的 Java 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

在試用這個範例之前,請先按照「使用用戶端程式庫的 Managed Service for Apache Kafka 快速入門導覽課程」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

Terraform

如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。詳情請參閱 Terraform 供應商參考文件

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

後續步驟

如要搜尋及篩選其他 Google Cloud 產品的程式碼範例,請參閱Google Cloud 範例瀏覽工具