Apache Kafka용 Google Cloud 관리형 서비스 소비자 그룹 업데이트

Google Cloud Managed Service for Apache Kafka 소비자 그룹을 업데이트하여 주제 파티션 목록의 오프셋을 수정할 수 있습니다. 이렇게 하면 그룹의 소비자가 수신하는 메시지를 제어할 수 있습니다.

소비자 그룹을 업데이트하려면 Google Cloud CLI, 클라이언트 라이브러리, 관리형 Kafka API 또는 오픈소스 Apache Kafka API를 사용하면 됩니다. Google Cloud 콘솔은 소비자 그룹 편집을 지원하지 않습니다.

시작하기 전에

컨슈머 그룹을 업데이트하려면 먼저 메시지를 활성 상태로 소비하지 않는지 확인하세요. 컨슈머 그룹은 메시지를 소비한 적이 없거나 마지막 커밋된 오프셋이 offsets.retention.minutes 후에 만료된 경우 Kafka에 의해 자동으로 삭제됩니다.

소비자 그룹을 업데이트하기 전에 다음 단계를 따르세요.

  1. 소비자 그룹이 메시지를 읽고 있는 주제에 메시지를 보냅니다.

  2. 소비자 그룹을 시작하여 메시지를 몇 개 처리합니다.

  3. 모든 소비자가 메시지를 소비하지 못하도록 합니다. 컨슈머를 중지하려면 Control+C를 누릅니다.

메시지 전송 및 소비에 대한 자세한 내용은 Kafka 명령줄 도구로 메시지 생성 및 소비를 참고하세요.

컨슈머 그룹을 업데이트하는 데 필요한 역할 및 권한

소비자 그룹을 수정하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 소비자 그룹 편집자 (roles/managedkafka.consumerGroupEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 소비자 그룹을 수정하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

소비자 그룹을 수정하려면 다음 권한이 필요합니다.

  • 컨슈머 그룹을 업데이트합니다. managedkafka.consumerGroups.update

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

관리형 Kafka 소비자 그룹 편집자 역할에 대한 자세한 내용은 Managed Service for Apache Kafka 사전 정의된 역할을 참고하세요.

서비스 에이전트에 읽기 액세스 권한 부여

소비자 그룹 오프셋을 업데이트하려면 서비스 에이전트가 주제 및 소비자 그룹 리소스에 대한 읽기 작업에 액세스해야 합니다. 이 액세스는 Apache Kafka ACL로 구성됩니다.

클러스터 내의 소비자 그룹과 해당 주제에 대해 Apache Kafka ACL을 구성하지 않은 경우 서비스 에이전트는 이러한 리소스에 대한 주변 액세스 권한을 갖습니다. 이 섹션은 건너뛰어도 됩니다.

클러스터 내의 소비자 그룹과 해당 주제에 Apache Kafka ACL이 구성된 경우 서비스 에이전트에는 두 리소스 모두에 대한 READ 작업에 대한 명시적 ACL 액세스가 필요합니다. 이렇게 하려면 관련 소비자 그룹 및 주제에 대한 읽기 작업에 서비스 에이전트 액세스 권한을 부여하는 ACL 항목을 추가합니다. 다음 단계를 따르세요.

  1. Google Cloud CLI를 설치합니다.

  2. 외부 ID 공급업체(IdP)를 사용하는 경우 먼저 제휴 ID로 gcloud CLI에 로그인해야 합니다.

  3. gcloud CLI를 초기화하려면, 다음 명령어를 실행합니다.

    gcloud init
  4. gcloud managed-kafka acls add-acl-entry 명령어를 실행합니다.

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    다음을 바꿉니다.

    • CONSUMER_GROUP_ACL_ID (필수): 소비자 그룹의 ACL 항목을 추가하려는 Managed Service for Apache Kafka ACL 리소스의 고유 ID입니다. 모든 소비자 그룹에 액세스 권한을 적용하려면 `allConsumerGroups`를 사용합니다. 특정 소비자 그룹의 경우 `consumerGroup/CONSUMER_GROUP_NAME`을 사용합니다.
    • TOPIC_ACL_ID (필수): 주제의 ACL 항목을 추가하려는 Managed Service for Apache Kafka ACL 리소스의 고유 ID입니다. 모든 주제에 대한 액세스 권한을 적용하려면 `allTopics`를 사용하고 특정 주제의 경우 `topic/TOPIC_NAME`을 사용합니다.
    • CLUSTER_ID (필수): ACL 리소스가 포함된 클러스터의 ID입니다.
    • LOCATION (필수): 클러스터가 있는 리전입니다. 지원되는 위치를 참고하세요.
    • PROJECT_NUMBER (필수): 클러스터가 있는 프로젝트의 프로젝트 번호입니다. 이는 ACL 항목의 서비스 에이전트의 주 구성원 이름을 빌드하는 데 사용됩니다.

ACL 항목 추가에 대한 자세한 내용은 ACL 항목 추가를 참고하세요.

소비자 그룹 업데이트

시작하기 전에 섹션의 단계를 완료했는지 확인합니다.

컨슈머 그룹을 업데이트하려면 다음 단계를 따르세요.

gcloud

  1. Google Cloud 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Google Cloud 콘솔 하단에 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

  2. gcloud managed-kafka consumer-groups update 명령어를 실행합니다.

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    다음을 바꿉니다.

    • CLUSTER_ID: 클러스터의 ID 또는 이름입니다.

    • LOCATION: 클러스터의 위치입니다.

    • CONSUMER_GROUP_ID: 소비자 그룹의 ID 또는 이름입니다.

    • TOPICS_FILE: 이 설정은 컨슈머 그룹에 대해 업데이트할 주제의 구성이 포함된 파일의 위치를 지정합니다. 파일은 JSON 또는 YAML 형식일 수 있습니다. 파일 경로일 수도 있고 JSON 또는 YAML 콘텐츠를 직접 포함할 수도 있습니다.

      주제 파일은 JSON 구조를 사용하여 ConsumerGroup 주제 지도를 { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}} 형식으로 나타냅니다. 각 주제에 대해 ConsumerPartitionMetadata는 각 파티션의 오프셋과 메타데이터를 제공합니다.

      topic1이라는 주제에서 단일 파티션 (파티션 0)의 오프셋을 10으로 설정하려면 JSON 구성은 다음과 같습니다.{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      다음은 topics.json 파일의 콘텐츠 예시입니다.

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: JSON 또는 YAML 파일에서 주제를 지정할 때는 gcloud managed-kafak topics describe 명령어를 실행하여 가져올 수 있고 projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic 형식인 전체 주제 경로를 포함하세요. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.