Google Cloud Managed Service for Apache Kafka コンシューマー グループを更新して、トピック パーティションのリストのオフセットを変更できます。これにより、グループ内のコンシューマーが受信するメッセージを制御できます。
コンシューマー グループを更新するには、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、オープンソースの Apache Kafka API を使用します。コンソールは、コンシューマー グループの編集ではサポートされていません。 Google Cloud
始める前に
コンシューマー グループを更新する前に、メッセージをアクティブに消費していないことを確認してください。
コンシューマー
グループがメッセージを一度も消費していない場合、または最後にコミットされたオフセットが offsets.retention.minutes
後に期限切れになった場合、Kafka によって自動的に削除されます。
コンシューマー グループを更新する前に、次の操作を行います。
コンシューマー グループがメッセージを読み取っているトピックにメッセージを送信します。
コンシューマー グループを起動して、いくつかのメッセージを処理します。
すべてのコンシューマーがメッセージを消費しないようにします。コンシューマーを停止するには、 Control+C を押します。
メッセージの送信と消費の詳細については、 Kafka コマンドライン ツールでメッセージを生成して消費するをご覧ください。
コンシューマー グループを更新するために必要なロールと権限
コンシューマー グループを編集するために必要な権限を取得するには、プロジェクトに対するManaged Kafka コンシューマー グループ編集者 (roles/managedkafka.consumerGroupEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには コンシューマー グループの編集に必要な権限が含まれています。必要とされる正確な権限については、必要な権限セクションを開いてご確認ください。
必要な権限
コンシューマー グループを編集するには、次の権限が必要です。
-
コンシューマー グループを更新する:
managedkafka.consumerGroups.update
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
サービス エージェントに読み取りアクセス権を付与する
コンシューマー グループのオフセットを更新するには、サービス エージェントがトピックとコンシューマー グループのリソースに対する読み取りオペレーションにアクセスできる必要があります。このアクセスは 、 Apache Kafka ACLで構成されます。
クラスタ内のコンシューマー グループとそのトピックに対して Apache Kafka ACL が構成されていない場合、サービス エージェントはこれらのリソースにアンビエント アクセスできます。このセクションはスキップできます。
クラスタ内のコンシューマー グループとそのトピックに対して Apache Kafka ACL が構成されている場合、サービス エージェントは両方のリソースの読み取りオペレーションに対して明示的な ACL アクセスを必要とします。これを行うには、関連するコンシューマー グループとトピックに対する読み取りオペレーションへのアクセス権をサービス エージェントに付与する ACL エントリを追加します。手順は次のとおりです。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init gcloud managed-kafka acls add-acl-entryコマンドを実行します。gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
次のように置き換えます。
CONSUMER_GROUP_ACL_ID(必須): コンシューマー グループの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのコンシューマー グループにアクセスを適用するには、`allConsumerGroups` を使用します。特定のコンシューマー グループの場合は、`consumerGroup/CONSUMER_GROUP_NAME` を使用します。TOPIC_ACL_ID(必須): トピックの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのトピックにアクセスを適用するには、`allTopics` を使用します。特定のトピックの場合は、`topic/TOPIC_NAME` を使用します。CLUSTER_ID(必須): ACL リソースを含むクラスタの ID。LOCATION(必須): クラスタが配置されているリージョン。サポートされているロケーションをご覧ください。PROJECT_NUMBER(必須): クラスタが配置されているプロジェクトのプロジェクト番号。これは、ACL エントリのサービス エージェントのプリンシパル 名を構築するために使用されます。
ACL エントリの追加の詳細については、ACL エントリ を追加するをご覧ください。
コンシューマー グループを更新する
コンシューマー グループを更新する手順は次のとおりです。
gcloud
-
コンソールで Cloud Shell をアクティブにします。 Google Cloud
コンソールの下部にある Google Cloud Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。
gcloud managed-kafka consumer-groups updateコマンドを実行します。gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
次のように置き換えます。
-
CLUSTER_ID: クラスタの ID または名前。
-
LOCATION: クラスタのロケーション。
-
CONSUMER_GROUP_ID: コンシューマー グループの ID または名前。
-
TOPICS_FILE: この設定では、コンシューマー グループ用に更新するトピックの構成を含むファイル の場所を指定します。 ファイルは JSON または YAML 形式で指定できます。ファイルパスを指定することも、JSON または YAML コンテンツを直接含めることもできます。
トピック ファイルは、JSON 構造を使用して
ConsumerGroupトピック マップを表します。形式は{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}です。 各トピックについて、ConsumerPartitionMetadataは各パーティションのオフセットとメタデータを提供します。topic1という名前のトピックの単一 パーティション(パーティション 0)のオフセットを 10 に設定する場合、 JSON 構成は次のようになります。{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}以下に、
topics.jsonファイルの内容の例を示します。{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: JSON または YAML ファイルでトピックを指定する場合は、 完全なトピックパスを含めます。これは、
gcloud managed-kafak topics describeコマンドを実行して取得できます。形式はprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topicです。 。
-