Crea un tema de Google Cloud Managed Service para Apache Kafka

En Managed Service para Apache Kafka, los mensajes se organizan en temas. Un tema se compone de particiones. Una partición es una secuencia ordenada e inmutable de registros que pertenece a un solo agente dentro de un clúster de Kafka. Debes crear un tema para publicar o usar mensajes.

Para crear un tema, puedes usar la consola de Google Cloud , Google Cloud CLI, la biblioteca cliente, la API de Kafka administrado o las APIs de Apache Kafka de código abierto.

Antes de comenzar

Primero debes crear un clúster antes de crear un tema. Asegúrate de haber configurado lo siguiente:

Roles y permisos obligatorios para crear un tema

Para obtener los permisos que necesitas para crear un tema, pídele a tu administrador que te otorgue el rol de IAM Editor de temas de Kafka administrados (roles/managedkafka.topicEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un tema. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un tema:

  • Crea un tema: managedkafka.topics.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

El rol de Editor del tema de Kafka administrado también contiene el rol de Visualizador de Kafka administrado. Para obtener más información sobre este rol, consulta Roles predefinidos de Managed Service para Apache Kafka.

Propiedades de un tema de Managed Service para Apache Kafka

Cuando creas o actualizas un tema de Managed Service para Apache Kafka, debes especificar las siguientes propiedades.

Nombre del tema

Es el nombre del tema de Managed Service para Apache Kafka que estás creando. Si necesitas ayuda para asignarle un nombre a un tema, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un tema es inmutable.

Recuento de particiones

La cantidad de particiones del tema. Puedes editar un tema para aumentar la cantidad de particiones, pero no puedes disminuirla. Aumentar la cantidad de particiones de un tema que usa una clave podría cambiar la forma en que se distribuyen los mensajes.

Factor de replicación

Es la cantidad de réplicas para cada partición. Si no especificas el valor, se usa el factor de replicación predeterminado del clúster.

Un factor de replicación más alto puede mejorar la coherencia de los datos en caso de fallas del agente, ya que los datos se replican en varios agentes. Para los entornos de producción, se recomienda un factor de replicación de 3 o más. Una mayor cantidad de réplicas aumenta los costos de almacenamiento local y transferencia de datos del tema. Sin embargo, no aumentan los costos de almacenamiento persistente. El factor de replicación no puede exceder la cantidad de intermediarios disponibles.

Otros parámetros

También puedes establecer otros parámetros de configuración de Apache Kafka a nivel del tema. Estos se especifican como pares key=value que anulan los valores predeterminados del clúster.

Las configuraciones relacionadas con los temas tienen un valor predeterminado del servidor y una anulación opcional por tema. El formato es una lista separada por comas de pares KEY=VALUE, en la que KEY es el nombre de la propiedad de configuración del tema de Kafka y VALUE es el parámetro de configuración requerido.Estos pares clave-valor te ayudan a anular los valores predeterminados del clúster. Entre los ejemplos, se incluyen flush.ms=10 y compression.type=producer.

Para ver una lista de todos los parámetros de configuración admitidos a nivel del tema, consulta Topic-level configs en la documentación de Apache Kafka.

Crea un tema

Antes de crear un tema, revisa las propiedades del tema.

Console

  1. En la consola de Google Cloud , ve a la página Clústeres.

    Ir a los clústeres

  2. Haz clic en el clúster para el que deseas crear un tema.

    Se abrirá la página Detalles del clúster.

  3. En la página de detalles del clúster, haz clic en Crear tema.

    Se abrirá la página Crear tema de Kafka.

  4. En Nombre del tema, ingresa una cadena.

  5. En Cantidad de particiones, ingresa la cantidad de particiones que deseas o conserva el valor predeterminado.

  6. En Factor de replicación, ingresa el factor de replicación que desees o conserva el valor predeterminado.

  7. (Opcional) Para modificar la configuración de algún tema, agrégala como pares clave-valor separados por comas en el campo Configurations.

  8. Haz clic en Crear.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Reemplaza lo siguiente:

    • TOPIC_ID: Es el nombre del tema.
    • CLUSTER: Es el nombre del clúster en el que deseas crear el tema.
    • LOCATION: es la región del clúster.
    • PARTITIONS: Es la cantidad de particiones del tema.
    • REPLICATION_FACTOR: Es el factor de replicación del tema.
    • CONFIGS: Son parámetros opcionales a nivel del tema. Se especifica como pares clave-valor separados por comas. Por ejemplo, compression.type=producer.
  3. CLI de Kafka

    Antes de ejecutar este comando, instala las herramientas de línea de comandos de Kafka en una VM de Compute Engine. La VM debe poder acceder a una subred conectada a tu clúster de Managed Service para Apache Kafka. Sigue las instrucciones en Produce and consume messages with the Kafka command-line tools.

    Ejecuta el comando kafka-topics.sh de la siguiente manera:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Reemplaza lo siguiente:

    • BOOTSTRAP_ADDRESS: La dirección de arranque del clúster de Managed Service para Apache Kafka.

    • TOPIC_ID: Es el nombre del tema.

    • PARTITIONS: Es la cantidad de particiones del tema.

    • REPLICATION_FACTOR: Es el factor de replicación del tema.

    REST

    Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

    • PROJECT_ID: El ID de tu proyecto de Google Cloud
    • LOCATION: Es la ubicación del clúster.
    • CLUSTER_ID: ID del clúster
    • TOPIC_ID: El ID del tema
    • PARTITION_COUNT: Es la cantidad de particiones del tema.
    • REPLICATION_FACTOR: Es la cantidad de réplicas de cada partición.

    Método HTTP y URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    Cuerpo JSON de la solicitud:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Para enviar tu solicitud, expande una de estas opciones:

    Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Puedes usar un recurso de Terraform para crear un tema.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

Próximos pasos