Aggiorna un cluster Google Cloud Managed Service per Apache Kafka

Puoi modificare un cluster Google Cloud Managed Service per Apache Kafka per aggiornare proprietà come il numero di vCPU, memoria, subnet, tipo di crittografia o etichette. Puoi anche configurare se il servizio ribilancia le partizioni tra i broker quando un broker viene aggiunto al cluster. Il servizio crea automaticamente nuovi broker in base alla configurazione di memoria e vCPU del cluster.

Per modificare un cluster, puoi utilizzare la console Google Cloud , Google Cloud CLI, la libreria client o l'API Managed Kafka. Non puoi utilizzare l'API Apache Kafka open source per aggiornare un cluster.

Prima di iniziare

Se aggiorni il numero di vCPU o la memoria, si applicano le seguenti regole:

  • Il rapporto complessivo tra vCPU e memoria del cluster deve sempre rimanere compreso tra 1:1 e 1:8.

  • Se esegui lo scale down, devono essere presenti almeno 1 vCPU e 1 GiB di memoria per ogni broker esistente. Il numero di broker non diminuisce mai.

  • Se esegui l'upgrade e la modifica comporta l'aggiunta di nuovi broker, la vCPU media e la memoria per broker non possono diminuire di oltre il 10% rispetto alle medie prima dell'aggiornamento.

    Ad esempio, se provi ad aumentare le dimensioni di un cluster da 45 vCPU (3 broker) a 48 vCPU (4 broker), l'operazione non va a buon fine. Questo perché la vCPU media per broker diminuisce da 15 a 12, con una riduzione del 20%, che supera il limite del 10%.

Per saperne di più, consulta Aggiornare le dimensioni del cluster.

L'aggiornamento di alcune proprietà, come il conteggio di vCPU e la memoria, potrebbe richiedere il riavvio del cluster da parte del servizio. I cluster vengono riavviati un broker alla volta. Ciò comporta errori temporanei delle richieste ai singoli broker, ma questi errori sono temporanei. Le librerie client di uso comune gestiscono automaticamente questi errori.

Non puoi modificare il nome del cluster, la posizione del cluster o il tipo di crittografia.

Ruoli e autorizzazioni richiesti per modificare un cluster

Per ottenere le autorizzazioni necessarie per aggiornare un cluster, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per aggiornare un cluster. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per aggiornare un cluster sono necessarie le seguenti autorizzazioni:

  • Modifica un cluster: managedkafka.clusters.update

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Il ruolo Editor cluster Kafka gestito non consente di creare, eliminare o modificare argomenti e gruppi di consumatori nei cluster Managed Service per Apache Kafka. Né consente l'accesso al piano dati per pubblicare o utilizzare messaggi all'interno dei cluster. Per saperne di più su questo ruolo, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Modifica di un cluster

Per modificare un cluster:

Console

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

  2. Nell'elenco dei cluster, fai clic su quello di cui vuoi modificare le proprietà.

    Viene visualizzata la pagina dei dettagli del cluster.

  3. Nella pagina dei dettagli del cluster, fai clic su Modifica.

  4. Modifica le proprietà in base alle esigenze. Le seguenti proprietà di un cluster sono modificabili dalla console:

    • Memoria
    • vCPU
    • Subnet
    • Configurazione del ribilanciamento
    • Configurazione mTLS
    • Etichette
  5. Fai clic su Salva.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Sostituisci quanto segue:

    • CLUSTER_ID: l'ID o il nome del cluster. Non puoi aggiornare questo valore.
    • LOCATION: la posizione del cluster. Non puoi aggiornare questo valore.
    • CPU: Il numero di CPU virtuali per il cluster.
    • MEMORY: La quantità di memoria per il cluster. Utilizza le unità "MB", "MiB", "GB", "GiB", "TB" o "TiB". Ad esempio, "10 GiB".
    • SUBNETS: l'elenco delle subnet a cui connettersi. Utilizza le virgole per separare più valori di subnet.
    • auto-rebalance: consente il ribilanciamento automatico delle partizioni degli argomenti tra i broker quando cambia il numero di CPU nel cluster. Questa opzione è attiva per impostazione predefinita.
    • LABELS: le etichette da associare al cluster.
  3. Se utilizzi il flag --async con il comando, il sistema invia la richiesta di aggiornamento e restituisce immediatamente una risposta, senza attendere il completamento dell'operazione. Con il flag --async, puoi continuare con altre attività mentre l'aggiornamento del cluster avviene in background. Se non utilizzi il flag --async, il sistema attende il completamento dell'operazione prima di restituire una risposta. Devi attendere il completamento dell'aggiornamento del cluster prima di poter continuare con altre attività.

    REST

    Prima di utilizzare i dati della richiesta, apporta le sostituzioni seguenti:

    • PROJECT_ID: il tuo Google Cloud ID progetto
    • LOCATION: la posizione del cluster
    • CLUSTER_ID: l'ID del cluster
    • UPDATE_MASK: quali campi aggiornare, come un elenco separato da virgole di nomi completi. Esempio: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: il numero di vCPU per il cluster
    • MEMORY: la quantità di memoria per il cluster, in byte
    • SUBNET_ID: l'ID subnet della subnet a cui connettersi

    Metodo HTTP e URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    Corpo JSON della richiesta:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Per inviare la richiesta, espandi una di queste opzioni:

    Dovresti ricevere una risposta JSON simile alla seguente:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Nel corpo della richiesta, includi solo i campi che stai aggiornando, come specificato nel parametro di query UPDATE_MASK. Per aggiungere una subnet, aggiungi una nuova voce a networkConfigs.

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.