Criar um tópico do Serviço gerenciado do Google Cloud para Apache Kafka

No Serviço Gerenciado para Apache Kafka, as mensagens são organizadas em tópicos. Um tópico é composto por partições. Uma partição é uma sequência ordenada e imutável de registros pertencentes a um único agente em um cluster do Kafka. É necessário criar um tópico para publicar ou consumir mensagens.

Para criar um tópico, use o Google Cloud console, a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs de código aberto do Apache Kafka.

Antes de começar

Primeiro, crie um cluster antes de criar um tópico. Verifique se você configurou o seguinte:

Papéis e permissões necessários para criar um tópico

Para receber as permissões necessárias para criar um tópico, peça ao administrador para conceder a você o papel do IAM de Editor de tópicos do Managed Kafka (roles/managedkafka.topicEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um tópico. Para acessar as permissões exatas que são necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um tópico:

  • Criar um tópico: managedkafka.topics.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

O papel de Editor de tópicos do Managed Kafka também contém o papel de Leitor do Managed Kafka. Para mais informações sobre esse papel, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.

Propriedades de um tópico do Serviço Gerenciado para Apache Kafka

Ao criar ou atualizar um tópico do Serviço Gerenciado para Apache Kafka, é necessário especificar as seguintes propriedades.

Nome do tópico

O nome do tópico do Serviço Gerenciado para Apache Kafka que você está criando. Para conferir as diretrizes de nomeação de um tópico, consulte Diretrizes de nomeação de um recurso do Serviço Gerenciado para Apache Kafka. O nome de um tópico é imutável.

Contagem de partições

O número de partições no tópico. É possível editar um tópico para aumentar a contagem de partições, mas não é possível diminuí-la. Aumentar o número de partições de um tópico que usa uma chave pode mudar a forma como as mensagens são distribuídas.

Fator de replicação

O número de réplicas para cada partição. Se você não especificar o valor, o fator de replicação padrão do cluster será usado.

Um fator de replicação mais alto pode melhorar a consistência dos dados em caso de falhas do agente, já que os dados são replicados para vários agentes. Para ambientes de produção, é recomendável usar um fator de replicação de 3 ou mais. Contagens de réplicas mais altas aumentam os custos de armazenamento local e de transferência de dados para o tópico. No entanto, elas não aumentam os custos de armazenamento permanente. O fator de replicação não pode exceder o número de agentes disponíveis.

Outros parâmetros

Também é possível definir outros parâmetros de configuração de nível do tópico do Apache Kafka. Eles são especificados como pares key=value que substituem os padrões do cluster.

As configurações relacionadas a tópicos têm um padrão de servidor e uma substituição opcional por tópico. O formato é uma lista separada por vírgulas de pares KEY=VALUE, em que KEY é o nome da propriedade de configuração do tópico do Kafka e VALUE é a configuração necessária.Esses pares de chave-valor ajudam a substituir os padrões do cluster. Os exemplos incluem flush.ms=10 e compression.type=producer.

Para uma lista com todas as configurações de nível do tópico aceitas, consulte Configurações de nível do tópico na documentação do Apache Kafka.

Criar um tópico

Antes de criar um tópico, revise as propriedades dele.

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

  2. Clique no cluster para o qual você quer criar um tópico.

    A página Detalhes do cluster será aberta.

  3. Na página de detalhes do cluster, clique em Criar tópico.

    A página Criar tópico do Kafka será aberta.

  4. Em Nome do tópico, insira uma string.

  5. Em Contagem de partições, insira o número de partições desejado ou mantenha o valor padrão.

  6. Em Fator de replicação, insira o fator de replicação desejado ou mantenha o valor padrão.

  7. Opcional: para alterar as configurações do tópico, adicione-as como pares de chave-valor separados por vírgulas no campo Configurações.

  8. Clique em Criar.

gcloud

  1. No Google Cloud console, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o gcloud managed-kafka topics create comando:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Substitua:

    • TOPIC_ID: o nome do tópico.
    • CLUSTER: o nome do cluster em que você quer criar o tópico.
    • LOCATION: a região do cluster.
    • PARTITIONS: o número de partições do tópico.
    • REPLICATION_FACTOR: o fator de replicação do tópico.
    • CONFIGS: parâmetros opcionais de nível do tópico. Especifique como pares de chave-valor separados por vírgulas. Por exemplo, compression.type=producer.

CLI do Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa conseguir acessar uma sub-rede conectada ao cluster do Serviço Gerenciado para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-topics.sh da seguinte maneira:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Substitua:

  • BOOTSTRAP_ADDRESS: o endereço de inicialização do cluster do Serviço Gerenciado para Apache Kafka.

  • TOPIC_ID: o nome do tópico.

  • PARTITIONS: o número de partições do tópico.

  • REPLICATION_FACTOR: o fator de replicação do tópico.

REST

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • PROJECT_ID: ID do seu Google Cloud projeto
  • LOCATION: o local do cluster
  • CLUSTER_ID: o ID do cluster
  • TOPIC_ID: o ID do tópico
  • PARTITION_COUNT: o número de partições do tópico
  • REPLICATION_FACTOR: o número de réplicas de cada partição

Método HTTP e URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

Corpo JSON da solicitação:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

É possível usar um recurso do Terraform para criar um tópico.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

Go

Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Antes de testar este exemplo, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Antes de testar este exemplo, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

A seguir