Mencantumkan topik Google Cloud Managed Service for Apache Kafka Anda

Untuk mencantumkan topik dalam cluster, Anda dapat menggunakan Google Cloud konsol, Google Cloud CLI, library klien, Managed Kafka API, atau Apache Kafka API open source.

Peran dan izin yang diperlukan untuk mencantumkan topik Anda

Untuk mendapatkan izin yang diperlukan guna membuat daftar topik, minta administrator untuk memberi Anda peran IAM Managed Kafka Viewer (roles/managedkafka.viewer) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk mencantumkan topik Anda. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mencantumkan topik Anda:

  • Daftar topik: managedkafka.topics.list
  • Dapatkan topik: managedkafka.topics.get

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran IAM Managed Kafka Viewer (roles/managedkafka.viewer), lihat Peran bawaan Managed Service for Apache Kafka.

Mencantumkan topik Anda

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

    Cluster yang Anda buat dalam project akan dicantumkan.

  2. Klik cluster yang topiknya ingin Anda lihat.

    Halaman detail cluster akan ditampilkan. Di halaman detail cluster, untuk tab Resources, topik akan dicantumkan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka topics list:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    Perintah ini mengambil daftar semua topik yang ada dalam cluster Managed Service for Apache Kafka yang ditentukan. Anda dapat menggunakan tanda opsional untuk memfilter, membatasi, dan mengurutkan output.

    Ganti kode berikut:

    • CLUSTER_ID: Nama cluster yang topiknya ingin Anda cantumkan.
    • LOCATION_ID: Lokasi cluster.
    • LIMIT: (Opsional) Jumlah maksimum topik yang akan dicantumkan.
  3. Kafka CLI

    Sebelum menjalankan perintah ini, instal alat command line Kafka di VM Compute Engine. VM harus dapat menjangkau subnet yang terhubung ke cluster Managed Service untuk Apache Kafka. Ikuti petunjuk di Membuat dan menggunakan pesan dengan alat command line Kafka.

    Jalankan perintah kafka-topics.sh sebagai berikut:

    kafka-topics.sh --list \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties
    

    Ganti kode berikut:

    • BOOTSTRAP_ADDRESS: Alamat bootstrap cluster Managed Service untuk Apache Kafka.

    REST

    Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

    • PROJECT_ID: Project ID Google Cloud Anda
    • LOCATION: lokasi cluster
    • CLUSTER_ID: ID cluster

    Metode HTTP dan URL:

    GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

    Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

    Anda akan melihat respons JSON seperti berikut:

    {
      "topics": [
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
          "partitionCount": 50,
          "replicationFactor": 3,
          "configs": {
            "remote.storage.enable": "false",
            "cleanup.policy": "delete",
            "retention.ms": "-1"
          }
        },
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
          "partitionCount": 3,
          "replicationFactor": 3
        }
      ]
    }
    
    

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.ListTopicsRequest{
    		Parent: clusterPath,
    	}
    	topicIter := client.ListTopics(ctx, req)
    	for {
    		res, err := topicIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("topicIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got topic: %v", res)
    	}
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import java.io.IOException;
    
    public class ListTopics {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        listTopics(projectId, region, clusterId);
      }
    
      public static void listTopics(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
          // This operation is being handled synchronously.
          for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
            System.out.println(topic.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    request = managedkafka_v1.ListTopicsRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
    )
    
    response = client.list_topics(request=request)
    for topic in response:
        print("Got topic:", topic)
    

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.