Atualizar um cluster do serviço gerenciado do Google Cloud para Apache Kafka

É possível editar um cluster do Serviço gerenciado para Apache Kafka do Google Cloud para atualizar propriedades como o número de vCPUs, memória, sub-redes, tipo de criptografia ou rótulos. Também é possível configurar se o serviço rebalanceia as partições entre os brokers quando um broker é adicionado ao cluster. O serviço cria novos agentes automaticamente com base na configuração de memória e vCPU do cluster.

Para editar um cluster, use o console Google Cloud , a Google Cloud CLI, a biblioteca de cliente ou a API Managed Kafka. Não é possível usar a API Apache Kafka de código aberto para atualizar um cluster.

Antes de começar

Se você atualizar a contagem de vCPUs ou a memória, as seguintes regras serão aplicáveis:

  • A proporção geral de vCPU para memória do cluster precisa sempre estar entre 1:1 e 1:8.

  • Se você reduzir a escala, cada broker precisará ter pelo menos uma vCPU e 1 GiB de memória. O número de brokers nunca diminui.

  • Se você fizer um upgrade e a mudança resultar na adição de novos brokers, a vCPU média e a memória por broker não poderão diminuir em mais de 10% em comparação com as médias antes da atualização.

    Por exemplo, se você tentar fazer o escalonamento vertical de um cluster de 45 vCPUs (3 brokers) para 48 vCPUs (4 brokers), a operação vai falhar. Isso acontece porque a vCPU média por corretor diminui de 15 para 12, uma redução de 20%, excedendo o limite de 10%.

Para mais informações, consulte Atualizar o tamanho do cluster.

A atualização de determinadas propriedades, como contagem de vCPUs e memória, pode exigir que o serviço reinicie o cluster. Os clusters são reiniciados um broker por vez. Isso leva a falhas temporárias de solicitações para brokers individuais, mas essas falhas são transitórias. As bibliotecas de cliente usadas com frequência processam esses erros automaticamente.

Não é possível editar o nome, o local ou o tipo de criptografia do cluster.

Papéis e permissões necessários para editar um cluster

Para receber as permissões necessárias para atualizar um cluster, peça ao administrador para conceder a você o papel do IAM de Editor de cluster gerenciado do Kafka (roles/managedkafka.clusterEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para atualizar um cluster. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para atualizar um cluster:

  • Editar um cluster: managedkafka.clusters.update

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

A função de editor do cluster gerenciado do Kafka não permite criar, excluir ou modificar tópicos e grupos de consumidores em clusters do Serviço gerenciado para Apache Kafka. Também não permite que o plano de dados publique ou consuma mensagens em clusters. Para mais informações sobre essa função, consulte Funções predefinidas do Managed Service para Apache Kafka.

Edite um cluster

Para editar um cluster, siga estas etapas:

Console

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

  2. Na lista de clusters, clique no cluster cujas propriedades você quer editar.

    A página de detalhes do cluster é exibida.

  3. Na página de detalhes do cluster, clique em Editar.

  4. Edite as propriedades conforme necessário. As seguintes propriedades de um cluster podem ser editadas no console:

    • Memória
    • vCPUs
    • Sub-rede
    • Configuração de reequilíbrio
    • Configuração de mTLS
    • Rótulos
  5. Clique em Salvar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Substitua:

    • CLUSTER_ID: o ID ou nome do cluster. Não é possível atualizar esse valor.
    • LOCATION: o local do cluster. Não é possível atualizar esse valor.
    • CPU: o número de CPUs virtuais para o cluster.
    • MEMORY: a quantidade de memória para o cluster. Use as unidades "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Por exemplo, "10GiB".
    • SUBNETS: a lista de sub-redes a serem conectadas. Use vírgulas para separar vários valores de sub-rede.
    • auto-rebalance: ativa o reequilíbrio automático de partições de tópicos entre os agentes quando o número de CPUs no cluster muda. Essa opção fica ativada por padrão.
    • LABELS: rótulos a serem associados ao cluster.
  3. Se você usar a flag --async com o comando, o sistema vai enviar a solicitação de atualização e retornar uma resposta imediatamente, sem aguardar a conclusão da operação. Com a flag --async, é possível continuar com outras tarefas enquanto a atualização do cluster acontece em segundo plano. Se você não usar a flag --async, o sistema vai aguardar a conclusão da operação antes de retornar uma resposta. Aguarde até que o cluster seja totalmente atualizado antes de continuar com outras tarefas.

    REST

    Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

    • PROJECT_ID: o ID do projeto do Google Cloud
    • LOCATION: o local do cluster
    • CLUSTER_ID: o ID do cluster
    • UPDATE_MASK: quais campos atualizar, como uma lista separada por vírgulas de nomes totalmente qualificados. Exemplo: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: o número de vCPUs para o cluster
    • MEMORY: a quantidade de memória para o cluster, em bytes.
    • SUBNET_ID: ID da sub-rede a ser conectada

    Método HTTP e URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    Corpo JSON da solicitação:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Para enviar a solicitação, expanda uma destas opções:

    Você receberá uma resposta JSON semelhante a esta:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    No corpo da solicitação, inclua apenas os campos que você está atualizando, conforme especificado no parâmetro de consulta UPDATE_MASK. Para adicionar uma sub-rede, anexe uma nova entrada a networkConfigs.

    Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.