Crea un argomento Google Cloud Managed Service per Apache Kafka

In Managed Service per Apache Kafka, i messaggi sono organizzati in argomenti. Un argomento è costituito da partizioni. Una partizione è una sequenza ordinata e immutabile di record di proprietà di un singolo broker all'interno di un cluster Kafka. Devi creare un argomento per pubblicare o utilizzare i messaggi.

Per creare un argomento, puoi utilizzare la console Google Cloud , Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source.

Prima di iniziare

Prima di creare un argomento, devi creare un cluster. Assicurati di aver configurato quanto segue:

Ruoli e autorizzazioni richiesti per creare un argomento

Per ottenere le autorizzazioni necessarie per creare un argomento, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Topic Editor (roles/managedkafka.topicEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un argomento. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un argomento sono necessarie le seguenti autorizzazioni:

  • Crea un argomento: managedkafka.topics.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Il ruolo Managed Kafka Topic Editor contiene anche il ruolo Managed Kafka Viewer. Per saperne di più su questo ruolo, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Proprietà di un argomento Managed Service per Apache Kafka

Quando crei o aggiorni un argomento Managed Service per Apache Kafka, devi specificare le seguenti proprietà.

Nome argomento

Il nome dell'argomento Managed Service per Apache Kafka che stai creando. Per maggiori informazioni su come assegnare un nome a un argomento, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un argomento è immutabile.

Conteggio partizioni

Il numero di partizioni nell'argomento. Puoi modificare un argomento per aumentare il numero di partizioni, ma non puoi diminuirlo. L'aumento del numero di partizioni per un argomento che utilizza una chiave potrebbe modificare la modalità di distribuzione dei messaggi.

Fattore di replica

Il numero di repliche per ogni partizione. Se non specifichi il valore, viene utilizzato il fattore di replica predefinito del cluster.

Un fattore di replica più elevato può migliorare la coerenza dei dati in caso di errori del broker, poiché i dati vengono replicati su più broker. Per gli ambienti di produzione, è consigliabile un fattore di replica pari o superiore a 3. Un numero maggiore di repliche aumenta i costi di archiviazione locale e trasferimento dei dati per l'argomento. Tuttavia, non aumentano i costi dello spazio di archiviazione permanente. Il fattore di replica non può superare il numero di broker disponibili.

Altri parametri

Puoi anche impostare altri parametri di configurazione a livello di argomento di Apache Kafka. Questi vengono specificati come coppie key=value che sostituiscono i valori predefiniti del cluster.

Le configurazioni relative agli argomenti hanno un valore predefinito del server e un override per argomento facoltativo. Il formato è un elenco separato da virgole di coppie KEY=VALUE, dove KEY è il nome della proprietà di configurazione dell'argomento Kafka e VALUE è l'impostazione richiesta.Queste coppie chiave-valore consentono di ignorare i valori predefiniti del cluster. Alcuni esempi includono flush.ms=10 e compression.type=producer.

Per un elenco di tutte le configurazioni supportate a livello di argomento, consulta Configurazioni a livello di argomento nella documentazione di Apache Kafka.

Crea un argomento

Prima di creare un argomento, esamina le proprietà dell'argomento.

Console

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

  2. Fai clic sul cluster per cui vuoi creare un argomento.

    Viene visualizzata la pagina Dettagli cluster.

  3. Nella pagina dei dettagli del cluster, fai clic su Crea argomento.

    Viene visualizzata la pagina Crea argomento Kafka.

  4. Per Nome argomento, inserisci una stringa.

  5. Per Conteggio partizioni, inserisci il numero di partizioni che vuoi o mantieni il valore predefinito.

  6. Per Fattore di replica, inserisci il fattore di replica che preferisci o mantieni il valore predefinito.

  7. (Facoltativo) Per modificare le configurazioni degli argomenti, aggiungile come coppie chiave-valore separate da virgole nel campo Configurazioni.

  8. Fai clic su Crea.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Sostituisci quanto segue:

    • TOPIC_ID: il nome dell'argomento.
    • CLUSTER: il nome del cluster in cui vuoi creare l'argomento.
    • LOCATION: la regione del cluster.
    • PARTITIONS: Il numero di partizioni per l'argomento.
    • REPLICATION_FACTOR: Il fattore di replica per l'argomento.
    • CONFIGS: parametri facoltativi a livello di argomento. Specifica le coppie chiave-valore separate da virgole. Ad esempio, compression.type=producer.
  3. Interfaccia a riga di comando Kafka

    Prima di eseguire questo comando, installa gli strumenti a riga di comando Kafka su una VM Compute Engine. La VM deve essere in grado di raggiungere una subnet connessa al cluster Managed Service per Apache Kafka. Segui le istruzioni riportate in Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.

    Esegui il comando kafka-topics.sh come segue:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Sostituisci quanto segue:

    • BOOTSTRAP_ADDRESS: l'indirizzo di bootstrap del cluster Managed Service per Apache Kafka.

    • TOPIC_ID: il nome dell'argomento.

    • PARTITIONS: il numero di partizioni per l'argomento.

    • REPLICATION_FACTOR: il fattore di replica per l'argomento.

    REST

    Prima di utilizzare i dati della richiesta, apporta le sostituzioni seguenti:

    • PROJECT_ID: il tuo Google Cloud ID progetto
    • LOCATION: la posizione del cluster
    • CLUSTER_ID: l'ID del cluster
    • TOPIC_ID: l'ID dell'argomento
    • PARTITION_COUNT: il numero di partizioni per l'argomento
    • REPLICATION_FACTOR: il numero di repliche di ogni partizione

    Metodo HTTP e URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    Corpo JSON della richiesta:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Per inviare la richiesta, espandi una di queste opzioni:

    Dovresti ricevere una risposta JSON simile alla seguente:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Puoi utilizzare una risorsa Terraform per creare un argomento.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

Passaggi successivi