Connect 클러스터 업데이트

Connect 클러스터를 수정하여 vCPU 수, 메모리, 네트워크, 라벨과 같은 속성을 업데이트할 수 있습니다.

Connect 클러스터를 수정하려면 Google Cloud 콘솔, gcloud CLI, 클라이언트 라이브러리 또는 관리형 Kafka API를 사용하면 됩니다. 오픈소스 Apache Kafka API를 사용하여 Connect 클러스터를 업데이트할 수는 없습니다.

시작하기 전에

Connect 클러스터의 일부 속성은 수정할 수 없습니다. 업데이트하기 전에 Connect 클러스터의 속성을 검토하세요.

Connect 클러스터를 수정하는 데 필요한 역할 및 권한

Connect 클러스터를 수정하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 관리형 Kafka Connect 클러스터 편집자 (roles/managedkafka.connectClusterEditor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 Connect 클러스터를 수정하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

Connect 클러스터를 수정하려면 다음 권한이 필요합니다.

  • 지정된 위치에 업데이트 Connect 클러스터 권한을 부여합니다. managedkafka.connectClusters.update
  • 지정된 위치에 Connect 클러스터 보기 권한을 부여합니다. 이 권한은 Google Cloud 콘솔을 사용하여 Connect 클러스터를 업데이트하는 경우에만 필요합니다. managedkafka.connectors.list

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

이 역할에 대한 자세한 내용은 Apache Kafka용 관리형 서비스 사전 정의된 역할을 참고하세요.

연결 클러스터 수정

CPU 및 메모리와 같은 특정 속성을 업데이트하려면 클러스터를 다시 시작해야 합니다.

클러스터를 다시 시작하면 데이터가 보존되지만 지연 시간이 늘어날 수 있습니다. 클러스터의 초기 작업자 수가 다시 시작 기간을 결정합니다.

다음 Connect 클러스터 속성을 업데이트할 수 있습니다.

속성 수정 가능
vCPU
메모리
네트워크
작업자 서브넷
변환 가능한 DNS 도메인 예 (추가/삭제)
Connect 클러스터 이름 아니요
Kafka 클러스터 아니요
위치 아니요
라벨 예 (추가/수정/삭제)
보안 비밀 예 (추가/삭제)

콘솔

  1. Google Cloud 콘솔에서 클러스터 연결 페이지로 이동합니다.

    클러스터 연결로 이동

  2. 업데이트하려는 Connect 클러스터를 클릭합니다.

    클러스터 연결 세부정보 페이지가 표시됩니다.

  3. 수정을 클릭합니다.

    Kafka Connect 클러스터 수정 페이지가 표시됩니다.

  4. 수정 가능한 속성을 필요한 대로 변경합니다.

  5. 저장을 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connect-clusters update 명령어를 실행합니다.

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    다음을 바꿉니다.

    • CONNECT_CLUSTER_ID: Connect 클러스터의 ID 또는 이름입니다. Connect 클러스터의 이름은 변경할 수 없습니다.
    • LOCATION: Connect 클러스터의 위치입니다. Connect 클러스터의 위치는 변경할 수 없습니다.
    • CPU: Connect 클러스터의 vCPU 수입니다. 최솟값은 vCPU 3개입니다.
    • MEMORY: Connect 클러스터의 메모리 양입니다. 'MB', 'MiB', 'GB', 'GiB', 'TB', 'TiB' 단위를 사용하세요. 예를 들어 '10GiB'입니다. vCPU당 1GiB에서 8GiB 사이로 프로비저닝해야 합니다.

    • DNS_NAME: Connect Cluster에 표시할 서브넷 네트워크의 DNS 도메인 이름입니다.
    • LABELS: (선택사항) 클러스터와 연결할 라벨입니다. 라벨 형식에 대한 자세한 내용은 라벨을 참고하세요. 추가할 라벨 KEY=VALUE 쌍의 목록입니다. 키는 소문자로 시작해야 하고 하이픈 (-), 밑줄 (_), 소문자, 숫자만 포함할 수 있습니다. 값은 하이픈 (-), 밑줄 (_), 소문자, 숫자만 포함해야 합니다.
    • SECRET: (선택사항) 작업자에 로드할 보안 비밀입니다. Secret Manager의 정확한 보안 비밀 버전을 제공해야 합니다. 별칭은 지원되지 않습니다. 하나의 클러스터에 최대 32개의 보안 비밀을 로드할 수 있습니다. 형식: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: Connect 클러스터의 작업자 서브넷입니다. 작업자 서브넷은 Connect 클러스터와 동일한 리전에 있어야 합니다.

      서브넷의 형식은 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID입니다.

  3. Go

    이 샘플을 사용해 보기 전에 클라이언트 라이브러리 설치의 Go 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Go API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보(ADC)를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    자바

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Java 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Java API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    이 샘플을 시도하기 전에 클라이언트 라이브러리 설치의 Python 설정 안내를 따르세요. 자세한 내용은 Apache Kafka용 관리형 서비스 Python API 참조 문서를 참고하세요.

    Managed Service for Apache Kafka에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 ADC 설정을 참고하세요.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

다음 단계

Apache Kafka®는 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 등록 상표입니다.