Apache Kafka용 Google Cloud 관리형 서비스 주제 업데이트

주제를 만든 후 주제 구성을 수정하여 파티션 수와 클러스터 수준에서 이미 설정된 속성이 기본값이 아닌 주제 구성을 비롯한 속성을 업데이트할 수 있습니다. 파티션 수는 늘릴 수만 있고 줄일 수는 없습니다.

단일 주제를 업데이트하려면 Google Cloud 콘솔, Google Cloud CLI, 클라이언트 라이브러리, 관리형 Kafka API 또는 오픈소스 Apache Kafka API를 사용하면 됩니다.

주제를 수정하는 데 필요한 역할 및 권한

주제를 수정하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 주제 편집자(roles/managedkafka.topicEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 상세 설명은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 주제를 수정하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

주제를 수정하려면 다음 권한이 필요합니다.

  • 주제 업데이트: managedkafka.topics.update

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

이 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

주제 수정

주제를 수정하려면 다음 단계를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.

    클러스터로 이동

    프로젝트에서 만든 클러스터가 나열됩니다.

  2. 수정하려는 주제가 속한 클러스터를 클릭합니다.

    클러스터 세부정보 페이지가 열립니다. 클러스터 세부정보 페이지의 리소스 탭에 주제가 나열됩니다.

  3. 수정하려는 주제를 클릭합니다.

    주제 세부정보 페이지가 열립니다.

  4. 수정하려면 수정을 클릭합니다.

  5. 변경 후 저장을 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka topics update 명령어를 실행합니다.

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    이 명령어는 지정된 Apache Kafka용 관리형 서비스 클러스터의 기존 주제 구성을 수정합니다. 이 명령어를 사용하여 파티션 수를 늘리고 주제 수준 구성 설정을 업데이트할 수 있습니다.

    다음을 바꿉니다.

    • TOPIC_ID: 주제의 ID입니다.
    • CLUSTER_ID: 주제를 포함하는 클러스터의 ID입니다.
    • LOCATION_ID: 클러스터의 위치입니다.
    • PARTITIONS: 선택사항: 주제의 업데이트된 파티션 수입니다. 파티션 수는 늘릴 수만 있고 줄일 수는 없습니다.
    • CONFIGS: 선택사항: 업데이트할 구성 설정 목록입니다. 쉼표로 구분된 키-값 쌍 목록으로 지정합니다. 예를 들면 retention.ms=3600000,retention.bytes=10000000입니다.
  3. REST

    요청 데이터를 사용하기 전에 다음을 바꿉니다.

    • PROJECT_ID: Google Cloud 프로젝트 ID
    • LOCATION: 클러스터의 위치
    • CLUSTER_ID: 클러스터의 ID
    • TOPIC_ID: 주제의 ID
    • UPDATE_MASK: 업데이트할 필드(정규화된 이름의 쉼표로 구분된 목록) 예를 들면 partitionCount입니다.
    • PARTITION_COUNT: 주제의 업데이트된 파티션 수

    HTTP 메서드 및 URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    JSON 요청 본문:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    요청을 보내려면 다음 옵션 중 하나를 펼칩니다.

    다음과 비슷한 JSON 응답이 표시됩니다.

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

메시지 보관 구성

Kafka는 로그 세그먼트 파일에 메시지를 저장합니다. 기본적으로 Kafka는 보관 기간이 지나거나 파티션이 데이터 크기 기준점을 초과하면 세그먼트 파일을 삭제합니다. 로그 압축을 사용 설정하여 이 동작을 변경할 수 있습니다. 로그 압축이 사용 설정된 경우 Kafka는 각 키의 최신 값만 유지합니다.

Apache Kafka용 Google Cloud 관리형 서비스는 계층화된 스토리지를 사용합니다. 즉, 완료된 로그 세그먼트는 로컬 스토리지가 아닌 원격으로 저장됩니다. 계층형 스토리지에 대해 자세히 알아보려면 Apache Kafka 문서의 계층형 스토리지를 참고하세요.

보관 값 설정

로그 압축이 사용 설정되지 않은 경우 다음 설정은 Kafka가 로그 세그먼트 파일을 저장하는 방식을 제어합니다.

  • retention.ms: 세그먼트 파일을 저장할 최대 시간(밀리초)입니다.
  • retention.bytes: 파티션당 저장할 최대 바이트 수입니다. 파티션의 데이터가 이 값을 초과하면 Kafka에서 이전 세그먼트 파일을 삭제합니다.

이러한 설정을 업데이트하려면 gcloud CLI 또는 Kafka CLI를 사용하세요.

gcloud

메시지 보관을 설정하려면 gcloud managed-kafka topics update 명령어를 실행합니다.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

다음을 바꿉니다.

  • TOPIC_ID: 주제의 ID입니다.
  • CLUSTER_ID: 주제가 포함된 클러스터의 ID입니다.
  • LOCATION_ID: 클러스터의 위치입니다.
  • RETENTION_PERIOD: 세그먼트 파일을 저장할 최대 시간(밀리초)입니다.
  • MAX_BYTES: 파티션당 저장할 최대 바이트 수입니다.

Kafka CLI

이 명령어를 실행하기 전에 Compute Engine VM에 Kafka 명령줄 도구를 설치하세요. VM은 Apache Kafka용 관리형 서비스 클러스터에 연결된 서브넷에 도달할 수 있어야 합니다. Kafka 명령줄 도구로 메시지 생성 및 사용의 안내를 따릅니다.

kafka-configs.sh 명령어를 실행합니다.

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

다음을 바꿉니다.

  • BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka 클러스터의 부트스트랩 주소입니다.
  • TOPIC_ID: 주제의 ID입니다.
  • RETENTION_PERIOD: 세그먼트 파일을 저장할 최대 시간(밀리초)입니다.
  • MAX_BYTES: 파티션당 저장할 최대 바이트 수입니다.

로그 압축 사용 설정

로그 압축이 사용 설정된 경우 Kafka는 각 키의 최신 메시지만 저장합니다. 로그 압축은 기본적으로 사용 중지되어 있습니다. 주제에 대해 로그 압축을 사용 설정하려면 다음과 같이 cleanup.policy 구성을 "compact"로 설정합니다.

gcloud

gcloud managed-kafka topics update 명령어를 실행합니다.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

다음을 바꿉니다.

  • TOPIC_ID: 주제의 ID입니다.
  • CLUSTER_ID: 주제가 포함된 클러스터의 ID입니다.
  • LOCATION_ID: 클러스터의 위치입니다.

Kafka CLI

이 명령어를 실행하기 전에 Compute Engine VM에 Kafka 명령줄 도구를 설치하세요. VM은 Apache Kafka용 관리형 서비스 클러스터에 연결된 서브넷에 도달할 수 있어야 합니다. Kafka 명령줄 도구로 메시지 생성 및 사용의 안내를 따릅니다.

kafka-configs.sh 명령어를 실행합니다.

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

다음을 바꿉니다.

  • BOOTSTRAP_ADDRESS: Managed Service for Apache Kafka 클러스터의 부트스트랩 주소입니다.
  • TOPIC_ID: 주제의 ID입니다.

제한사항

  • remote.storage.enable와 같은 원격 스토리지의 주제 구성을 재정의할 수 없습니다.

  • segment.bytes와 같은 로그 세그먼트 파일의 주제 구성은 재정의할 수 없습니다.

  • 주제에 대해 로그 압축을 사용 설정하면 해당 주제에 대해 계층화된 스토리지가 암시적으로 사용 중지됩니다. 주제의 모든 로그 파일이 로컬에 저장됩니다.

다음 단계