Aggiorna un argomento Google Cloud Managed Service per Apache Kafka

Dopo aver creato un argomento, puoi modificarne la configurazione per aggiornare queste proprietà: il numero di partizioni e le configurazioni dell'argomento che non vengono impostate per impostazione predefinita sulle proprietà già impostate a livello di cluster. Puoi solo aumentare il numero di partizioni, non diminuirlo.

Per aggiornare un singolo argomento, puoi utilizzare la console Google Cloud , Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source.

Ruoli e autorizzazioni richiesti per modificare un argomento

Per ottenere le autorizzazioni necessarie per modificare un argomento, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Topic Editor(roles/managedkafka.topicEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare un argomento. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per modificare un argomento sono necessarie le seguenti autorizzazioni:

  • Aggiorna un argomento: managedkafka.topics.update

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per saperne di più su questo ruolo, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Modificare un argomento

Per modificare un argomento:

Console

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

    Viene visualizzato l'elenco dei cluster che hai creato in un progetto.

  2. Fai clic sul cluster a cui appartiene l'argomento che vuoi modificare.

    Viene visualizzata la pagina Dettagli cluster. Nella pagina dei dettagli del cluster, sono elencati gli argomenti della scheda Risorse.

  3. Fai clic sull'argomento che vuoi modificare.

    Viene visualizzata la pagina Dettagli argomento.

  4. Per apportare le modifiche, fai clic su Modifica.

  5. Fai clic su Salva dopo le modifiche.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka topics update:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Questo comando modifica la configurazione di un argomento esistente nel cluster Managed Service per Apache Kafka specificato. Puoi utilizzare questo comando per aumentare il numero di partizioni e aggiornare le impostazioni di configurazione a livello di argomento.

    Sostituisci quanto segue:

    • TOPIC_ID: l'ID dell'argomento.
    • CLUSTER_ID: l'ID del cluster che contiene l'argomento.
    • LOCATION_ID: la posizione del cluster.
    • PARTITIONS: (Facoltativo) il numero aggiornato di partizioni per l'argomento. Puoi solo aumentare il numero di partizioni, non diminuirlo.
    • CONFIGS: (Facoltativo) un elenco di impostazioni di configurazione da aggiornare. Specifica come elenco separato da virgole di coppie chiave-valore. Ad esempio, retention.ms=3600000,retention.bytes=10000000.
  3. REST

    Prima di utilizzare i dati della richiesta, apporta le sostituzioni seguenti:

    • PROJECT_ID: il tuo Google Cloud ID progetto
    • LOCATION: la posizione del cluster
    • CLUSTER_ID: l'ID del cluster
    • TOPIC_ID: l'ID dell'argomento
    • UPDATE_MASK: quali campi aggiornare, come un elenco separato da virgole di nomi completi. Esempio: partitionCount
    • PARTITION_COUNT: il numero aggiornato di partizioni per l'argomento

    Metodo HTTP e URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    Corpo JSON della richiesta:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    Per inviare la richiesta, espandi una di queste opzioni:

    Dovresti ricevere una risposta JSON simile alla seguente:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

Configura la conservazione dei messaggi

Kafka archivia i messaggi in file di segmenti di log. Per impostazione predefinita, Kafka elimina i file di segmento dopo un periodo di conservazione o quando una partizione supera una soglia di dimensioni dei dati. Puoi modificare questo comportamento attivando la compressione dei log. Se la compattazione dei log è abilitata, Kafka conserva solo l'ultimo valore per ogni chiave.

Google Cloud Managed Service per Apache Kafka utilizza l'archiviazione a livelli, il che significa che i segmenti di log completati vengono archiviati in remoto, anziché nell'archiviazione locale. Per scoprire di più sull'archiviazione a livelli, consulta Archiviazione a livelli nella documentazione di Apache Kafka.

Imposta i valori di conservazione

Se la compattazione dei log non è abilitata, le seguenti impostazioni controllano il modo in cui Kafka memorizza i file dei segmenti di log:

  • retention.ms: La durata massima di salvataggio dei file di segmento, in millisecondi.
  • retention.bytes: il numero massimo di byte da archiviare per partizione. Se i dati in una partizione superano questo valore, Kafka elimina i file di segmento meno recenti.

Per aggiornare queste impostazioni, utilizza gcloud CLI o Kafka CLI:

gcloud

Per impostare la conservazione dei messaggi, esegui il comando gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento.
  • CLUSTER_ID: l'ID del cluster contenente l'argomento.
  • LOCATION_ID: la posizione del cluster.
  • RETENTION_PERIOD: il periodo di tempo massimo per archiviare i file di segmento, in millisecondi.
  • MAX_BYTES: il numero massimo di byte da memorizzare per partizione.

Interfaccia a riga di comando Kafka

Prima di eseguire questo comando, installa gli strumenti a riga di comando Kafka su una VM Compute Engine. La VM deve essere in grado di raggiungere una subnet connessa al cluster Managed Service per Apache Kafka. Segui le istruzioni riportate in Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.

Esegui il comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Sostituisci quanto segue:

  • BOOTSTRAP_ADDRESS: l'indirizzo di bootstrap del cluster Managed Service per Apache Kafka.
  • TOPIC_ID: l'ID dell'argomento.
  • RETENTION_PERIOD: il periodo di tempo massimo per archiviare i file di segmento, in millisecondi.
  • MAX_BYTES: il numero massimo di byte da memorizzare per partizione.

Abilita la compattazione dei log

Se la compattazione dei log è abilitata, Kafka memorizza solo l'ultimo messaggio per ogni chiave. La compattazione dei log è disattivata per impostazione predefinita. Per attivare la compattazione dei log per un argomento, imposta la configurazione cleanup.policy su "compact", come segue:

gcloud

Esegui il comando gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento.
  • CLUSTER_ID: l'ID del cluster contenente l'argomento.
  • LOCATION_ID: la posizione del cluster.

Interfaccia a riga di comando Kafka

Prima di eseguire questo comando, installa gli strumenti a riga di comando Kafka su una VM Compute Engine. La VM deve essere in grado di raggiungere una subnet connessa al cluster Managed Service per Apache Kafka. Segui le istruzioni riportate in Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.

Esegui il comando kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Sostituisci quanto segue:

  • BOOTSTRAP_ADDRESS: l'indirizzo di bootstrap del cluster Managed Service per Apache Kafka.
  • TOPIC_ID: l'ID dell'argomento.

Limitazioni

  • Non puoi ignorare le configurazioni degli argomenti per l'archiviazione remota, ad esempio remote.storage.enable.

  • Non puoi ignorare le configurazioni degli argomenti per i file di segmenti di log, ad esempio segment.bytes.

  • L'attivazione della compattazione dei log per un argomento disattiva implicitamente l'archiviazione a livelli per quell'argomento. Tutti i file di log per l'argomento vengono archiviati localmente.

Passaggi successivi