Google Cloud Managed Service for Apache Kafka コンシューマー グループを更新して、トピック パーティションのリストのオフセットを変更できます。これにより、グループ内のコンシューマーが受信するメッセージを制御できます。
コンシューマー グループを更新するには、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、またはオープンソースの Apache Kafka API を使用します。 Google Cloud コンソールでは、コンシューマー グループの編集はサポートされていません。
始める前に
コンシューマー グループを更新するには、まずメッセージがアクティブに消費されていないことを確認します。コンシューマー グループがメッセージを一度も使用していない場合、または最後にコミットされたオフセットが offsets.retention.minutes 後に期限切れになった場合、Kafka によって自動的に削除されます。
コンシューマー グループを更新する前に、次の手順を行います。
コンシューマー グループがメッセージを読み取っているトピックにメッセージを送信します。
コンシューマー グループを起動して、いくつかのメッセージを処理します。
すべてのお客様がメッセージを消費しないようにします。コンシューマーを停止するには、Control+C キーを押します。
メッセージの送信と使用の詳細については、Kafka コマンドライン ツールを使用してメッセージを生成して使用するをご覧ください。
コンシューマー グループの更新に必要なロールと権限
コンシューマー グループの編集に必要な権限を取得するには、プロジェクトに対する Managed Kafka コンシューマー グループ編集者 (roles/managedkafka.consumerGroupEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、コンシューマー グループの編集に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
コンシューマー グループを編集するには、次の権限が必要です。
-
コンシューマー グループを更新します。
managedkafka.consumerGroups.update
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka コンシューマー グループ編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
サービス エージェントに読み取りアクセス権を付与する
コンシューマー グループのオフセットを更新するには、サービス エージェントにトピックとコンシューマー グループのリソースに対する READ オペレーションのアクセス権が必要です。このアクセスは、Apache Kafka ACL で構成されます。
クラスタ内のコンシューマー グループとそのトピックに対して Apache Kafka ACL を構成していない場合、サービス エージェントはこれらのリソースにアンビエント アクセスできます。このセクションはスキップできます。
クラスタ内のコンシューマー グループとそのトピックに Apache Kafka ACL が構成されている場合、サービス エージェントには両方のリソースに対する READ オペレーションの明示的な ACL アクセス権が必要です。これを行うには、関連するコンシューマー グループとトピックに対する READ オペレーションへのアクセス権をサービス エージェントに付与する ACL エントリを追加します。手順は次のとおりです。
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init gcloud managed-kafka acls add-acl-entryコマンドを実行します。gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
次のように置き換えます。
CONSUMER_GROUP_ACL_ID(必須): コンシューマー グループの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのコンシューマー グループにアクセス権を適用するには、`allConsumerGroups` を使用します。特定のコンシューマー グループの場合は、`consumerGroup/CONSUMER_GROUP_NAME` を使用します。TOPIC_ACL_ID(必須): トピックの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのトピックにアクセス権を適用するには、`allTopics` を使用します。特定のトピックの場合は、`topic/TOPIC_NAME` を使用します。CLUSTER_ID(必須): ACL リソースを含むクラスタの ID。LOCATION(必須): クラスタが配置されているリージョン。サポートされているロケーションをご覧ください。PROJECT_NUMBER(必須): クラスタが配置されているプロジェクトのプロジェクト番号。これは、ACL エントリのサービス エージェントのプリンシパル名を構築するために使用されます。
ACL エントリの追加について詳しくは、ACL エントリを追加するをご覧ください。
コンシューマー グループを更新する
始める前にセクションの手順が完了していることを確認します。
コンシューマー グループを更新する手順は次のとおりです。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka consumer-groups updateコマンドを実行します。gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
次のように置き換えます。
-
CLUSTER_ID: クラスタの ID または名前。
-
LOCATION: クラスタのロケーション。
-
CONSUMER_GROUP_ID: コンシューマー グループの ID または名前。
-
TOPICS_FILE: この設定は、コンシューマー グループ用に更新するトピックの構成を含むファイルの場所を指定します。ファイルは JSON または YAML 形式で作成できます。ファイルパスを指定するか、JSON または YAML のコンテンツを直接含めることができます。
トピック ファイルは、JSON 構造を使用して
ConsumerGroupトピックマップを{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}の形式で表します。トピックごとに、ConsumerPartitionMetadataは各パーティションのオフセットとメタデータを提供します。topic1という名前のトピックの単一パーティション(パーティション 0)のオフセットを 10 に設定する場合、JSON 構成は次のようになります。{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}topics.jsonファイルの内容の例を次に示します。{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: JSON ファイルまたは YAML ファイルでトピックを指定する場合は、
gcloud managed-kafak topics describeコマンドを実行して取得できる完全なトピックパス(projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic形式)を含めます。.
-