Mettre à jour un cluster Google Cloud Managed Service pour Apache Kafka

Vous pouvez modifier un cluster Google Cloud Managed Service pour Apache Kafka afin de mettre à jour des propriétés telles que le nombre de vCPU, la mémoire, les sous-réseaux, le type de chiffrement ou les libellés. Vous pouvez également configurer le service pour qu'il rééquilibre les partitions entre les courtiers lorsqu'un courtier est ajouté au cluster. Le service crée automatiquement des courtiers en fonction de la configuration de la mémoire et des vCPU du cluster.

Pour modifier un cluster, vous pouvez utiliser la console Google Cloud , Google Cloud CLI, la bibliothèque cliente ou l'API Managed Kafka. Vous ne pouvez pas utiliser l'API Apache Kafka Open Source pour mettre à jour un cluster.

Avant de commencer

Si vous modifiez le nombre de processeurs virtuels ou la mémoire, les règles suivantes s'appliquent :

  • Le ratio global processeur virtuel/mémoire du cluster doit toujours rester compris entre 1:1 et 1:8.

  • Si vous réduisez la taille, chaque courtier existant doit disposer d'au moins un processeur virtuel et 1 Gio de mémoire. Le nombre de brokers ne diminue jamais.

  • Si vous augmentez la taille de votre cluster et que cela entraîne l'ajout de nouveaux courtiers, le nombre moyen de processeurs virtuels et de mémoire par courtier ne peut pas diminuer de plus de 10 % par rapport aux moyennes avant la mise à jour.

    Par exemple, si vous essayez de faire passer un cluster de 45 processeurs virtuels (3 brokers) à 48 processeurs virtuels (4 brokers), l'opération échoue. En effet, le nombre moyen de vCPU par courtier passe de 15 à 12, soit une réduction de 20 %, ce qui dépasse la limite de 10 %.

Pour en savoir plus, consultez Modifier la taille du cluster.

La mise à jour de certaines propriétés, telles que le nombre de vCPU et la mémoire, peut nécessiter le redémarrage du cluster par le service. Les clusters sont redémarrés un courtier à la fois. Cela entraîne des échecs temporaires des requêtes adressées à des courtiers individuels, mais ces échecs sont transitoires. Les bibliothèques clientes couramment utilisées gèrent automatiquement ces erreurs.

Vous ne pouvez pas modifier le nom ni l'emplacement du cluster, ni le type de chiffrement.

Rôles et autorisations requis pour modifier un cluster

Pour obtenir les autorisations nécessaires pour mettre à jour un cluster, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de cluster Kafka géré (roles/managedkafka.clusterEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour mettre à jour un cluster. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour mettre à jour un cluster :

  • Modifier un cluster : managedkafka.clusters.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Le rôle "Éditeur de cluster Managed Kafka" ne vous permet pas de créer, de supprimer ni de modifier des sujets et des groupes de consommateurs sur les clusters Managed Service pour Apache Kafka. Il n'autorise pas non plus l'accès au plan de données pour publier ou consommer des messages dans les clusters. Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Modifier un cluster

Pour modifier un cluster, procédez comme suit :

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

  2. Dans la liste des clusters, cliquez sur celui dont vous souhaitez modifier les propriétés.

    La page des détails du cluster s'affiche.

  3. Sur la page des détails du cluster, cliquez sur Modifier.

  4. Modifiez les propriétés selon vos besoins. Les propriétés suivantes d'un cluster sont modifiables depuis la console :

    • Mémoire
    • Processeurs virtuels
    • Sous-réseau
    • Configuration du rééquilibrage
    • Configuration mTLS
    • Étiquettes
  5. Cliquez sur Enregistrer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka clusters update :

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Remplacez les éléments suivants :

    • CLUSTER_ID : ID ou nom du cluster. Vous ne pouvez pas modifier cette valeur.
    • LOCATION : emplacement du cluster. Vous ne pouvez pas modifier cette valeur.
    • CPU : nombre de processeurs virtuels pour le cluster.
    • MEMORY : quantité de mémoire pour le cluster. Utilisez les unités "MB", "MiB", "GB", "GiB", "TB" ou "TiB". Par exemple, "10 Gio".
    • SUBNETS : liste des sous-réseaux auxquels se connecter. Utilisez des virgules pour séparer plusieurs valeurs de sous-réseau.
    • auto-rebalance : active le rééquilibrage automatique des partitions de sujet entre les agents lorsque le nombre de processeurs du cluster change. Cette option est activée par défaut.
    • LABELS : libellés à associer au cluster.
  3. Si vous utilisez l'indicateur --async avec votre commande, le système envoie la demande de mise à jour et renvoie immédiatement une réponse, sans attendre la fin de l'opération. L'indicateur --async vous permet de poursuivre d'autres tâches pendant que la mise à jour du cluster s'effectue en arrière-plan. Si vous n'utilisez pas l'option --async, le système attend la fin de l'opération avant de renvoyer une réponse. Vous devez attendre que le cluster soit entièrement mis à jour avant de pouvoir poursuivre d'autres tâches.

    REST

    Avant d'utiliser les données de requête, effectuez les remplacements suivants :

    • PROJECT_ID : ID de votre projet Google Cloud
    • LOCATION : emplacement du cluster
    • CLUSTER_ID : ID du cluster
    • UPDATE_MASK : champs à mettre à jour, sous forme de liste de noms complets séparés par une virgule. Exemple : capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT : nombre de processeurs virtuels pour le cluster.
    • MEMORY : quantité de mémoire pour le cluster, en octets
    • SUBNET_ID : ID du sous-réseau auquel se connecter

    Méthode HTTP et URL :

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    Corps JSON de la requête :

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Pour envoyer votre requête, développez l'une des options suivantes :

    Vous devriez recevoir une réponse JSON de ce type :

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Dans le corps de la requête, n'incluez que les champs que vous mettez à jour, comme spécifié dans le paramètre de requête UPDATE_MASK. Pour ajouter un sous-réseau, ajoutez une entrée à networkConfigs.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.