Aggiorna un cluster di connessione

Puoi modificare un cluster di connessione per aggiornare proprietà come il numero di vCPU, la memoria, la rete e le etichette.

Per modificare un cluster Connect, puoi utilizzare la console Google Cloud , gcloud CLI, la libreria client o l'API Managed Kafka. Non puoi utilizzare l'API Apache Kafka open source per aggiornare un cluster di connessione.

Prima di iniziare

Non tutte le proprietà di un cluster di connessione sono modificabili. Esamina le proprietà di un cluster Connect prima di eseguire l'aggiornamento.

Ruoli e autorizzazioni richiesti per modificare un cluster di connessione

Per ottenere le autorizzazioni necessarie per modificare un cluster Connect, chiedi all'amministratore di concederti il ruolo IAM Editor cluster Kafka Connect gestito (roles/managedkafka.connectClusterEditor) sul tuo progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare un cluster Connect. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per modificare un cluster Connect sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per aggiornare un cluster Connect nella posizione specificata: managedkafka.connectClusters.update
  • Concedi l'autorizzazione Visualizza un cluster di connessione nella posizione specificata. Questa autorizzazione è necessaria solo per aggiornare un cluster Connect utilizzando la console Google Cloud : managedkafka.connectors.list

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per saperne di più su questo ruolo, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Modifica un cluster di connessione

L'aggiornamento di determinate proprietà, come CPU e memoria, richiede il riavvio del cluster.

I riavvii del cluster conservano i dati, ma potrebbero aumentare la latenza. Il numero iniziale di worker nel cluster determina la durata del riavvio.

Puoi aggiornare le seguenti proprietà del cluster di connessione:

Proprietà Modificabile
vCPU
Memoria
Rete
Subnet worker
Domini DNS risolvibili Sì (aggiungi/elimina)
Nome cluster di connessione No
Cluster Kafka No
Località No
Etichette Sì (aggiungi/modifica/elimina)
Secret Sì (aggiungi/elimina)

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster Connect che vuoi aggiornare.

    Viene visualizzata la pagina Dettagli cluster di connessione.

  3. Fai clic su Modifica.

    Viene visualizzata la pagina Modifica cluster di connessione Kafka.

  4. Apporta le modifiche necessarie alle proprietà modificabili.

  5. Fai clic su Salva.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka connect-clusters update:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    Sostituisci quanto segue:

    • CONNECT_CLUSTER_ID: l'ID o il nome del cluster Connect. Il nome di un cluster di connessione è immutabile.
    • LOCATION: la posizione del cluster Connect. La località di un cluster di connessione è immutabile.
    • CPU: Il numero di vCPU per il cluster Connect. Il valore minimo è 3 vCPU.
    • MEMORY: La quantità di memoria per il cluster Connect. Utilizza le unità "MB", "MiB", "GB", "GiB", "TB" o "TiB". Ad esempio, "10 GiB". Devi eseguire il provisioning tra 1 GiB e 8 GiB per vCPU.

    • DNS_NAME: il nome di dominio DNS dalla rete della subnet da rendere visibile al cluster di connessione.
    • (Facoltativo) LABELS: etichette da associare al cluster. Per saperne di più sul formato delle etichette, consulta Etichette. Elenco di coppie KEY=VALUE di etichette da aggiungere. Le chiavi devono iniziare con un carattere minuscolo e contenere solo trattini (-), trattini bassi (_), caratteri minuscoli e numeri. I valori devono contenere solo trattini (-), trattini bassi (_), caratteri minuscoli e numeri.
    • SECRET: (facoltativo) secret da caricare nei worker. Devono essere fornite le versioni esatte dei secret di Secret Manager, gli alias non sono supportati. È possibile caricare fino a 32 secret in un cluster. Formato: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: la subnet worker per il cluster Connect. La subnet worker deve trovarsi nella stessa regione del cluster di connessione.

      Il formato della subnet è projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

  3. Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.