Actualiza un clúster de Google Cloud Managed Service para Apache Kafka

Puedes editar un clúster de Servicio administrado para Apache Kafka de Google Cloud para actualizar propiedades como la cantidad de CPU virtuales, la memoria, las subredes, el tipo de encriptación o las etiquetas. También puedes configurar si el servicio reequilibra las particiones entre los intermediarios cuando se agrega uno al clúster. El servicio crea agentes nuevos automáticamente según la configuración de memoria y CPU virtuales del clúster.

Para editar un clúster, puedes usar la consola de Google Cloud , Google Cloud CLI, la biblioteca cliente o la API de Kafka administrado. No puedes usar la API de código abierto de Apache Kafka para actualizar un clúster.

Antes de comenzar

Si actualizas el recuento de CPU virtuales o la memoria, se aplicarán las siguientes reglas:

  • La proporción general de CPU virtuales a memoria del clúster siempre debe permanecer entre 1:1 y 1:8.

  • Si reduces la escala, debe haber al menos 1 CPU virtual y 1 GiB de memoria para cada agente existente. La cantidad de intermediarios nunca disminuye.

  • Si aumentas la escala y el cambio genera la incorporación de nuevos intermediarios, el promedio de CPU virtuales y memoria por intermediario no puede disminuir en más del 10% en comparación con los promedios anteriores a la actualización.

    Por ejemplo, si intentas aumentar la escala de un clúster de 45 CPU virtuales (3 intermediarios) a 48 CPU virtuales (4 intermediarios), la operación fallará. Esto se debe a que el promedio de CPU virtuales por agente disminuye de 15 a 12, lo que representa una reducción del 20%, que supera el límite del 10%.

Para obtener más información, consulta Cómo actualizar el tamaño del clúster.

La actualización de ciertas propiedades, como el recuento de CPU virtuales y la memoria, podría requerir que el servicio reinicie el clúster. Los clústeres se reinician un agente a la vez. Esto provoca fallas temporales en las solicitudes a los distintos intermediarios, pero estas fallas son transitorias. Las bibliotecas cliente que se usan con frecuencia controlan estos errores automáticamente.

No puedes editar el nombre, la ubicación ni el tipo de encriptación del clúster.

Roles y permisos obligatorios para editar un clúster

Para obtener los permisos que necesitas para actualizar un clúster, pídele a tu administrador que te otorgue el rol de IAM Editor de clústeres de Kafka administrados (roles/managedkafka.clusterEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para actualizar un clúster. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para actualizar un clúster:

  • Edita un clúster: managedkafka.clusters.update

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

El rol de editor del clúster de Kafka administrado no te permite crear, borrar ni modificar temas y grupos de consumidores en los clústeres de Managed Service para Apache Kafka. Tampoco permite el acceso al plano de datos para publicar o consumir mensajes dentro de los clústeres. Para obtener más información sobre este rol, consulta Roles predefinidos de Managed Service para Apache Kafka.

Editar un clúster

Para editar un clúster, sigue estos pasos:

Console

  1. En la consola de Google Cloud , ve a la página Clústeres.

    Ir a los clústeres

  2. En la lista de clústeres, haz clic en el clúster cuyas propiedades deseas editar.

    Se muestra la página de detalles del clúster.

  3. En la página de detalles del clúster, haz clic en Editar.

  4. Edita las propiedades según sea necesario. Las siguientes propiedades de un clúster se pueden editar desde la consola:

    • Memoria
    • CPU virtuales
    • Subred
    • Configuración de rebalanceo
    • Configuración de mTLS
    • Etiquetas
  5. Haz clic en Guardar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el ID o el nombre del clúster. No puedes actualizar este valor.
    • LOCATION: Es la ubicación del clúster. No puedes actualizar este valor.
    • CPU: Es la cantidad de CPU virtuales para el clúster.
    • MEMORY: Es la cantidad de memoria del clúster. Usa las unidades "MB", "MiB", "GB", "GiB", "TB" o "TiB". Por ejemplo, "10 GiB".
    • SUBNETS: Es la lista de subredes a las que se conectará. Usa comas para separar varios valores de subred.
    • auto-rebalance: Habilita el rebalanceo automático de las particiones de temas entre los agentes cuando cambia la cantidad de CPU en el clúster. Esta opción está habilitada de forma predeterminada.
    • LABELS: Son las etiquetas que se asociarán con el clúster.
  3. Si usas la marca --async con tu comando, el sistema envía la solicitud de actualización y muestra una respuesta de inmediato, sin esperar a que se complete la operación. Con la marca --async, puedes continuar con otras tareas mientras la actualización del clúster se realiza en segundo plano. Si no usas la marca --async, el sistema espera a que se complete la operación antes de devolver una respuesta. Debes esperar a que el clúster se actualice por completo antes de continuar con otras tareas.

    REST

    Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

    • PROJECT_ID: El ID de tu proyecto de Google Cloud
    • LOCATION: Es la ubicación del clúster.
    • CLUSTER_ID: ID del clúster
    • UPDATE_MASK: Indica qué campos se actualizarán, como una lista separada por comas de nombres completamente calificados. Por ejemplo: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: Es la cantidad de CPU virtuales del clúster.
    • MEMORY: Es la cantidad de memoria del clúster, en bytes.
    • SUBNET_ID: ID de la subred a la que se conectará

    Método HTTP y URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    Cuerpo JSON de la solicitud:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Para enviar tu solicitud, expande una de estas opciones:

    Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    En el cuerpo de la solicitud, incluye solo los campos que estás actualizando, como se especifica en el parámetro de búsqueda UPDATE_MASK. Para agregar una subred, agrega una entrada nueva a networkConfigs.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.