Crea un argomento Google Cloud Managed Service per Apache Kafka

In Managed Service per Apache Kafka, i messaggi sono organizzati in argomenti. Un argomento è costituito da partizioni. Una partizione è una sequenza ordinata e immutabile di record di proprietà di un singolo broker all'interno di un cluster Kafka. Devi creare un argomento per pubblicare o utilizzare i messaggi.

Per creare un argomento, puoi utilizzare la console Google Cloud , Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source.

Prima di iniziare

Prima di creare un argomento, devi creare un cluster. Assicurati di aver configurato quanto segue:

Ruoli e autorizzazioni richiesti per creare un argomento

Per ottenere le autorizzazioni necessarie per creare un argomento, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Topic Editor (roles/managedkafka.topicEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un argomento. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un argomento sono necessarie le seguenti autorizzazioni:

  • Crea un argomento: managedkafka.topics.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Il ruolo Managed Kafka Topic Editor contiene anche il ruolo Managed Kafka Viewer. Per saperne di più su questo ruolo, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Proprietà di un argomento Managed Service per Apache Kafka

Quando crei o aggiorni un argomento Managed Service per Apache Kafka, devi specificare le seguenti proprietà.

Nome argomento

Il nome dell'argomento Managed Service per Apache Kafka che stai creando. Per maggiori informazioni su come assegnare un nome a un argomento, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un argomento è immutabile.

Conteggio partizioni

Il numero di partizioni nell'argomento. Puoi modificare un argomento per aumentare il numero di partizioni, ma non puoi diminuirlo. L'aumento del numero di partizioni per un argomento che utilizza una chiave potrebbe modificare la modalità di distribuzione dei messaggi.

Fattore di replica

Il numero di repliche per ogni partizione. Se non specifichi il valore, viene utilizzato il fattore di replica predefinito del cluster.

Un fattore di replica più elevato può migliorare la coerenza dei dati in caso di errori del broker, poiché i dati vengono replicati su più broker. Per gli ambienti di produzione, è consigliabile un fattore di replica pari o superiore a 3. Un numero maggiore di repliche aumenta i costi di archiviazione locale e di trasferimento dei dati per l'argomento. Tuttavia, non aumentano i costi di archiviazione permanente. Il fattore di replica non può superare il numero di broker disponibili.

Altri parametri

Puoi anche impostare altri parametri di configurazione a livello di argomento di Apache Kafka. Questi vengono specificati come coppie key=value che sostituiscono i valori predefiniti del cluster.

Le configurazioni relative agli argomenti hanno un valore predefinito del server e un override facoltativo per argomento. Il formato è un elenco separato da virgole di coppie KEY=VALUE, dove KEY è il nome della proprietà di configurazione dell'argomento Kafka e VALUE è l'impostazione richiesta.Queste coppie chiave-valore ti aiutano a ignorare i valori predefiniti del cluster. Alcuni esempi includono flush.ms=10 e compression.type=producer.

Per un elenco di tutte le configurazioni supportate a livello di argomento, consulta Configurazioni a livello di argomento nella documentazione di Apache Kafka.

Crea un argomento

Prima di creare un argomento, esamina le proprietà dell'argomento.

Console

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

  2. Fai clic sul cluster per cui vuoi creare un argomento.

    Viene visualizzata la pagina Dettagli cluster.

  3. Nella pagina dei dettagli del cluster, fai clic su Crea argomento.

    Viene visualizzata la pagina Crea argomento Kafka.

  4. Per Nome argomento, inserisci una stringa.

  5. Per Conteggio partizioni, inserisci il numero di partizioni che vuoi o mantieni il valore predefinito.

  6. Per Fattore di replica, inserisci il fattore di replica che vuoi o mantieni il valore predefinito.

  7. (Facoltativo) Per modificare le configurazioni degli argomenti, aggiungile come coppie chiave-valore separate da virgole nel campo Configurazioni.

  8. Fai clic su Crea.

gcloud

  1. Nella console Google Cloud , attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Esegui il comando gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Sostituisci quanto segue:

    • TOPIC_ID: il nome dell'argomento.
    • CLUSTER: il nome del cluster in cui vuoi creare l'argomento.
    • LOCATION: la regione del cluster.
    • PARTITIONS: Il numero di partizioni per l'argomento.
    • REPLICATION_FACTOR: Il fattore di replica per l'argomento.
    • CONFIGS: parametri facoltativi a livello di argomento. Specifica le coppie chiave-valore separate da virgole. Ad esempio, compression.type=producer.

Interfaccia a riga di comando Kafka

Prima di eseguire questo comando, installa gli strumenti a riga di comando di Kafka su una VM Compute Engine. La VM deve essere in grado di raggiungere una subnet connessa al tuo cluster Managed Service per Apache Kafka. Segui le istruzioni riportate in Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.

Esegui il comando kafka-topics.sh come segue:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Sostituisci quanto segue:

  • BOOTSTRAP_ADDRESS: l'indirizzo di bootstrap del cluster Managed Service per Apache Kafka.

  • TOPIC_ID: il nome dell'argomento.

  • PARTITIONS: il numero di partizioni per l'argomento.

  • REPLICATION_FACTOR: il fattore di replica per l'argomento.

REST

Prima di utilizzare i dati della richiesta, apporta le sostituzioni seguenti:

  • PROJECT_ID: il tuo Google Cloud ID progetto
  • LOCATION: la posizione del cluster
  • CLUSTER_ID: l'ID del cluster
  • TOPIC_ID: l'ID dell'argomento
  • PARTITION_COUNT: il numero di partizioni per l'argomento
  • REPLICATION_FACTOR: il numero di repliche di ogni partizione

Metodo HTTP e URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

Corpo JSON della richiesta:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Per inviare la richiesta, espandi una di queste opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

Puoi utilizzare una risorsa Terraform per creare un argomento.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

Passaggi successivi