Apache Kafka용 Google Cloud 관리형 서비스 소비자 그룹 업데이트

Apache Kafka용 Google Cloud 관리형 서비스 소비자 그룹을 업데이트하여 주제 파티션 목록의 오프셋을 수정할 수 있습니다. 이렇게 하면 그룹의 소비자가 수신하는 메시지를 제어할 수 있습니다.

소비자 그룹을 업데이트하려면 Google Cloud CLI, 클라이언트 라이브러리, 관리형 Kafka API 또는 오픈소스 Apache Kafka API를 사용하면 됩니다. Google Cloud 콘솔은 소비자 그룹 편집을 지원하지 않습니다.

시작하기 전에

컨슈머 그룹을 업데이트하려면 먼저 메시지를 활발하게 소비하지 않는지 확인하세요. 컨슈머 그룹은 메시지를 소비한 적이 없거나 마지막 커밋된 오프셋이 offsets.retention.minutes 이후 만료된 경우 Kafka에 의해 자동으로 삭제됩니다.

소비자 그룹을 업데이트하기 전에 다음 단계를 따르세요.

  1. 소비자 그룹이 메시지를 읽고 있는 주제에 메시지를 보냅니다.

  2. 소비자 그룹을 시작하여 몇 개의 메시지를 처리합니다.

  3. 모든 소비자가 메시지를 소비하지 못하도록 합니다. 컨슈머를 중지하려면 Control+C를 누릅니다.

메시지 전송 및 소비에 대한 자세한 내용은 Kafka 명령줄 도구로 메시지 생성 및 소비를 참고하세요.

소비자 그룹을 업데이트하는 데 필요한 역할 및 권한

소비자 그룹을 수정하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 소비자 그룹 편집자 (roles/managedkafka.consumerGroupEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 소비자 그룹을 수정하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

소비자 그룹을 수정하려면 다음 권한이 필요합니다.

  • 컨슈머 그룹을 업데이트합니다. managedkafka.consumerGroups.update

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

관리형 Kafka 소비자 그룹 편집자 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

서비스 에이전트에 읽기 액세스 권한 부여

소비자 그룹 오프셋을 업데이트하려면 서비스 에이전트가 주제 및 소비자 그룹 리소스에 대한 읽기 작업에 액세스해야 합니다. 이 액세스는 Apache Kafka ACL로 구성됩니다.

클러스터 내의 소비자 그룹과 해당 주제에 대해 Apache Kafka ACL을 구성하지 않은 경우 서비스 에이전트는 이러한 리소스에 대한 주변 액세스 권한을 갖습니다. 이 섹션은 건너뛰어도 됩니다.

클러스터 내에서 소비자 그룹과 해당 주제에 Apache Kafka ACL이 구성된 경우 서비스 에이전트에는 두 리소스 모두에 대한 읽기 작업에 명시적인 ACL 액세스가 필요합니다. 이렇게 하려면 관련 소비자 그룹 및 주제에 대한 읽기 작업에 서비스 에이전트 액세스 권한을 부여하는 ACL 항목을 추가합니다. 다음 단계를 따르세요.

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. gcloud managed-kafka acls add-acl-entry 명령어를 실행합니다.

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    다음을 바꿉니다.

    • CONSUMER_GROUP_ACL_ID (필수): 소비자 그룹의 ACL 항목을 추가하려는 Apache Kafka용 관리형 서비스 ACL 리소스의 고유 ID입니다. 모든 소비자 그룹에 액세스 권한을 적용하려면 `allConsumerGroups`를 사용합니다. 특정 소비자 그룹의 경우 `consumerGroup/CONSUMER_GROUP_NAME`을 사용합니다.
    • TOPIC_ACL_ID (필수): 주제의 ACL 항목을 추가하려는 Apache Kafka용 관리형 서비스 ACL 리소스의 고유 ID입니다. 모든 주제에 대한 액세스 권한을 적용하려면 `allTopics`를 사용하고 특정 주제에 대한 액세스 권한을 적용하려면 `topic/TOPIC_NAME`을 사용합니다.
    • CLUSTER_ID (필수): ACL 리소스가 포함된 클러스터의 ID입니다.
    • LOCATION (필수): 클러스터가 있는 리전입니다. 지원되는 위치를 참고하세요.
    • PROJECT_NUMBER (필수): 클러스터가 있는 프로젝트의 프로젝트 번호입니다. 이는 ACL 항목의 서비스 에이전트의 주 구성원 이름을 빌드하는 데 사용됩니다.

ACL 항목 추가에 대한 자세한 내용은 ACL 항목 추가를 참고하세요.

소비자 그룹 업데이트

시작하기 전에 섹션의 단계를 완료했는지 확인합니다.

소비자 그룹을 업데이트하려면 다음 단계를 따르세요.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka consumer-groups update 명령어를 실행합니다.

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    다음을 바꿉니다.

    • CLUSTER_ID: 클러스터의 ID 또는 이름입니다.

    • LOCATION: 클러스터의 위치입니다.

    • CONSUMER_GROUP_ID: 소비자 그룹의 ID 또는 이름입니다.

    • TOPICS_FILE: 이 설정은 컨슈머 그룹에 대해 업데이트할 주제의 구성이 포함된 파일의 위치를 지정합니다. 파일은 JSON 또는 YAML 형식일 수 있습니다. 파일 경로일 수도 있고 JSON 또는 YAML 콘텐츠를 직접 포함할 수도 있습니다.

      주제 파일은 JSON 구조를 사용하여 ConsumerGroup 주제 지도를 { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}} 형식으로 나타냅니다. 각 주제에 대해 ConsumerPartitionMetadata는 각 파티션의 오프셋과 메타데이터를 제공합니다.

      topic1이라는 주제에서 단일 파티션 (파티션 0)의 오프셋을 10으로 설정하려면 JSON 구성은 다음과 같습니다.{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      다음은 topics.json 파일의 콘텐츠 예시입니다.

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: JSON 또는 YAML 파일에서 주제를 지정할 때는 gcloud managed-kafak topics describe 명령어를 실행하여 가져올 수 있고 projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic 형식인 전체 주제 경로를 포함하세요. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.