Atualizar um tópico do serviço gerenciado do Google Cloud para Apache Kafka

Depois que um tópico é criado, é possível editar a configuração dele para atualizar estas propriedades: o número de partições e as configurações de tópico que não são padrão para as propriedades já definidas no nível do cluster. Só é possível aumentar o número de partições, não diminuir.

Para atualizar um único tópico, use o Google Cloud console, a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs Apache Kafka de código aberto.

Papéis e permissões necessários para editar um tópico

Para receber as permissões necessárias para editar um tópico, peça ao administrador para conceder a você o papel do IAM de Editor de tópicos do Managed Kafka(roles/managedkafka.topicEditor) no seu projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar um tópico. Para acessar as permissões exatas que são necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As permissões a seguir são necessárias para editar um tópico:

  • Atualizar um tópico: managedkafka.topics.update

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Editar um tópico

Para editar um tópico, siga estas etapas:

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

    Os clusters criados em um projeto são listados.

  2. Clique no cluster a que pertence o tópico que você quer editar.

    A página Detalhes do cluster é aberta. Na página de detalhes do cluster, na guia Recursos, os tópicos são listados.

  3. Clique no tópico que você quer editar.

    A página Detalhes do tópico é aberta.

  4. Para fazer edições, clique em Editar.

  5. Clique em Salvar após as mudanças.

gcloud

  1. No Google Cloud console, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o gcloud managed-kafka topics update comando:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Esse comando modifica a configuração de um tópico atual no cluster especificado do Serviço Gerenciado para Apache Kafka. É possível usar esse comando para aumentar o número de partições e atualizar as configurações de configuração no nível do tópico.

    Substitua:

    • TOPIC_ID: o ID do tópico.
    • CLUSTER_ID: o ID do cluster que contém o tópico.
    • LOCATION_ID: o local do cluster.
    • PARTITIONS: opcional: o número atualizado de partições do tópico. Só é possível aumentar o número de partições, não diminuir.
    • CONFIGS: opcional: uma lista de configurações a serem atualizadas. Especifique como uma lista separada por vírgulas de pares de chave-valor. Por exemplo, retention.ms=3600000,retention.bytes=10000000.

REST

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • PROJECT_ID: seu Google Cloud ID do projeto
  • LOCATION: o local do cluster
  • CLUSTER_ID: o ID do cluster
  • TOPIC_ID: o ID do tópico
  • UPDATE_MASK: quais campos atualizar, como uma lista separada por vírgulas de nomes totalmente qualificados. Exemplo: partitionCount
  • PARTITION_COUNT: o número atualizado de partições do tópico

Método HTTP e URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

Corpo JSON da solicitação:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Serviço Gerenciado para Apache Kafka Go.

Para autenticar no Serviço Gerenciado do Google Cloud para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

Antes de testar este exemplo, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Serviço Gerenciado para Apache Kafka Java.

Para autenticar no Serviço Gerenciado para Apache Kafka, configure as Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Antes de testar este exemplo, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Managed Service for Apache Kafka Python.

Para autenticar no Serviço Gerenciado para Apache Kafka, configure as Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

Configurar a retenção de mensagens

O Kafka armazena mensagens em arquivos de segmento de registro. Por padrão, o Kafka exclui arquivos de segmento após um período de armazenamento ou quando uma partição excede um limite de tamanho de dados. É possível mudar esse comportamento ativando a compactação de registros . Se a compactação de registros estiver ativada, o Kafka manterá apenas o valor mais recente de cada chave.

O Serviço Gerenciado do Google Cloud para Apache Kafka usa o armazenamento em camadas, o que significa que os segmentos de registro concluídos são armazenados remotamente, em vez de armazenamento local. Para saber mais sobre o armazenamento em camadas, consulte Armazenamento em camadas na documentação do Apache Kafka.

Definir os valores de retenção

Se a compactação de registros não estiver ativada, as configurações a seguir controlarão como o Kafka armazena arquivos de segmento de registro:

  • retention.ms: o período máximo de tempo para salvar arquivos de segmento, em milissegundos.
  • retention.bytes: o número máximo de bytes a serem armazenados por partição. Se os dados em uma partição excederem esse valor, o Kafka descartará arquivos de segmento mais antigos.

Para atualizar essas configurações, use a CLI gcloud ou a CLI Kafka:

gcloud

Para definir a retenção de mensagens, execute o gcloud managed-kafka topics update comando.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Substitua:

  • TOPIC_ID: o ID do tópico.
  • CLUSTER_ID: o ID do cluster que contém o tópico.
  • LOCATION_ID: o local do cluster.
  • RETENTION_PERIOD: o período máximo de tempo para armazenar arquivos de segmento, em milissegundos.
  • MAX_BYTES: o número máximo de bytes a serem armazenados por partição.

CLI Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa acessar uma sub-rede conectada ao cluster do Serviço Gerenciado para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Substitua:

  • BOOTSTRAP_ADDRESS: o endereço de inicialização do cluster do Serviço Gerenciado para Apache Kafka.
  • TOPIC_ID: o ID do tópico.
  • RETENTION_PERIOD: o período máximo de tempo para armazenar arquivos de segmento, em milissegundos.
  • MAX_BYTES: o número máximo de bytes a serem armazenados por partição.

Ativar a compactação de registros

Se a compactação de registros estiver ativada, o Kafka armazenará apenas a mensagem mais recente de cada chave. A compactação de registros está desativada por padrão. Para ativar a compactação de registros de um tópico, defina a cleanup.policy configuração como "compact", da seguinte maneira:

gcloud

Execute o gcloud managed-kafka topics update comando.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Substitua:

  • TOPIC_ID: o ID do tópico.
  • CLUSTER_ID: o ID do cluster que contém o tópico.
  • LOCATION_ID: o local do cluster.

CLI Kafka

Antes de executar esse comando, instale as ferramentas de linha de comando do Kafka em uma VM do Compute Engine. A VM precisa acessar uma sub-rede conectada ao cluster do Serviço Gerenciado para Apache Kafka. Siga as instruções em Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Execute o comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Substitua:

Limitações

  • Não é possível substituir as configurações de tópico para armazenamento remoto, como remote.storage.enable.

  • Não é possível substituir as configurações de tópico para arquivos de segmento de registro, como segment.bytes.

  • A ativação da compactação de registros de um tópico desativa implicitamente o armazenamento em camadas para esse tópico. Todos os arquivos de registro do tópico são armazenados localmente.

A seguir