Actualiza un clúster de Google Cloud Managed Service para Apache Kafka

Puedes editar un clúster de Managed Service para Apache Kafka para actualizar propiedades, como el tamaño del clúster (cantidad de CPU virtuales y memoria), la lista de subredes conectadas, la configuración de rebalanceo automático y la configuración de mTLS.

Para editar un clúster, puedes usar la consola de Google Cloud , Google Cloud CLI, la biblioteca cliente o la API de Kafka administrado. No puedes usar la API de código abierto de Apache Kafka para actualizar un clúster.

La actualización de ciertas propiedades, como el recuento de CPU virtuales y la memoria, puede requerir que el servicio reinicie el clúster. El clúster se reinicia un agente a la vez. Durante este proceso, es posible que fallen las solicitudes a los agentes individuales, pero estas fallas son transitorias. Las bibliotecas cliente que se usan con frecuencia controlan estos errores automáticamente.

Roles y permisos requeridos

Para obtener los permisos que necesitas para actualizar un clúster, pídele a tu administrador que te otorgue el rol de IAM Editor de clústeres de Kafka administrados (roles/managedkafka.clusterEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para actualizar un clúster. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para actualizar un clúster:

  • Edita un clúster: managedkafka.clusters.update

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

El rol de editor del clúster de Kafka administrado no te permite crear, borrar ni modificar temas ni grupos de consumidores en los clústeres de Managed Service para Apache Kafka. Tampoco permite el acceso al plano de datos para publicar o consumir mensajes dentro de los clústeres. Para obtener más información sobre este rol, consulta Roles predefinidos de Managed Service para Apache Kafka.

Cambia el tamaño de un clúster

Si actualizas la cantidad de CPU virtuales o la memoria de un clúster, se aplicarán las siguientes reglas:

  • La proporción general de CPU virtuales a memoria del clúster siempre debe permanecer entre 1:1 y 1:8.

  • Si reduces la escala, debe haber al menos 1 CPU virtual y 1 GiB de memoria para cada agente existente. La cantidad de agentes nunca disminuye.

  • Si aumentas la escala y el cambio genera la incorporación de nuevos intermediarios, el promedio de CPU virtuales y memoria por intermediario no puede disminuir en más del 10% en comparación con los promedios anteriores a la actualización.

    Por ejemplo, si intentas aumentar la capacidad de un clúster de 45 CPU virtuales (3 intermediarios) a 48 CPU virtuales (4 intermediarios), la operación fallará. Esto se debe a que el promedio de CPU virtuales por agente disminuye de 15 a 12, lo que representa una reducción del 20%, que supera el límite del 10%.

Para obtener más información, consulta Cómo actualizar el tamaño del clúster.

Editar un clúster

Para editar un clúster, sigue estos pasos:

Console

  1. En la consola de Google Cloud , ve a la página Clústeres.

    Ir a los clústeres

  2. En la lista de clústeres, haz clic en el clúster cuyas propiedades deseas editar.

    Se muestra la página de detalles del clúster.

  3. En la página de detalles del clúster, haz clic en Editar.

  4. Edita las propiedades según sea necesario. Las siguientes propiedades de un clúster se pueden editar desde la consola:

    • Memoria
    • CPU virtuales
    • Subred
    • Configuración de rebalanceo
    • Configuración de mTLS
    • Etiquetas
  5. Haz clic en Guardar.

gcloud

  1. En la consola de Google Cloud , activa Cloud Shell.

    Activa Cloud Shell

    En la parte inferior de la consola de Google Cloud , se inicia una sesión de Cloud Shell que muestra una ventana emergente con una línea de comandos. Cloud Shell es un entorno de shell con Google Cloud CLI ya instalada y con valores ya establecidos para el proyecto actual. La sesión puede tardar unos segundos en inicializarse.

  2. Ejecuta el comando gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el ID o el nombre del clúster. No puedes actualizar este valor.
    • LOCATION: Es la ubicación del clúster. No puedes actualizar este valor.
    • CPU: Es la cantidad de CPU virtuales para el clúster.
    • MEMORY: Es la cantidad de memoria del clúster. Usa las unidades "MB", "MiB", "GB", "GiB", "TB" o "TiB". Por ejemplo, "10 GiB".
    • SUBNETS: Es la lista de subredes a las que se conectará. Usa comas para separar varios valores de subred.
    • auto-rebalance: Habilita el rebalanceo automático de las particiones de temas entre los agentes cuando cambia la cantidad de CPU en el clúster. Esta opción está habilitada de forma predeterminada.
    • LABELS: Son las etiquetas que se asociarán con el clúster.

Si usas la marca --async con tu comando, el sistema envía la solicitud de actualización y muestra una respuesta de inmediato, sin esperar a que se complete la operación. Con la marca --async, puedes continuar con otras tareas mientras la actualización del clúster se realiza en segundo plano. Si no usas la marca --async, el sistema espera a que se complete la operación antes de devolver una respuesta. Debes esperar a que el clúster se actualice por completo antes de continuar con otras tareas.

REST

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: El ID de tu proyecto de Google Cloud
  • LOCATION: Es la ubicación del clúster.
  • CLUSTER_ID: ID del clúster
  • UPDATE_MASK: Indica qué campos se actualizarán, como una lista separada por comas de nombres completamente calificados. Por ejemplo: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: Es la cantidad de CPU virtuales del clúster.
  • MEMORY: Es la cantidad de memoria del clúster, en bytes.
  • SUBNET_ID: ID de la subred a la que se conectará

Método HTTP y URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

Cuerpo JSON de la solicitud:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

En el cuerpo de la solicitud, incluye solo los campos que estás actualizando, como se especifica en el parámetro de búsqueda UPDATE_MASK. Para agregar una subred, agrega una entrada nueva a networkConfigs.

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka para Go.

Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Limitaciones

Después de crear un clúster de Managed Service para Apache Kafka, no puedes actualizar las siguientes propiedades:

  • El nombre del clúster
  • Ubicación del clúster
  • Tipo de encriptación

Aunque no puedes cambiar el tipo de encriptación, puedes rotar las claves de encriptación.

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.