Mettre à jour un sujet Google Cloud Managed Service pour Apache Kafka

Une fois un sujet créé, vous pouvez modifier sa configuration pour mettre à jour les propriétés suivantes : le nombre de partitions et les configurations de sujet qui ne sont pas définies par défaut sur les propriétés déjà définies au niveau du cluster. Vous ne pouvez qu'augmenter le nombre de partitions, pas le diminuer.

Pour mettre à jour un seul sujet, vous pouvez utiliser la Google Cloud console, la Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source.

Rôles et autorisations requis pour modifier un sujet

Pour obtenir les autorisations nécessaires pour modifier un sujet, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de sujet Managed Kafka(roles/managedkafka.topicEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour modifier un sujet. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour modifier un sujet :

  • Mettre à jour un sujet: managedkafka.topics.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Modifier un sujet

Pour modifier un sujet, procédez comme suit :

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

    Les clusters que vous avez créés dans un projet sont listés.

  2. Cliquez sur le cluster auquel appartient le sujet que vous souhaitez modifier.

    La page Cluster details (Détails du cluster) s'affiche. Dans l'onglet Resources (Ressources) de cette page, les sujets sont listés.

  3. Cliquez sur le sujet que vous souhaitez modifier.

    La page Topic details (Détails du sujet) s'affiche.

  4. Pour effectuer vos modifications, cliquez sur Edit (Modifier).

  5. Cliquez sur Save (Enregistrer) une fois les modifications effectuées.

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la gcloud managed-kafka topics update commande :

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Cette commande modifie la configuration d'un sujet existant dans le cluster Managed Service pour Apache Kafka spécifié. Vous pouvez l'utiliser pour augmenter le nombre de partitions et mettre à jour les paramètres de configuration au niveau du sujet.

    Remplacez les éléments suivants :

    • TOPIC_ID : ID du sujet
    • CLUSTER_ID : ID du cluster contenant le sujet
    • LOCATION_ID : emplacement du cluster
    • PARTITIONS (facultatif) : nouveau nombre de partitions pour le sujet Vous ne pouvez qu'augmenter le nombre de partitions, pas le diminuer.
    • CONFIGS (facultatif) : liste des paramètres de configuration à mettre à jour Spécifiez-les sous la forme d'une liste de paires clé/valeur séparées par une virgule. Exemple : retention.ms=3600000,retention.bytes=10000000

REST

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • PROJECT_ID: ID de votre Google Cloud projet
  • LOCATION : emplacement du cluster
  • CLUSTER_ID : ID du cluster
  • TOPIC_ID : ID du sujet
  • UPDATE_MASK : champs à mettre à jour, sous la forme d'une liste de noms complets séparés par une virgule Exemple : partitionCount
  • PARTITION_COUNT : nouveau nombre de partitions pour le sujet

Méthode HTTP et URL :

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

Corps JSON de la requête :

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application(ADC). Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Java.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Python.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

Configurer la conservation des messages

Kafka stocke les messages dans des fichiers de segments de journaux. Par défaut, Kafka supprime les fichiers de segments après une période de conservation ou lorsqu'une partition dépasse un seuil de taille de données. Vous pouvez modifier ce comportement en activant la compression des journaux. Si la compression des journaux est activée, Kafka ne conserve que la dernière valeur de chaque clé.

Google Cloud Managed Service pour Apache Kafka utilise le stockage hiérarchisé, ce qui signifie que les segments de journaux terminés sont stockés à distance, plutôt que dans un stockage local. Pour en savoir plus sur le stockage hiérarchisé, consultez Stockage hiérarchisé dans la documentation Apache Kafka.

Définir les valeurs de conservation

Si la compression des journaux n'est pas activée, les paramètres suivants contrôlent la façon dont Kafka stocke les fichiers de segments de journaux :

  • retention.ms: durée maximale de conservation des fichiers de segments, en millisecondes
  • retention.bytes : nombre maximal d'octets à stocker par partition Si les données d'une partition dépassent cette valeur, Kafka supprime les fichiers de segments les plus anciens.

Pour modifier ces paramètres, utilisez la gcloud CLI ou la CLI Kafka :

gcloud

Pour définir la conservation des messages, exécutez la gcloud managed-kafka topics update commande.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet
  • CLUSTER_ID : ID du cluster contenant le sujet
  • LOCATION_ID : emplacement du cluster
  • RETENTION_PERIOD : durée maximale de conservation des fichiers de segments, en millisecondes
  • MAX_BYTES : nombre maximal d'octets à stocker par partition

CLI Kafka

Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

Exécutez la commande kafka-configs.sh :

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Remplacez les éléments suivants :

Activer la compression des journaux

Si la compression des journaux est activée, Kafka ne stocke que le dernier message de chaque clé. La compression des journaux est désactivée par défaut. Pour l'activer pour un sujet, définissez la configuration cleanup.policy sur "compact", comme suit :

gcloud

Exécutez la gcloud managed-kafka topics update commande.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet
  • CLUSTER_ID : ID du cluster contenant le sujet
  • LOCATION_ID : emplacement du cluster

CLI Kafka

Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

Exécutez la commande kafka-configs.sh :

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Remplacez les éléments suivants :

Limites

  • Vous ne pouvez pas remplacer les configurations de sujet pour le stockage à distance, telles que remote.storage.enable.

  • Vous ne pouvez pas remplacer les configurations de sujet pour les fichiers de segments de journaux, telles que segment.bytes.

  • L'activation de la compression des journaux pour un sujet désactive implicitement le stockage hiérarchisé pour ce sujet. Tous les fichiers journaux du sujet sont stockés localement.

Étape suivante