Mettre à jour un cluster Google Cloud Managed Service pour Apache Kafka

Vous pouvez modifier un cluster Managed Service pour Apache Kafka afin de mettre à jour des propriétés telles que la taille du cluster (nombre de vCPU et mémoire), la liste des sous-réseaux connectés, la configuration de l'équilibrage automatique et la configuration mTLS.

Pour modifier un cluster, vous pouvez utiliser la console Google Cloud , Google Cloud CLI, la bibliothèque cliente ou l'API Managed Kafka. Vous ne pouvez pas utiliser l'API Apache Kafka Open Source pour mettre à jour un cluster.

La mise à jour de certaines propriétés, telles que le nombre de vCPU et la mémoire, peut nécessiter le redémarrage du cluster par le service. Le cluster est redémarré un courtier à la fois. Pendant ce processus, les requêtes adressées à des courtiers individuels peuvent échouer, mais ces échecs sont temporaires. Les bibliothèques clientes courantes gèrent automatiquement ces erreurs.

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour mettre à jour un cluster, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de cluster Kafka géré (roles/managedkafka.clusterEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour mettre à jour un cluster. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour mettre à jour un cluster :

  • Modifier un cluster : managedkafka.clusters.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Redimensionner un cluster

Si vous modifiez le nombre de processeurs virtuels ou la mémoire d'un cluster, les règles suivantes s'appliquent :

  • Le ratio global processeur virtuel/mémoire du cluster doit toujours rester compris entre 1:1 et 1:8.

  • Si vous réduisez la taille, chaque courtier existant doit disposer d'au moins un processeur virtuel et 1 Gio de mémoire. Le nombre de courtiers ne diminue jamais.

  • Si vous augmentez la taille de votre cluster et que cela entraîne l'ajout de nouveaux courtiers, le nombre moyen de processeurs virtuels et la mémoire par courtier ne peuvent pas diminuer de plus de 10% par rapport aux moyennes avant la mise à jour.

    Par exemple, si vous essayez de faire passer un cluster de 45 processeurs virtuels (3 brokers) à 48 processeurs virtuels (4 brokers), l'opération échoue. En effet, le nombre moyen de vCPU par courtier passe de 15 à 12, soit une réduction de 20 %, ce qui dépasse la limite de 10 %.

Pour en savoir plus, consultez Modifier la taille du cluster.

Modifier un cluster

Pour modifier un cluster, procédez comme suit :

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

  2. Dans la liste des clusters, cliquez sur celui dont vous souhaitez modifier les propriétés.

    La page des détails du cluster s'affiche.

  3. Sur la page d'informations du cluster, cliquez sur Modifier.

  4. Modifiez les propriétés selon vos besoins. Les propriétés suivantes d'un cluster sont modifiables depuis la console :

    • Mémoire
    • Processeurs virtuels
    • Sous-réseau
    • Configuration du rééquilibrage
    • Configuration mTLS
    • Étiquettes
  5. Cliquez sur Enregistrer.

gcloud

  1. Dans la console Google Cloud , activez Cloud Shell.

    Activer Cloud Shell

    En bas de la console Google Cloud , une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud managed-kafka clusters update :

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Remplacez les éléments suivants :

    • CLUSTER_ID : ID ou nom du cluster. Vous ne pouvez pas modifier cette valeur.
    • LOCATION : emplacement du cluster. Vous ne pouvez pas modifier cette valeur.
    • CPU : nombre de processeurs virtuels pour le cluster.
    • MEMORY : quantité de mémoire pour le cluster. Utilisez les unités "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Par exemple, "10 Gio".
    • SUBNETS : liste des sous-réseaux auxquels se connecter. Utilisez des virgules pour séparer plusieurs valeurs de sous-réseau.
    • auto-rebalance : active le rééquilibrage automatique des partitions de sujet entre les agents lorsque le nombre de processeurs du cluster change. Cette option est activée par défaut.
    • LABELS : libellés à associer au cluster.

Si vous utilisez l'indicateur --async avec votre commande, le système envoie la demande de mise à jour et renvoie immédiatement une réponse, sans attendre la fin de l'opération. L'indicateur --async vous permet de poursuivre d'autres tâches pendant que la mise à jour du cluster s'effectue en arrière-plan. Si vous n'utilisez pas l'option --async, le système attend la fin de l'opération avant de renvoyer une réponse. Vous devez attendre que le cluster soit entièrement mis à jour avant de pouvoir poursuivre d'autres tâches.

REST

Avant d'utiliser les données de requête, effectuez les remplacements suivants :

  • PROJECT_ID : ID de votre projet Google Cloud
  • LOCATION : emplacement du cluster
  • CLUSTER_ID : ID du cluster
  • UPDATE_MASK : champs à mettre à jour, sous forme de liste de noms complets séparés par une virgule. Exemple : capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT : nombre de processeurs virtuels pour le cluster.
  • MEMORY : quantité de mémoire pour le cluster, en octets
  • SUBNET_ID : ID du sous-réseau auquel se connecter

Méthode HTTP et URL :

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

Corps JSON de la requête :

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Dans le corps de la requête, n'incluez que les champs que vous mettez à jour, comme indiqué dans le paramètre de requête UPDATE_MASK. Pour ajouter un sous-réseau, ajoutez une entrée à networkConfigs.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application(ADC, Application Default Credentials). Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Limites

Une fois que vous avez créé un cluster Managed Service pour Apache Kafka, vous ne pouvez pas modifier les propriétés suivantes :

  • Nom du cluster
  • Emplacement du cluster
  • Type de chiffrement

Bien que vous ne puissiez pas modifier le type de chiffrement, vous pouvez faire pivoter les clés de chiffrement.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.