トピックを作成したら、トピック構成を編集して、パーティション数と、クラスタレベルですでに設定されているプロパティにデフォルト設定されていないトピック構成を更新できます。パーティション数は増やすことはできますが、減らすことはできません。
単一のトピックを更新するには、 Google Cloud コンソール、Google Cloud CLI、 クライアント ライブラリ、Managed Kafka API、オープンソースの Apache Kafka API を使用します。
トピックを編集するために必要なロールと権限
トピックの編集に必要な権限を取得するには、プロジェクトに対する Managed Kafka トピック編集者(roles/managedkafka.topicEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
この事前定義ロールには トピックの編集に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
トピックを編集するには、次の権限が必要です。
-
トピックを更新する:
managedkafka.topics.update
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
このロールの詳細については、 Managed Service for Apache Kafka の事前定義ロールをご覧ください。
トピックを編集する
トピックを編集する方法は次のとおりです。
コンソール
コンソールで、[クラスタ] ページに移動します。 Google Cloud
プロジェクトで作成したクラスタが一覧表示されます。
編集するトピックが属するクラスタをクリックします。
[クラスタの詳細] ページが開きます。クラスタの詳細ページの [リソース] タブに、トピックが一覧表示されます。
編集するトピックをクリックします。
[トピックの詳細] ページが開きます。
編集を行うには、[編集] をクリックします。
変更後、[保存] をクリックします。
gcloud
-
コンソールで Cloud Shell をアクティブにします。 Google Cloud
コンソールの下部にある Google Cloud Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。
gcloud managed-kafka topics updateコマンドを実行します。gcloud managed-kafka topics update TOPIC_ID \ --cluster=CLUSTER_ID \ --location=LOCATION_ID \ --partitions=PARTITIONS \ --configs=CONFIGSこのコマンドは、指定された Managed Service for Apache Kafka クラスタ内の既存のトピックの構成を変更します。このコマンドを使用すると、パーティション数を増やし、トピックレベルの構成設定を更新できます。
次のように置き換えます。
- TOPIC_ID: トピックの ID。
- CLUSTER_ID: トピックを含むクラスタの ID。
- LOCATION_ID: クラスタのロケーション。
- PARTITIONS:省略可。トピックのパーティション数の更新。パーティション数は増やすことはできますが、減らすことはできません。
- CONFIGS:省略可。更新する構成設定のリスト。Key-Value ペアのカンマ区切りのリストとして指定します。例:
retention.ms=3600000,retention.bytes=10000000
REST
リクエストのデータを使用する前に、 次のように置き換えます。
-
PROJECT_ID: あなたの Google Cloud プロジェクト ID -
LOCATION: クラスタのロケーション -
CLUSTER_ID: クラスタの ID -
TOPIC_ID: トピックの ID -
UPDATE_MASK: 更新するフィールド。完全修飾名のカンマ区切りのリスト。例:partitionCount -
PARTITION_COUNT: トピックのパーティション数の更新
HTTP メソッドと URL:
PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK
リクエストの本文(JSON):
{
"name": "TOPIC_ID",
"partitionCount": PARTITION_COUNT
}
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{
"name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
"metadata": {
"@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
"createTime": "CREATE_TIME",
"target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
"verb": "update",
"requestedCancellation": false,
"apiVersion": "v1"
},
"done": false
}
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka の認証を行うには、アプリケーションのデフォルト認証情報を設定します。 詳細については、 ローカル開発環境の ADC の設定をご覧ください。
メッセージ保持を構成する
Kafka はメッセージをログ セグメント ファイル に保存します。デフォルトでは、Kafka は保持期間が経過した後、またはパーティションがデータサイズの上限を超えたときにセグメント ファイルを削除します。この動作は、 ログ圧縮を有効にすることで変更できます。ログ圧縮が有効になっている場合、Kafka は各キーの最新の値のみを保持します。
Google Cloud Managed Service for Apache Kafka は 階層型ストレージ を使用します。つまり、完了した ログ セグメントはローカル ストレージではなく、リモートに保存されます。階層型ストレージの詳細については、 Apache Kafka ドキュメントの 階層型ストレージをご覧ください。
保持値を設定する
ログ圧縮が有効になっていない場合、Kafka がログ セグメント ファイルを保存する方法は次の設定で制御されます。
retention.ms: セグメント ファイルを保存する最大時間(ミリ秒)。retention.bytes: パーティションごとに保存する最大バイト数。パーティション内のデータがこの値を超えると、Kafka は古いセグメント ファイルを破棄します。
これらの設定を更新するには、gcloud CLI または Kafka CLI を使用します。
gcloud
メッセージ保持を設定するには、
gcloud managed-kafka topics update
コマンドを実行します。
gcloud managed-kafka topics update TOPIC_ID \
--cluster=CLUSTER_ID \
--location=LOCATION_ID \
--configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES
次のように置き換えます。
- TOPIC_ID: トピックの ID。
- CLUSTER_ID: トピックを含むクラスタの ID。
- LOCATION_ID: クラスタのロケーション。
- RETENTION_PERIOD:セグメント ファイルを保存する最大時間(ミリ秒)。
- MAX_BYTES: パーティションごとに保存する最大バイト数。
Kafka CLI
このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。
kafka-configs.sh コマンドを実行します。
kafka-configs.sh --alter \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--entity-type topics \
--entity-name TOPIC_ID \
--add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES
次のように置き換えます。
- BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka クラスタの ブートストラップ アドレス 。
- TOPIC_ID: トピックの ID。
- RETENTION_PERIOD:セグメント ファイルを保存する最大時間(ミリ秒)。
- MAX_BYTES: パーティションごとに保存する最大バイト数。
ログ圧縮を有効にする
ログ圧縮が有効になっている場合、Kafka は各キーの最新のメッセージのみを保存します。ログ圧縮はデフォルトで無効になっています。トピックのログ圧縮を有効にするには、次のように cleanup.policy 構成を "compact" に設定します。
gcloud
gcloud managed-kafka topics update
コマンドを実行します。
gcloud managed-kafka topics update TOPIC_ID \
--cluster=CLUSTER_ID \
--location=LOCATION_ID \
--configs=cleanup.policy=compact
次のように置き換えます。
- TOPIC_ID: トピックの ID。
- CLUSTER_ID: トピックを含むクラスタの ID。
- LOCATION_ID: クラスタのロケーション。
Kafka CLI
このコマンドを実行する前に、Compute Engine VM に Kafka コマンドライン ツールをインストールします。VM は、Managed Service for Apache Kafka クラスタに接続されているサブネットに到達できる必要があります。 Kafka コマンドライン ツールを使用してメッセージを生成して使用するの手順に沿って操作します。
kafka-configs.sh コマンドを実行します。
kafka-configs.sh --alter \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--entity-type topics \
--entity-name TOPIC_ID \
--add-config cleanup.policy=compact
次のように置き換えます。
- BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka クラスタの ブートストラップ アドレス 。
- TOPIC_ID: トピックの ID。
制限事項
remote.storage.enableなど、リモート ストレージのトピック構成をオーバーライドすることはできません。segment.bytesなど、ログ セグメント ファイルのトピック構成をオーバーライドすることはできません。トピックのログ圧縮を有効にすると、そのトピックの階層型ストレージが暗黙的に無効になります。トピックのすべてのログファイルはローカルに保存されます。