Aggiorna un argomento Kafka

Aggiorna un argomento Kafka

Per saperne di più

Per la documentazione dettagliata che include questo esempio di codice, vedi quanto segue:

Esempio di codice

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Go.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Java.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Managed Service per Apache Kafka per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Managed Service per Apache Kafka Python.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

Passaggi successivi

Per cercare e filtrare gli esempi di codice per altri prodotti Google Cloud , consulta il browser degli esempi diGoogle Cloud .