Google Cloud Managed Service for Apache Kafka のトピックを一覧表示する

クラスタ内のトピックを一覧表示するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、オープンソースの Apache Kafka API を使用します。

トピックを一覧表示するために必要なロールと権限

トピックを一覧表示するために必要な権限を取得するには、プロジェクトに対する Managed Kafka 閲覧者 roles/managedkafka.viewer)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、トピックのリストに必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

トピックを一覧表示するには、次の権限が必要です。

  • トピックを一覧表示する: managedkafka.topics.list
  • トピックを取得する: managedkafka.topics.get

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka 閲覧者(roles/managedkafka.viewer)IAM ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

トピックを一覧表示する

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

    プロジェクトで作成したクラスタが一覧表示されます。

  2. トピックを表示するクラスタをクリックします。

    クラスタの詳細ページが表示されます。クラスタの詳細ページの [リソース] タブに、トピックが一覧表示されます。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka topics list コマンドを実行します。

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    このコマンドは、指定された Managed Service for Apache Kafka クラスタ内にあるすべてのトピックのリストを取得します。オプション フラグを使用して、出力をフィルタ、制限、並べ替えることができます。

    次のように置き換えます。

    • CLUSTER_ID: トピックを一覧表示するクラスタの名前。
    • LOCATION_ID: クラスタのロケーション。
    • LIMIT: 省略可。一覧表示するトピックの最大数。

Kafka CLI

このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールでメッセージを生成して使用するの手順に沿って操作します。

次のように kafka-topics.sh コマンドを実行します。

kafka-topics.sh --list \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties

次のように置き換えます。

REST

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID
  • LOCATION: クラスタのロケーション
  • CLUSTER_ID: クラスタの ID

HTTP メソッドと URL:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "topics": [
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
      "partitionCount": 50,
      "replicationFactor": 3,
      "configs": {
        "remote.storage.enable": "false",
        "cleanup.policy": "delete",
        "retention.ms": "-1"
      }
    },
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": 3,
      "replicationFactor": 3
    }
  ]
}

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	req := &managedkafkapb.ListTopicsRequest{
		Parent: clusterPath,
	}
	topicIter := client.ListTopics(ctx, req)
	for {
		res, err := topicIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("topicIter.Next() got err: %w", err)
		}
		fmt.Fprintf(w, "Got topic: %v", res)
	}
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import java.io.IOException;

public class ListTopics {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    listTopics(projectId, region, clusterId);
  }

  public static void listTopics(String projectId, String region, String clusterId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
      // This operation is being handled synchronously.
      for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
        System.out.println(topic.getAllFields());
      }
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"

client = managedkafka_v1.ManagedKafkaClient()

request = managedkafka_v1.ListTopicsRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
)

response = client.list_topics(request=request)
for topic in response:
    print("Got topic:", topic)

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。