Actualiza un clúster de Connect

Puedes editar un clúster de Connect para actualizar propiedades como la cantidad de CPU virtuales, la memoria, la red y las etiquetas.

Para editar un clúster de Connect, puedes usar la consola de Google Cloud , gcloud CLI, la biblioteca cliente o la API de Kafka administrado. No puedes usar la API de Apache Kafka de código abierto para actualizar un clúster de Connect.

Antes de comenzar

No todas las propiedades de un clúster de Connect se pueden editar. Revisa las propiedades de un clúster de Connect antes de actualizarlo.

Roles y permisos obligatorios para editar un clúster de Connect

Para obtener los permisos que necesitas para editar un clúster de Connect, pídele a tu administrador que te otorgue el rol de IAM Editor de clústeres de Kafka Connect administrados (roles/managedkafka.connectClusterEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para editar un clúster de Connect. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para editar un clúster de Connect:

  • Otorga al permiso de actualización un permiso de clúster de Connect en la ubicación especificada: managedkafka.connectClusters.update
  • Otorga al permiso de vista un permiso de clúster de Connect en la ubicación especificada. Este permiso solo es necesario para actualizar un clúster de Connect con la consola de Google Cloud : managedkafka.connectors.list

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre este rol, consulta Roles predefinidos de Managed Service para Apache Kafka.

Cómo editar un clúster de Connect

La actualización de ciertas propiedades, como la CPU y la memoria, requiere que se reinicie el clúster.

Los reinicios del clúster conservan los datos, pero pueden aumentar la latencia. La cantidad inicial de trabajadores en el clúster determina la duración del reinicio.

Puedes actualizar las siguientes propiedades del clúster de Connect:

Propiedad Editable
CPU virtuales
Memoria
Red
Subred de trabajadores
Dominios del DNS que se pueden resolver Sí (agregar o borrar)
Nombre del clúster de Connect No
Clúster de Kafka No
Ubicación No
Etiquetas Sí (agregar, editar o borrar)
Secrets Sí (agregar o borrar)

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect que quieres actualizar.

    Se muestra la página Detalles de conexión del clúster.

  3. Haz clic en Editar.

    Aparecerá la página Editar clúster de Kafka Connect.

  4. Realiza los cambios necesarios en las propiedades editables.

  5. Haz clic en Guardar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka connect-clusters update:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    Reemplaza lo siguiente:

    • CONNECT_CLUSTER_ID: Es el ID o el nombre del clúster de Connect. El nombre de un clúster de Connect es inmutable.
    • LOCATION: Es la ubicación del clúster de Connect. La ubicación de un clúster de Connect es inmutable.
    • CPU: Es la cantidad de CPU virtuales para el clúster de Connect. El valor mínimo es 3 vCPUs.
    • MEMORY: Es la cantidad de memoria para el clúster de Connect. Usa las unidades "MB", "MiB", "GB", "GiB", "TB" o "TiB". Por ejemplo, "10 GiB". Debes aprovisionar entre 1 GiB y 8 GiB por CPU virtual.

    • DNS_NAME: Nombre de dominio DNS de la red de la subred que se hará visible para el clúster de Connect.
    • LABELS: Son etiquetas opcionales para asociar con el clúster. Para obtener más información sobre el formato de las etiquetas, consulta Etiquetas. Lista de pares clave-valor de etiquetas que se agregarán. Las claves deben comenzar con un carácter en minúscula y contener solo guiones (-), guiones bajos (_), caracteres en minúscula y números. Los valores deben contener solo guiones (-), guiones bajos (_), caracteres en minúscula y números.
    • SECRET: (Opcional) Son los secretos que se cargarán en los trabajadores. Se deben proporcionar las versiones exactas de los secretos de Secret Manager, ya que no se admiten alias. Se pueden cargar hasta 32 secretos en un clúster. Formato: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: Es la subred de trabajadores del clúster de Connect. La subred de trabajadores debe estar en la misma región que el clúster de Connect.

      El formato de la subred es projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

  3. Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.