Enumera tus temas de Managed Service para Apache Kafka de Google Cloud

Para enumerar tus temas en un clúster, puedes usar la Google Cloud consola de , la Google Cloud CLI, la biblioteca cliente, la API de Managed Kafka o las APIs de Apache Kafka de código abierto.

Roles y permisos obligatorios para enumerar tus temas

Si quieres obtener los permisos que necesitas para enumerar tus temas, pídele a tu administrador que te otorgue el Visualizador de Managed Kafka (roles/managedkafka.viewer) rol de IAM en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para enumerar tus temas. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para enumerar tus temas:

  • Enumerar temas: managedkafka.topics.list
  • Obtener tema: managedkafka.topics.get

También puedes obtener estos permisos con roles personalizados o otros roles predefinidos.

Enumera tus temas

Console

  1. En la Google Cloud consola de, ve a la página Clústeres.

    Ir a los clústeres

    Se enumeran los clústeres que creaste en un proyecto.

  2. Haz clic en el clúster para el que deseas ver los temas.

    Se muestra la página de detalles del clúster. En la página de detalles del clúster, en la pestaña Recursos, se enumeran los temas.

gcloud

  1. En la Google Cloud consola de, activa Cloud Shell.

    Activa Cloud Shell

    En la parte inferior de la Google Cloud consola de, se inicia una sesión de Cloud Shell en la que se muestra una ventana de línea de comandos. Cloud Shell es un entorno de shell con Google Cloud CLI ya instalada y con valores ya establecidos para el proyecto actual. La sesión puede tardar unos segundos en inicializarse.

  2. Ejecuta el gcloud managed-kafka topics list comando:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    Este comando recupera una lista de todos los temas presentes en el clúster de Managed Service para Apache Kafka especificado. Puedes usar marcas opcionales para filtrar, limitar y ordenar el resultado.

    Reemplaza lo siguiente:

    • CLUSTER_ID: Es el nombre del clúster cuyos temas deseas enumerar.
    • LOCATION_ID: Es la ubicación del clúster.
    • LIMIT: (Opcional) Es la cantidad máxima de temas que se enumerarán.

CLI de Kafka

Antes de ejecutar este comando, instala las herramientas de línea de comandos de Kafka en una VM de Compute Engine. La VM debe poder comunicarse con una subred conectada a tu clúster de Managed Service para Apache Kafka. Sigue las instrucciones en Produce y consume mensajes con las herramientas de línea de comandos de Kafka.

Ejecuta el comando kafka-topics.sh de la siguiente manera:

kafka-topics.sh --list \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties

Reemplaza lo siguiente:

REST

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: Es el ID de tu Google Cloud proyecto.
  • LOCATION: Es la ubicación del clúster.
  • CLUSTER_ID: Es el ID del clúster.

Método HTTP y URL:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

{
  "topics": [
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
      "partitionCount": 50,
      "replicationFactor": 3,
      "configs": {
        "remote.storage.enable": "false",
        "cleanup.policy": "delete",
        "retention.ms": "-1"
      }
    },
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": 3,
      "replicationFactor": 3
    }
  ]
}

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka para Go.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	req := &managedkafkapb.ListTopicsRequest{
		Parent: clusterPath,
	}
	topicIter := client.ListTopics(ctx, req)
	for {
		res, err := topicIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("topicIter.Next() got err: %w", err)
		}
		fmt.Fprintf(w, "Got topic: %v", res)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka para Java.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import java.io.IOException;

public class ListTopics {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    listTopics(projectId, region, clusterId);
  }

  public static void listTopics(String projectId, String region, String clusterId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
      // This operation is being handled synchronously.
      for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
        System.out.println(topic.getAllFields());
      }
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Managed Service para Apache Kafka.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"

client = managedkafka_v1.ManagedKafkaClient()

request = managedkafka_v1.ListTopicsRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
)

response = client.list_topics(request=request)
for topic in response:
    print("Got topic:", topic)

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados de Estados Unidos y otros países.