Google Cloud Managed Service for Apache Kafka トピックを更新する

トピックを作成した後、トピック構成を編集して、これらのプロパティ(パーティションの数と、クラスタレベルですでに設定されているプロパティにデフォルト設定されていないトピック構成)を更新できます。パーティションの数を減らすことはできません。増やすことのみ可能です。

単一のトピックを更新するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、またはオープンソースの Apache Kafka API を使用します。

トピックの編集に必要なロールと権限

トピックの編集に必要な権限を取得するには、プロジェクトに対する Managed Kafka トピック編集者(roles/managedkafka.topicEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

この事前定義ロールには、トピックの編集に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

トピックを編集するには、次の権限が必要です。

  • トピックを更新する: managedkafka.topics.update

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

トピックを編集する

トピックを編集する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

    プロジェクトで作成したクラスタが一覧表示されます。

  2. 編集するトピックが属するクラスタをクリックします。

    [クラスタの詳細] ページが開きます。クラスタの詳細ページの [リソース] タブに、トピックが一覧表示されます。

  3. 編集するトピックをクリックします。

    [トピックの詳細] ページが開きます。

  4. 編集するには、[編集] をクリックします。

  5. 変更したら、[保存] をクリックします。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka topics update コマンドを実行します。

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    このコマンドは、指定された Managed Service for Apache Kafka クラスタ内の既存のトピックの構成を変更します。このコマンドを使用すると、パーティションの数を増やし、トピック レベルの構成設定を更新できます。

    次のように置き換えます。

    • TOPIC_ID: トピックの ID。
    • CLUSTER_ID: トピックを含むクラスタの ID。
    • LOCATION_ID: クラスタのロケーション。
    • PARTITIONS: 省略可。トピックの更新されたパーティション数。パーティション数を減らすことはできません。増やすことのみ可能です。
    • CONFIGS: 省略可。更新する構成設定のリスト。Key-Value ペアのカンマ区切りのリストとして指定します。例: retention.ms=3600000,retention.bytes=10000000

REST

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID
  • LOCATION: クラスタのロケーション
  • CLUSTER_ID: クラスタの ID
  • TOPIC_ID: トピックの ID
  • UPDATE_MASK: 更新するフィールド(完全修飾名のカンマ区切りリスト)。例: partitionCount
  • PARTITION_COUNT: トピックの更新されたパーティション数

HTTP メソッドと URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

リクエストの本文(JSON):

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

メッセージ保持を構成する

Kafka はメッセージをログ セグメント ファイルに保存します。デフォルトでは、Kafka は保持期間が過ぎるか、パーティションがデータサイズしきい値を超えると、セグメント ファイルを削除します。この動作は、ログの圧縮を有効にすることで変更できます。ログ圧縮が有効になっている場合、Kafka は各キーの最新の値のみを保持します。

Google Cloud Managed Service for Apache Kafka は階層型ストレージを使用します。つまり、完了したログ セグメントはローカル ストレージではなく、リモートに保存されます。階層型ストレージの詳細については、Apache Kafka ドキュメントの階層型ストレージをご覧ください。

保持値を設定する

ログの圧縮が有効になっていない場合、次の設定によって Kafka がログ セグメント ファイルを保存する方法が制御されます。

  • retention.ms: セグメント ファイルを保存する最大時間(ミリ秒単位)。
  • retention.bytes: パーティションごとに保存する最大バイト数。パーティション内のデータがこの値を超えると、Kafka は古いセグメント ファイルを破棄します。

これらの設定を更新するには、gcloud CLI または Kafka CLI を使用します。

gcloud

メッセージの保持を設定するには、gcloud managed-kafka topics update コマンドを実行します。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

次のように置き換えます。

  • TOPIC_ID: トピックの ID。
  • CLUSTER_ID: トピックを含むクラスタの ID。
  • LOCATION_ID: クラスタのロケーション。
  • RETENTION_PERIOD: セグメント ファイルを保存する最大時間(ミリ秒単位)。
  • MAX_BYTES: パーティションごとに保存する最大バイト数。

Kafka CLI

このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。

kafka-configs.sh コマンドを実行します。

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

次のように置き換えます。

  • BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka クラスタのブートストラップ アドレス
  • TOPIC_ID: トピックの ID。
  • RETENTION_PERIOD: セグメント ファイルを保存する最大時間(ミリ秒単位)。
  • MAX_BYTES: パーティションごとに保存する最大バイト数。

ログの圧縮を有効にする

ログの圧縮が有効になっている場合、Kafka は各キーの最新のメッセージのみを保存します。ログの圧縮はデフォルトで無効になっています。トピックのログ圧縮を有効にするには、次のように cleanup.policy 構成を "compact" に設定します。

gcloud

gcloud managed-kafka topics update コマンドを実行します。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

次のように置き換えます。

  • TOPIC_ID: トピックの ID。
  • CLUSTER_ID: トピックを含むクラスタの ID。
  • LOCATION_ID: クラスタのロケーション。

Kafka CLI

このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。

kafka-configs.sh コマンドを実行します。

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

次のように置き換えます。

制限事項

  • remote.storage.enable などのリモート ストレージのトピック構成をオーバーライドすることはできません。

  • segment.bytes などのログ セグメント ファイルのトピック構成をオーバーライドすることはできません。

  • トピックのログ圧縮を有効にすると、そのトピックの階層型ストレージが暗黙的に無効になります。トピックのすべてのログファイルはローカルに保存されます。

次のステップ