Apache Kafka용 Google Cloud 관리형 서비스 클러스터 업데이트

Google Cloud Managed Service for Apache Kafka 클러스터를 수정하여 vCPU 수, 메모리, 서브넷, 암호화 유형, 라벨과 같은 속성을 업데이트할 수 있습니다. 브로커가 클러스터에 추가될 때 서비스가 브로커 간에 파티션을 리밸런싱할지 여부를 구성할 수도 있습니다. 서비스는 클러스터의 메모리 및 vCPU 구성을 기반으로 새 브로커를 자동으로 만듭니다.

클러스터를 수정하려면 Google Cloud 콘솔, Google Cloud CLI, 클라이언트 라이브러리 또는 Managed Kafka API를 사용하면 됩니다. 오픈소스 Apache Kafka API를 사용하여 클러스터를 업데이트할 수 없습니다.

시작하기 전에

vCPU 수 또는 메모리를 업데이트하는 경우 다음 규칙이 적용됩니다.

  • 클러스터의 전체 vCPU 대 메모리 비율은 항상 1:1~1:8 사이여야 합니다.

  • 축소하는 경우 기존 브로커마다 vCPU가 1개 이상이고 메모리가 1GiB 이상이어야 합니다. 브로커 수는 감소하지 않습니다.

  • 업스케일링을 수행하고 변경으로 인해 새 브로커가 추가되는 경우 브로커당 평균 vCPU와 메모리가 업데이트 전 평균에 비해 10% 이상 감소할 수 없습니다.

    예를 들어 클러스터를 45vCPU (브로커 3개)에서 48vCPU (브로커 4개)로 스케일 업하려고 하면 작업이 실패합니다. 브로커당 평균 vCPU가 15에서 12로 감소하여 20% 감소하므로 10% 한도를 초과하기 때문입니다.

자세한 내용은 클러스터 크기 업데이트를 참고하세요.

vCPU 수 및 메모리와 같은 특정 속성을 업데이트하려면 서비스에서 클러스터를 다시 시작해야 할 수 있습니다. 클러스터는 브로커별로 하나씩 다시 시작됩니다. 이로 인해 개별 브로커에 대한 요청이 일시적으로 실패하지만 이러한 실패는 일시적입니다. 일반적으로 사용되는 클라이언트 라이브러리는 이러한 오류를 자동으로 처리합니다.

클러스터 이름, 클러스터 위치 또는 암호화 유형은 수정할 수 없습니다.

클러스터 수정에 필요한 역할 및 권한

클러스터를 업데이트하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka 클러스터 편집자 (roles/managedkafka.clusterEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 클러스터를 업데이트하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

클러스터를 업데이트하려면 다음 권한이 필요합니다.

  • 클러스터 수정: managedkafka.clusters.update

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

관리형 Kafka 클러스터 편집자 역할로는 Apache Kafka용 관리형 서비스 클러스터에서 주제와 소비자 그룹을 만들거나 삭제하거나 수정할 수 없습니다. 또한 클러스터 내에서 메시지를 게시하거나 소비하기 위한 데이터 플레인 액세스도 허용하지 않습니다. 이 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

클러스터 수정

클러스터를 수정하려면 다음 단계를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 클러스터 목록에서 속성을 수정할 클러스터를 클릭합니다.

    클러스터 세부정보 페이지가 표시됩니다.

  3. 클러스터 세부정보 페이지에서 수정을 클릭합니다.

  4. 필요에 따라 속성을 수정합니다. 클러스터의 다음 속성은 콘솔에서 수정할 수 있습니다.

    • 메모리
    • vCPU
    • 서브넷
    • 재분산 구성
    • mTLS 구성
    • 라벨
  5. 저장을 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka clusters update 명령어를 실행합니다.

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    다음을 바꿉니다.

    • CLUSTER_ID: 클러스터의 ID 또는 이름입니다. 이 값은 업데이트할 수 없습니다.
    • LOCATION: 클러스터의 위치입니다. 이 값은 업데이트할 수 없습니다.
    • CPU: 클러스터의 가상 CPU 수입니다.
    • MEMORY: 클러스터의 메모리 양입니다. 'MB', 'MiB', 'GB', 'GiB', 'TB', 'TiB' 단위를 사용하세요. 예: '10GiB'
    • SUBNETS: 연결할 서브넷 목록입니다. 쉼표를 사용하여 여러 서브넷 값을 구분합니다.
    • auto-rebalance: 클러스터의 CPU 수가 변경될 때 브로커 간에 주제 파티션의 자동 재분산을 사용 설정합니다. 이 기능은 기본적으로 사용 설정되어 있습니다.
    • LABELS: 클러스터와 연결할 라벨입니다.
  3. 명령어와 함께 --async 플래그를 사용하면 시스템은 작업이 완료될 때까지 기다리지 않고 업데이트 요청을 전송하고 즉시 응답을 반환합니다. --async 플래그를 사용하면 클러스터 업데이트가 백그라운드에서 진행되는 동안 다른 작업을 계속할 수 있습니다. --async 플래그를 사용하지 않으면 시스템은 작업이 완료될 때까지 기다린 후 응답을 반환합니다. 클러스터가 완전히 업데이트될 때까지 기다려야 다른 작업을 계속할 수 있습니다.

    REST

    요청 데이터를 사용하기 전에 다음을 바꿉니다.

    • PROJECT_ID: Google Cloud 프로젝트 ID
    • LOCATION: 클러스터의 위치
    • CLUSTER_ID: 클러스터의 ID
    • UPDATE_MASK: 업데이트할 필드(정규화된 이름의 쉼표로 구분된 목록) 예를 들면 capacityConfig.vcpuCount,capacityConfig.memoryBytes입니다.
    • CPU_COUNT: 클러스터의 vCPU 수
    • MEMORY: 클러스터의 메모리 양(바이트)
    • SUBNET_ID: 연결할 서브넷의 서브넷 ID

    HTTP 메서드 및 URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    JSON 요청 본문:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    요청을 보내려면 다음 옵션 중 하나를 펼칩니다.

    다음과 비슷한 JSON 응답이 표시됩니다.

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    요청 본문에는 UPDATE_MASK 쿼리 매개변수에 지정된 대로 업데이트하는 필드만 포함합니다. 서브넷을 추가하려면 networkConfigs에 새 항목을 추가합니다.

    Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.