Mettre à jour un sujet Google Cloud Managed Service pour Apache Kafka

Une fois un sujet créé, vous pouvez modifier sa configuration pour mettre à jour les propriétés suivantes : le nombre de partitions et les configurations de sujet qui ne sont pas définies par défaut sur les propriétés déjà définies au niveau du cluster. Vous ne pouvez qu'augmenter le nombre de partitions, et non le diminuer.

Pour mettre à jour un seul sujet, vous pouvez utiliser la console Google Cloud , Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source.

Rôles et autorisations requis pour modifier un thème

Pour obtenir les autorisations nécessaires pour modifier un thème, demandez à votre administrateur de vous accorder le rôle IAM "Éditeur de thèmes Kafka gérés" (roles/managedkafka.topicEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour modifier un thème. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour modifier un thème :

  • Mettre à jour un thème : managedkafka.topics.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Modifier un sujet

Pour modifier un sujet, procédez comme suit :

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

    La liste des clusters que vous avez créés dans un projet s'affiche.

  2. Cliquez sur le cluster auquel appartient la rubrique que vous souhaitez modifier.

    La page Détails du cluster s'ouvre. Les thèmes sont listés dans l'onglet Ressources de la page d'informations sur le cluster.

  3. Cliquez sur le thème que vous souhaitez modifier.

    La page Détails du sujet s'affiche.

  4. Pour apporter des modifications, cliquez sur Modifier.

  5. Cliquez sur Enregistrer après avoir effectué les modifications.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka topics update :

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Cette commande modifie la configuration d'un sujet existant dans le cluster Managed Service pour Apache Kafka spécifié. Vous pouvez utiliser cette commande pour augmenter le nombre de partitions et mettre à jour les paramètres de configuration au niveau du thème.

    Remplacez les éléments suivants :

    • TOPIC_ID : ID du sujet.
    • CLUSTER_ID : ID du cluster contenant le sujet.
    • LOCATION_ID : emplacement du cluster.
    • PARTITIONS (facultatif) : nombre de partitions mis à jour pour le sujet. Vous ne pouvez qu'augmenter le nombre de partitions, et non le diminuer.
    • CONFIGS : (Facultatif) Liste des paramètres de configuration à mettre à jour. Spécifiez-les sous forme de liste de paires clé-valeur séparées par une virgule. Exemple :retention.ms=3600000,retention.bytes=10000000
  3. REST

    Avant d'utiliser les données de requête, effectuez les remplacements suivants :

    • PROJECT_ID : ID de votre projet Google Cloud
    • LOCATION : emplacement du cluster
    • CLUSTER_ID : ID du cluster
    • TOPIC_ID : ID du sujet
    • UPDATE_MASK : champs à mettre à jour, sous forme de liste de noms complets séparés par une virgule. Exemple : partitionCount
    • PARTITION_COUNT : nouveau nombre de partitions pour le sujet

    Méthode HTTP et URL :

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    Corps JSON de la requête :

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    Pour envoyer votre requête, développez l'une des options suivantes :

    Vous devriez recevoir une réponse JSON de ce type :

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

Configurer la conservation des messages

Kafka stocke les messages dans des fichiers de segments de journaux. Par défaut, Kafka supprime les fichiers de segment après une période de conservation ou lorsqu'une partition dépasse un seuil de taille de données. Vous pouvez modifier ce comportement en activant la compression des journaux. Si la compaction des journaux est activée, Kafka ne conserve que la dernière valeur pour chaque clé.

Google Cloud Managed Service pour Apache Kafka utilise le stockage hiérarchisé, ce qui signifie que les segments de journaux terminés sont stockés à distance plutôt que localement. Pour en savoir plus sur le stockage hiérarchisé, consultez Stockage hiérarchisé dans la documentation Apache Kafka.

Définir les valeurs de rétention

Si la compaction des journaux n'est pas activée, les paramètres suivants contrôlent la façon dont Kafka stocke les fichiers de segments de journaux :

  • retention.ms : durée maximale de sauvegarde des fichiers de segment, en millisecondes.
  • retention.bytes : nombre maximal d'octets à stocker par partition. Si les données d'une partition dépassent cette valeur, Kafka supprime les fichiers de segment les plus anciens.

Pour mettre à jour ces paramètres, utilisez gcloud CLI ou Kafka CLI :

gcloud

Pour définir la durée de conservation des messages, exécutez la commande gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet.
  • CLUSTER_ID : ID du cluster contenant le thème.
  • LOCATION_ID : emplacement du cluster.
  • RETENTION_PERIOD : durée maximale de stockage des fichiers de segment, en millisecondes.
  • MAX_BYTES : nombre maximal d'octets à stocker par partition.

CLI Kafka

Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

Exécutez la commande kafka-configs.sh :

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Remplacez les éléments suivants :

  • BOOTSTRAP_ADDRESS : adresse d'amorçage du cluster Managed Service pour Apache Kafka.
  • TOPIC_ID : ID du sujet.
  • RETENTION_PERIOD : durée maximale de stockage des fichiers de segment, en millisecondes.
  • MAX_BYTES : nombre maximal d'octets à stocker par partition.

Activer la compression des journaux

Si la compaction des journaux est activée, Kafka ne stocke que le dernier message pour chaque clé. La compaction des journaux est désactivée par défaut. Pour activer la compaction des journaux pour un sujet, définissez la configuration cleanup.policy sur "compact", comme suit :

gcloud

Exécutez la commande gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet.
  • CLUSTER_ID : ID du cluster contenant le thème.
  • LOCATION_ID : emplacement du cluster.

CLI Kafka

Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

Exécutez la commande kafka-configs.sh :

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Remplacez les éléments suivants :

  • BOOTSTRAP_ADDRESS : adresse d'amorçage du cluster Managed Service pour Apache Kafka.
  • TOPIC_ID : ID du sujet.

Limites

  • Vous ne pouvez pas remplacer les configurations de sujet pour le stockage à distance, comme remote.storage.enable.

  • Vous ne pouvez pas remplacer les configurations de topics pour les fichiers de segments de journaux, tels que segment.bytes.

  • L'activation de la compaction des journaux pour un sujet désactive implicitement le stockage hiérarchisé pour ce sujet. Tous les fichiers journaux du sujet sont stockés en local.

Étape suivante