Google Cloud Managed Service for Apache Kafka コンシューマー グループを更新する

Google Cloud Managed Service for Apache Kafka コンシューマー グループを更新して、トピック パーティションのリストのオフセットを変更できます。これにより、グループ内のコンシューマーが受信するメッセージを制御できます。

コンシューマー グループを更新するには、Google Cloud CLI、クライアント ライブラリ、Managed Kafka API、オープンソースの Apache Kafka API を使用します。コンソールは、コンシューマー グループの編集ではサポートされていません。 Google Cloud

始める前に

コンシューマー グループを更新する前に、メッセージをアクティブに消費していないことを確認してください。 コンシューマー グループがメッセージを一度も消費していない場合、または最後にコミットされたオフセットが offsets.retention.minutes 後に期限切れになった場合、Kafka によって自動的に削除されます。

コンシューマー グループを更新する前に、次の操作を行います。

  1. コンシューマー グループがメッセージを読み取っているトピックにメッセージを送信します。

  2. コンシューマー グループを起動して、いくつかのメッセージを処理します。

  3. すべてのコンシューマーがメッセージを消費しないようにします。コンシューマーを停止するには、 Control+C を押します。

メッセージの送信と消費の詳細については、 Kafka コマンドライン ツールでメッセージを生成して消費するをご覧ください。

コンシューマー グループを更新するために必要なロールと権限

コンシューマー グループを編集するために必要な権限を取得するには、プロジェクトに対するManaged Kafka コンシューマー グループ編集者 roles/managedkafka.consumerGroupEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには コンシューマー グループの編集に必要な権限が含まれています。必要とされる正確な権限については、必要な権限セクションを開いてご確認ください。

必要な権限

コンシューマー グループを編集するには、次の権限が必要です。

  • コンシューマー グループを更新する: managedkafka.consumerGroups.update

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

サービス エージェントに読み取りアクセス権を付与する

コンシューマー グループのオフセットを更新するには、サービス エージェントがトピックとコンシューマー グループのリソースに対する読み取りオペレーションにアクセスできる必要があります。このアクセスは 、 Apache Kafka ACLで構成されます。

クラスタ内のコンシューマー グループとそのトピックに対して Apache Kafka ACL が構成されていない場合、サービス エージェントはこれらのリソースにアンビエント アクセスできます。このセクションはスキップできます。

クラスタ内のコンシューマー グループとそのトピックに対して Apache Kafka ACL が構成されている場合、サービス エージェントは両方のリソースの読み取りオペレーションに対して明示的な ACL アクセスを必要とします。これを行うには、関連するコンシューマー グループとトピックに対する読み取りオペレーションへのアクセス権をサービス エージェントに付与する ACL エントリを追加します。手順は次のとおりです。

  1. Google Cloud CLI をインストールします。

  2. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  3. gcloud CLI を初期化するには、次のコマンドを実行します:

    gcloud init
  4. gcloud managed-kafka acls add-acl-entry コマンドを実行します。

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    次のように置き換えます。

    • CONSUMER_GROUP_ACL_ID(必須): コンシューマー グループの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのコンシューマー グループにアクセスを適用するには、`allConsumerGroups` を使用します。特定のコンシューマー グループの場合は、`consumerGroup/CONSUMER_GROUP_NAME` を使用します。
    • TOPIC_ACL_ID(必須): トピックの ACL エントリを追加する Managed Service for Apache Kafka ACL リソースの一意の ID。すべてのトピックにアクセスを適用するには、`allTopics` を使用します。特定のトピックの場合は、`topic/TOPIC_NAME` を使用します。
    • CLUSTER_ID(必須): ACL リソースを含むクラスタの ID。
    • LOCATION(必須): クラスタが配置されているリージョン。サポートされているロケーションをご覧ください。
    • PROJECT_NUMBER(必須): クラスタが配置されているプロジェクトのプロジェクト番号。これは、ACL エントリのサービス エージェントのプリンシパル 名を構築するために使用されます。

ACL エントリの追加の詳細については、ACL エントリ を追加するをご覧ください。

コンシューマー グループを更新する

始める前にの手順を完了していることを確認してください。

コンシューマー グループを更新する手順は次のとおりです。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Google Cloud

    Cloud Shell をアクティブにする

    コンソールの下部にある Google Cloud Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka consumer-groups update コマンドを実行します。

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    次のように置き換えます。

    • CLUSTER_ID: クラスタの ID または名前。

    • LOCATION: クラスタのロケーション。

    • CONSUMER_GROUP_ID: コンシューマー グループの ID または名前。

    • TOPICS_FILE: この設定では、コンシューマー グループ用に更新するトピックの構成を含むファイル の場所を指定します。 ファイルは JSON または YAML 形式で指定できます。ファイルパスを指定することも、JSON または YAML コンテンツを直接含めることもできます。

      トピック ファイルは、JSON 構造を使用して ConsumerGroup トピック マップを表します。形式は { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}} です。 各トピックについて、ConsumerPartitionMetadata は各パーティションのオフセットとメタデータを提供します。

      topic1 という名前のトピックの単一 パーティション(パーティション 0)のオフセットを 10 に設定する場合、 JSON 構成は次のようになります。{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      以下に、topics.json ファイルの内容の例を示します。

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: JSON または YAML ファイルでトピックを指定する場合は、 完全なトピックパスを含めます。これは、gcloud managed-kafak topics describe コマンドを実行して取得できます。形式は projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic です。 。

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。