Google Cloud Managed Service for Apache Kafka トピックを表示する

単一のトピックに関する詳細情報を表示するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、 Managed Kafka API、オープンソースの Apache Kafka API を使用します。

トピックを表示するために必要なロールと権限

トピックの表示に必要な権限を取得するには、プロジェクトに対する Managed Kafka 閲覧者 roles/managedkafka.viewer)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには トピックを表示するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

トピックを表示するには、次の権限が必要です。

  • トピックを一覧表示する: managedkafka.topics.list
  • トピックを取得する: managedkafka.topics.get

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka 閲覧者(roles/managedkafka.viewer)IAM ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

コンソールでのトピックのプロパティ

コンソールでは、次のトピックのプロパティを表示できます。

  • 構成: このタブには、 トピックに関する一般的な構成の詳細が表示されます。たとえば、次のものがあります。

    • 名前: クラスタ内のトピックの一意の識別子。

    • パーティション: トピック内のパーティションの数。パーティションは、スケーラビリティと並列処理のためにトピックのデータをセグメントに分割します。

    • レプリカ: データの冗長性と可用性を確保するために、各 パーティションで維持されるコピー(レプリカ)の数。

    • クラスタ: トピックが属する Managed Service for Apache Kafka クラスタの名前。

    • リージョン: クラスタとトピックが配置されている Google Cloud リージョン。

    • デフォルト以外のトピック パラメータ: クラスタ全体のデフォルトとは異なる、トピックに設定されているトピックレベルの構成 オーバーライド。

  • モニタリング: このタブには、トピックのアクティビティとパフォーマンスに関連する主要な指標を示すグラフが表示されます。これらのグラフには、次のものがあります。

    • バイト数: バイトが 生成されるレート、またはトピックに送信されるレートを示す時系列グラフ。これは、トピックに公開されたデータの量を経時的に示します。対応する指標は managedkafka.googleapis.com/byte_in_count です。

    • リクエスト数: トピックに対して行われたリクエストのレートを表す時系列グラフ。トピックの全体的なアクティビティと使用状況を反映します。関連する指標は managedkafka.googleapis.com/topic_request_count です。

    • パーティション別のログ セグメント: このグラフには、トピック内の各パーティションのアクティブな ログ セグメントの数が表示されます。ログ セグメントは、Kafka がトピックデータを保存するディスク上の物理ファイルです。関連する指標は managedkafka.googleapis.com/log_segments です。

  • コンシューマー グループ: このセクションには、トピックに 登録されているコンシューマー グループが一覧表示されます。コンシューマー グループは、トピックからメッセージを読み取るために連携して動作するコンシューマーのセットです。

トピックを表示する

コンソール

  1. コンソールで、[クラスタ] ページに移動します。 Google Cloud

    [クラスタ] に移動

    プロジェクトで作成したクラスタが一覧表示されます。

  2. トピックを表示するクラスタをクリックします。

    クラスタの詳細ページが表示されます。クラスタの詳細ページの [リソース] タブに、トピックが一覧表示されます。

  3. 特定のトピックを表示するには、トピック名をクリックします。

    トピックの詳細ページが表示されます。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Google Cloud

    Cloud Shell をアクティブにする

    コンソールの下部にある Google Cloud Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka topics describe コマンドを実行します。

    gcloud managed-kafka topics describe TOPIC_ID \
      --cluster=CLUSTER_ID --location=LOCATION_ID
    

    このコマンドは、指定したトピックに関する包括的な詳細情報を取得して表示します。この情報には、パーティションの数、レプリケーション係数、トピックレベルの構成オーバーライドなど、構成設定が含まれます。

    次のように置き換えます。

    • TOPIC_ID: トピックの ID。
    • CLUSTER_ID: トピックを含むクラスタの ID。
    • LOCATION_ID: クラスタのロケーション。

gcloud managed-kafka topics describe コマンドは、パーティション数やレプリケーション係数など、トピックに関する最小限の情報を表示します。パーティションの割り当てや構成設定の完全なセットなど、詳細情報を取得するには、kafka-topics.sh コマンドライン ツールを使用します。

Kafka CLI

このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールでメッセージの生成と消費の手順に沿って操作します。

トピックの詳細を表示するには、kafka-topics.sh --describe コマンドを実行します。

kafka-topics.sh --describe \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID

次のように置き換えます。

このコマンドは、トピックのプロパティのサブセットを返します。たとえば、次のものがあります。

  • パーティション数
  • レプリケーション係数
  • パーティションの割り当て
  • 動的構成(明示的に設定した設定)
  • 静的構成(クラスタの起動時に適用される設定)

デフォルト値の設定など、トピックの構成設定の完全なセットを表示するには、kafka-configs.sh --describe コマンドを実行します。

kafka-configs.sh --describe \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--entity-type topics \
--entity-name TOPIC_ID \
--all

出力は、Key-Value ペアとしての設定のリストです。--all フラグは、すべての構成設定を返します。動的構成設定のみのリストを取得するには、--all フラグを省略します。

REST

リクエストのデータを使用する前に、 次のように置き換えます。

  • PROJECT_ID: あなたの Google Cloud プロジェクト ID
  • LOCATION: クラスタのロケーション
  • CLUSTER_ID: クラスタの ID
  • TOPIC_ID: トピックの ID

HTTP メソッドと URL:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください

Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func getTopic(w io.Writer, projectID, region, clusterID, topicID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	req := &managedkafkapb.GetTopicRequest{
		Name: topicPath,
	}
	topic, err := client.GetTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.GetTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Got topic: %#v\n", topic)
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;

public class GetTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    getTopic(projectId, region, clusterId, topicId);
  }

  public static void getTopic(String projectId, String region, String clusterId, String topicId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      // This operation is being handled synchronously.
      Topic topic =
          managedKafkaClient.getTopic(TopicName.of(projectId, region, clusterId, topicId));
      System.out.println(topic.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.getTopic got err: %s", e.getMessage());
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"

client = managedkafka_v1.ManagedKafkaClient()

topic_path = client.topic_path(project_id, region, cluster_id, topic_id)
request = managedkafka_v1.GetTopicRequest(
    name=topic_path,
)

try:
    topic = client.get_topic(request=request)
    print("Got topic:", topic)
except NotFound as e:
    print(f"Failed to get topic {topic_id} with error: {e.message}")

次のステップ