Criar um tópico do Serviço gerenciado do Google Cloud para Apache Kafka

No Serviço Gerenciado para Apache Kafka, as mensagens são organizadas em tópicos. Um tópico é composto por partições. Uma partição é uma sequência ordenada e imutável de registros pertencentes a um único agente em um cluster do Kafka. É necessário criar um tópico para publicar ou consumir mensagens.

Para criar um tópico, use o console Google Cloud , a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs Apache Kafka de código aberto.

Antes de começar

Primeiro, crie um cluster antes de criar um tópico. Verifique se você configurou o seguinte:

Papéis e permissões necessários para criar um tópico

Para receber as permissões necessárias para criar um tópico, peça ao administrador para conceder a você o papel do IAM de Editor de tópicos gerenciados do Kafka (roles/managedkafka.topicEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um tópico. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um tópico:

  • Crie um tópico: managedkafka.topics.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Propriedades de um tópico do serviço gerenciado para Apache Kafka

Ao criar ou atualizar um tópico do Serviço gerenciado para Apache Kafka, é necessário especificar as seguintes propriedades.

Nome do tópico

O nome do tópico do Serviço gerenciado para Apache Kafka que você está criando. Para conferir as diretrizes de nomeação de um tópico, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um tópico é imutável.

Contagem de partições

O número de partições no tópico. É possível editar um tópico para aumentar a contagem de partições, mas não diminuir. Aumentar o número de partições de um tópico que usa uma chave pode mudar a forma como as mensagens são distribuídas.

Fator de replicação

O número de réplicas para cada partição. Se você não especificar o valor, o fator de replicação padrão do cluster será usado.

Um fator de replicação maior pode melhorar a consistência dos dados em caso de falhas do broker, já que os dados são replicados para vários brokers. Para ambientes de produção, recomendamos um fator de replicação de 3 ou mais. Contagens de réplicas mais altas aumentam os custos de armazenamento local e transferência de dados para o tópico. No entanto, eles não aumentam os custos de armazenamento permanente. O fator de replicação não pode exceder o número de brokers disponíveis.

Outros parâmetros

Também é possível definir outros parâmetros de configuração no nível do tópico do Apache Kafka. Eles são especificados como pares key=value que substituem os padrões do cluster.

As configurações relacionadas a tópicos têm um padrão de servidor e uma substituição opcional por tópico. O formato é uma lista separada por vírgulas de pares KEY=VALUE, em que KEY é o nome da propriedade de configuração do tópico do Kafka e VALUE é a configuração obrigatória.Esses pares de chave-valor ajudam a substituir os padrões do cluster. Por exemplo, flush.ms=10 e compression.type=producer.

Para uma lista com todas as configurações de nível do tópico aceitas, consulte Configurações de nível do tópico na documentação do Apache Kafka.

Criar um tópico

Antes de criar um tópico, revise as propriedades dele.

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

  2. Clique no cluster para o qual você quer criar um tópico.

    A página Detalhes do cluster é aberta.

  3. Na página de detalhes do cluster, clique em Criar tópico.

    A página Criar tópico do Kafka é aberta.

  4. Em Nome do tópico, insira uma string.

  5. Em Contagem de partições, insira o número de partições que você quer ou mantenha o valor padrão.

  6. Em Fator de replicação, insira o valor desejado ou mantenha o padrão.

  7. (Opcional) Para alterar as configurações de um tema, adicione-as como pares de chave-valor separados por vírgula no campo Configurações.

  8. Clique em Criar.

gcloud

  1. No console do Google Cloud , ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do console Google Cloud , uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a CLI do Google Cloud já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o comando gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Substitua:

    • TOPIC_ID: o nome do tópico.
    • CLUSTER: o nome do cluster em que você quer criar o tópico.
    • LOCATION: a região do cluster.
    • PARTITIONS: o número de partições do tópico.
    • REPLICATION_FACTOR: o fator de replicação do tema.
    • CONFIGS: parâmetros opcionais no nível do tópico. Especifique como pares de chave-valor separados por vírgulas. Por exemplo, compression.type=producer.

CLI do Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa conseguir acessar uma sub-rede conectada ao cluster do Serviço Gerenciado para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-topics.sh da seguinte maneira:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Substitua:

  • BOOTSTRAP_ADDRESS: o endereço de inicialização do cluster do Serviço Gerenciado para Apache Kafka.

  • TOPIC_ID: o nome do tópico.

  • PARTITIONS: o número de partições do tópico.

  • REPLICATION_FACTOR: o fator de replicação do tópico.

REST

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • PROJECT_ID: o ID do projeto do Google Cloud
  • LOCATION: o local do cluster
  • CLUSTER_ID: o ID do cluster
  • TOPIC_ID: o ID do tópico
  • PARTITION_COUNT: o número de partições do tópico.
  • REPLICATION_FACTOR: o número de réplicas de cada partição.

Método HTTP e URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

Corpo JSON da solicitação:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

É possível usar um recurso do Terraform para criar um tópico.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

A seguir