Criar um tópico do Serviço gerenciado do Google Cloud para Apache Kafka

No Serviço gerenciado para Apache Kafka, as mensagens são organizadas em tópicos. Um tópico é composto de partições. Uma partição é uma sequência ordenada e imutável de registros pertencentes a um único agente em um cluster do Kafka. É necessário criar um tópico para publicar ou consumir mensagens.

Para criar um tópico, use o console Google Cloud , a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs Apache Kafka de código aberto.

Antes de começar

Primeiro, crie um cluster antes de criar um tópico. Verifique se você configurou o seguinte:

Papéis e permissões necessários para criar um tópico

Para receber as permissões necessárias para criar um tópico, peça ao administrador para conceder a você o papel do IAM de Editor de tópicos gerenciados do Kafka (roles/managedkafka.topicEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um tópico. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um tópico:

  • Crie um tópico: managedkafka.topics.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

O papel de editor de tópico do Kafka gerenciado também contém o papel de leitor do Kafka gerenciado. Para mais informações sobre essa função, consulte Funções predefinidas do Managed Service para Apache Kafka.

Propriedades de um tópico do serviço gerenciado para Apache Kafka

Ao criar ou atualizar um tópico do Serviço gerenciado para Apache Kafka, é necessário especificar as seguintes propriedades.

Nome do tópico

O nome do tópico do Serviço gerenciado para Apache Kafka que você está criando. Para conferir as diretrizes de nomeação de um tópico, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um tópico é imutável.

Contagem de partições

O número de partições no tópico. É possível editar um tópico para aumentar a contagem de partições, mas não diminuir. Aumentar o número de partições de um tópico que usa uma chave pode mudar a forma como as mensagens são distribuídas.

Fator de replicação

O número de réplicas para cada partição. Se você não especificar o valor, será usado o fator de replicação padrão do cluster.

Um fator de replicação maior pode melhorar a consistência dos dados em caso de falhas do broker, já que os dados são replicados para vários brokers. Para ambientes de produção, recomendamos um fator de replicação de 3 ou mais. Contagens de réplicas mais altas aumentam os custos de armazenamento local e transferência de dados para o tópico. No entanto, eles não aumentam os custos de armazenamento permanente. O fator de replicação não pode exceder o número de brokers disponíveis.

Outros parâmetros

Você também pode definir outros parâmetros de configuração no nível do tópico do Apache Kafka. Eles são especificados como pares key=value que substituem os padrões do cluster.

As configurações relacionadas a tópicos têm um padrão de servidor e uma substituição opcional por tópico. O formato é uma lista separada por vírgulas de pares KEY=VALUE, em que KEY é o nome da propriedade de configuração do tópico do Kafka e VALUE é a configuração necessária.Esses pares de chave-valor ajudam a substituir os padrões do cluster. Por exemplo, flush.ms=10 e compression.type=producer.

Para uma lista com todas as configurações de nível do tópico aceitas, consulte Configurações de nível do tópico na documentação do Apache Kafka.

Criar um tópico

Antes de criar um tópico, revise as propriedades dele.

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

  2. Clique no cluster para o qual você quer criar um tópico.

    A página Detalhes do cluster é aberta.

  3. Na página de detalhes do cluster, clique em Criar tópico.

    A página Criar tópico do Kafka é aberta.

  4. Em Nome do tópico, insira uma string.

  5. Em Contagem de partições, insira o número de partições desejado ou mantenha o valor padrão.

  6. Em Fator de replicação, insira o fator desejado ou mantenha o valor padrão.

  7. (Opcional) Para alterar as configurações de um tema, adicione-as como pares de chave-valor separados por vírgula no campo Configurações.

  8. Clique em Criar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Substitua:

    • TOPIC_ID: o nome do tema.
    • CLUSTER: o nome do cluster em que você quer criar o tópico.
    • LOCATION: a região do cluster.
    • PARTITIONS: o número de partições do tópico.
    • REPLICATION_FACTOR: o fator de replicação do tema.
    • CONFIGS: parâmetros opcionais no nível do tópico. Especifique como pares de chave-valor separados por vírgulas. Por exemplo, compression.type=producer.
  3. CLI do Kafka

    Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa conseguir acessar uma sub-rede conectada ao cluster do Managed Service para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

    Execute o comando kafka-topics.sh da seguinte maneira:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Substitua:

    • BOOTSTRAP_ADDRESS: o endereço de inicialização do cluster do Serviço Gerenciado para Apache Kafka.

    • TOPIC_ID: o nome do tema.

    • PARTITIONS: o número de partições do tópico.

    • REPLICATION_FACTOR: o fator de replicação do tópico.

    REST

    Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

    • PROJECT_ID: o ID do projeto do Google Cloud
    • LOCATION: o local do cluster
    • CLUSTER_ID: o ID do cluster
    • TOPIC_ID: o ID do tópico
    • PARTITION_COUNT: o número de partições para o tópico
    • REPLICATION_FACTOR: o número de réplicas de cada partição.

    Método HTTP e URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    Corpo JSON da solicitação:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Para enviar a solicitação, expanda uma destas opções:

    Você receberá uma resposta JSON semelhante a esta:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    É possível usar um recurso do Terraform para criar um tópico.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

    Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

A seguir