Enumera tus temas de Managed Service para Apache Kafka de Google Cloud

Para enumerar tus temas en un clúster, puedes usar la consola de Google Cloud , Google Cloud CLI, la biblioteca cliente, la API de Kafka administrado o las APIs de Apache Kafka de código abierto.

Roles y permisos obligatorios para mostrar tus temas

Para obtener los permisos que necesitas para enumerar tus temas, pídele a tu administrador que te otorgue el rol de IAM Visualizador de Kafka administrado (roles/managedkafka.viewer) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para enumerar tus temas. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para enumerar tus temas:

  • Temas de la lista: managedkafka.topics.list
  • Obtén el tema: managedkafka.topics.get

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de IAM de Visualizador de Kafka administrado (roles/managedkafka.viewer), consulta Roles predefinidos de Managed Service para Apache Kafka.

Haz una lista de tus temas

Console

  1. En la consola de Google Cloud , ve a la página Clústeres.

    Ir a los clústeres

    Se muestran los clústeres que creaste en un proyecto.

  2. Haz clic en el clúster para el que deseas ver los temas.

    Se muestra la página de detalles del clúster. En la página de detalles del clúster, en la pestaña Recursos, se enumeran los temas.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka topics list:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    Este comando recupera una lista de todos los temas presentes en el clúster de Managed Service para Apache Kafka especificado. Puedes usar marcas opcionales para filtrar, limitar y ordenar el resultado.

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el nombre del clúster cuyos temas deseas enumerar.
    • LOCATION_ID: Es la ubicación del clúster.
    • LIMIT: (Opcional) Es la cantidad máxima de temas que se pueden incluir en la lista.
  3. CLI de Kafka

    Antes de ejecutar este comando, instala las herramientas de línea de comandos de Kafka en una VM de Compute Engine. La VM debe poder acceder a una subred conectada a tu clúster de Managed Service para Apache Kafka. Sigue las instrucciones en Produce and consume messages with the Kafka command-line tools.

    Ejecuta el comando kafka-topics.sh de la siguiente manera:

    kafka-topics.sh --list \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties
    

    Reemplaza lo siguiente:

    REST

    Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

    • PROJECT_ID: El ID de tu proyecto de Google Cloud
    • LOCATION: Es la ubicación del clúster.
    • CLUSTER_ID: ID del clúster

    Método HTTP y URL:

    GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

    Para enviar tu solicitud, expande una de estas opciones:

    Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

    {
      "topics": [
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
          "partitionCount": 50,
          "replicationFactor": 3,
          "configs": {
            "remote.storage.enable": "false",
            "cleanup.policy": "delete",
            "retention.ms": "-1"
          }
        },
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
          "partitionCount": 3,
          "replicationFactor": 3
        }
      ]
    }
    
    

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.ListTopicsRequest{
    		Parent: clusterPath,
    	}
    	topicIter := client.ListTopics(ctx, req)
    	for {
    		res, err := topicIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("topicIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got topic: %v", res)
    	}
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import java.io.IOException;
    
    public class ListTopics {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        listTopics(projectId, region, clusterId);
      }
    
      public static void listTopics(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
          // This operation is being handled synchronously.
          for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
            System.out.println(topic.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    request = managedkafka_v1.ListTopicsRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
    )
    
    response = client.list_topics(request=request)
    for topic in response:
        print("Got topic:", topic)
    

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.