Atualizar um cluster do Connect

É possível editar um cluster do Connect para atualizar propriedades como o número de vCPUs, memória, rede e rótulos.

Para editar um cluster do Connect, use o console Google Cloud , a CLI gcloud, a biblioteca de cliente ou a API Managed Kafka. Não é possível usar a API Apache Kafka de código aberto para atualizar um cluster do Connect.

Antes de começar

Nem todas as propriedades de um cluster do Connect podem ser editadas. Analise as propriedades de um cluster do Connect antes de atualizar.

Papéis e permissões necessários para editar um cluster do Connect

Para receber as permissões necessárias para editar um cluster do Connect, peça ao administrador para conceder a você o papel do IAM de Editor de cluster gerenciado do Kafka Connect (roles/managedkafka.connectClusterEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar um cluster do Connect. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para editar um cluster do Connect:

  • Conceda à atualização uma permissão de cluster do Connect no local especificado: managedkafka.connectClusters.update
  • Concede à visualização uma permissão de cluster do Connect no local especificado. Essa permissão é necessária apenas para atualizar um cluster do Connect usando o console Google Cloud : managedkafka.connectors.list

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre essa função, consulte Funções predefinidas do Managed Service para Apache Kafka.

Editar um cluster do Connect

Para atualizar algumas propriedades, como CPU e memória, é necessário reiniciar o cluster.

As reinicializações de cluster preservam os dados, mas podem aumentar a latência. O número inicial de workers no cluster determina a duração da reinicialização.

É possível atualizar as seguintes propriedades do cluster do Connect:

Propriedade Editável
vCPUs Sim
Memória Sim
Rede Sim
Sub-rede de worker Sim
Domínios DNS solucionáveis Sim (adicionar/excluir)
Nome do cluster Connect Não
Cluster Kafka Não
Local Não
Rótulos Sim (adicionar/editar/excluir)
Secrets Sim (adicionar/excluir)

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique no cluster do Connect que você quer atualizar.

    A página Detalhes do cluster de conexão é exibida.

  3. Clique em Editar.

    A página Editar cluster do Kafka Connect é exibida.

  4. Faça as mudanças necessárias nas propriedades editáveis.

  5. Clique em Salvar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka connect-clusters update:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    Substitua:

    • CONNECT_CLUSTER_ID: o ID ou nome do cluster do Connect. O nome de um cluster do Connect é imutável.
    • LOCATION: o local do cluster do Connect. O local de um cluster do Connect é imutável.
    • CPU: o número de vCPUs para o cluster do Connect. O valor mínimo é 3 vCPUs.
    • MEMORY: a quantidade de memória para o cluster do Connect. Use as unidades "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Por exemplo, "10GiB". É necessário provisionar entre 1 GiB e 8 GiB por vCPU.

    • DNS_NAME: nome de domínio DNS da rede da sub-rede a ser disponibilizada para o Connect Cluster.
    • LABELS: (opcional) rótulos a serem associados ao cluster. Para mais informações sobre o formato dos rótulos, consulte Rótulos. Lista de pares de rótulos KEY=VALUE a serem adicionados. As chaves precisam começar com um caractere minúsculo e conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números. Os valores precisam conter apenas hifens (-), sublinhados (_), caracteres minúsculos e números.
    • SECRET: (opcional) secrets a serem carregados nos workers. As versões exatas do secret do Secret Manager precisam ser fornecidas. Não há suporte para aliases. É possível carregar até 32 secrets em um cluster. Formato: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: a sub-rede de nós de trabalho do cluster do Connect. A sub-rede de worker precisa estar na mesma região que o cluster do Connect.

      O formato da sub-rede é projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

  3. Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.