Atualizar um tópico do serviço gerenciado do Google Cloud para Apache Kafka

Depois de criar um tópico, é possível editar a configuração dele para atualizar estas propriedades: o número de partições e as configurações de tópico que não são padrão para as propriedades já definidas no nível do cluster. Só é possível aumentar o número de partições, não diminuir.

Para atualizar um único tópico, use o console Google Cloud , a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs de código aberto Apache Kafka.

Papéis e permissões necessários para editar um tópico

Para receber as permissões necessárias para editar um tópico, peça ao administrador para conceder a você o papel do IAM de Editor de tópicos gerenciados do Kafka(roles/managedkafka.topicEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar um tópico. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para editar um tópico:

  • Atualizar um tema: managedkafka.topics.update

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre essa função, consulte Funções predefinidas do Managed Service para Apache Kafka.

Editar um tópico

Para editar um tópico, siga estas etapas:

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

    Os clusters criados em um projeto são listados.

  2. Clique no cluster a que pertence o tópico que você quer editar.

    A página Detalhes do cluster é aberta. Na página de detalhes do cluster, na guia Recursos, os tópicos são listados.

  3. Clique no tópico que você quer editar.

    A página Detalhes do tópico é aberta.

  4. Para fazer edições, clique em Editar.

  5. Clique em Salvar depois de fazer as mudanças.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka topics update:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Esse comando modifica a configuração de um tópico existente no cluster especificado do Serviço gerenciado para Apache Kafka. Use esse comando para aumentar o número de partições e atualizar as configurações de configuração no nível do tópico.

    Substitua:

    • TOPIC_ID: o ID do tópico.
    • CLUSTER_ID: o ID do cluster que contém o tópico.
    • LOCATION_ID: o local do cluster.
    • PARTITIONS: opcional: o número atualizado de partições para o tópico. Só é possível aumentar o número de partições, não diminuir.
    • CONFIGS: opcional: uma lista de configurações a serem atualizadas. Especifique como uma lista separada por vírgulas de pares de chave-valor. Por exemplo, retention.ms=3600000,retention.bytes=10000000.
  3. REST

    Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

    • PROJECT_ID: o ID do projeto do Google Cloud
    • LOCATION: o local do cluster
    • CLUSTER_ID: o ID do cluster
    • TOPIC_ID: o ID do tópico
    • UPDATE_MASK: quais campos atualizar, como uma lista separada por vírgulas de nomes totalmente qualificados. Exemplo: partitionCount
    • PARTITION_COUNT: o número atualizado de partições para o tópico

    Método HTTP e URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    Corpo JSON da solicitação:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    Para enviar a solicitação, expanda uma destas opções:

    Você receberá uma resposta JSON semelhante a esta:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

Configurar a retenção de mensagens

O Kafka armazena mensagens em arquivos de segmento de registro. Por padrão, o Kafka exclui arquivos de segmento após um período de armazenamento ou quando uma partição excede um limite de tamanho de dados. É possível mudar esse comportamento ativando a compactação de registros. Se a compactação de registros estiver ativada, o Kafka manterá apenas o valor mais recente de cada chave.

O Google Cloud Managed Service para Apache Kafka usa o armazenamento em camadas, o que significa que os segmentos de registros concluídos são armazenados remotamente, em vez de no armazenamento local. Para saber mais sobre armazenamento em camadas, consulte Armazenamento em camadas na documentação do Apache Kafka.

Definir os valores de retenção

Se a compactação de registros não estiver ativada, as seguintes configurações vão controlar como o Kafka armazena arquivos de segmento de registro:

  • retention.ms: o período máximo para salvar arquivos de segmento, em milissegundos.
  • retention.bytes: o número máximo de bytes a serem armazenados por partição. Se os dados em uma partição excederem esse valor, o Kafka vai descartar os arquivos de segmento mais antigos.

Para atualizar essas configurações, use a CLI gcloud ou a CLI do Kafka:

gcloud

Para definir a retenção de mensagens, execute o comando gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Substitua:

  • TOPIC_ID: o ID do tópico.
  • CLUSTER_ID: o ID do cluster que contém o tópico.
  • LOCATION_ID: o local do cluster.
  • RETENTION_PERIOD: o período máximo para armazenar arquivos de segmento, em milissegundos.
  • MAX_BYTES: o número máximo de bytes a serem armazenados por partição.

CLI do Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa conseguir acessar uma sub-rede conectada ao cluster do Managed Service para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Substitua:

  • BOOTSTRAP_ADDRESS: o endereço de inicialização do cluster do Serviço Gerenciado para Apache Kafka.
  • TOPIC_ID: o ID do tópico.
  • RETENTION_PERIOD: o período máximo para armazenar arquivos de segmento, em milissegundos.
  • MAX_BYTES: o número máximo de bytes a serem armazenados por partição.

Ativar a compactação de registros

Se a compactação de registros estiver ativada, o Kafka vai armazenar apenas a mensagem mais recente de cada chave. A compactação de registros fica desativada por padrão. Para ativar a compactação de registros em um tópico, defina a configuração cleanup.policy como "compact", da seguinte maneira:

gcloud

Execute o comando gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Substitua:

  • TOPIC_ID: o ID do tópico.
  • CLUSTER_ID: o ID do cluster que contém o tópico.
  • LOCATION_ID: o local do cluster.

CLI do Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa conseguir acessar uma sub-rede conectada ao cluster do Managed Service para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Substitua:

Limitações

  • Não é possível substituir as configurações de tópicos para armazenamento remoto, como remote.storage.enable.

  • Não é possível substituir as configurações de tópico para arquivos de segmento de registro, como segment.bytes.

  • Ao ativar a compactação de registros para um tópico, o armazenamento em camadas desse tópico é desativado implicitamente. Todos os arquivos de registros do tópico são armazenados localmente.

A seguir