Google Cloud Managed Service for Apache Kafka-Cluster aktualisieren

Sie können einen Google Cloud Managed Service for Apache Kafka-Cluster bearbeiten, um Eigenschaften wie die Anzahl der vCPUs, den Arbeitsspeicher, die Subnetze, den Verschlüsselungstyp oder die Labels zu aktualisieren. Sie können auch konfigurieren, ob der Dienst Partitionen auf Broker neu verteilt, wenn dem Cluster ein Broker hinzugefügt wird. Der Dienst erstellt automatisch neue Broker basierend auf der Speicher- und vCPU-Konfiguration des Clusters.

Sie können einen Cluster über die Google Cloud -Konsole, die Google Cloud CLI, die Clientbibliothek oder die Managed Kafka API bearbeiten. Sie können die Open-Source-Apache Kafka API nicht verwenden, um einen Cluster zu aktualisieren.

Hinweise

Wenn Sie die Anzahl der vCPUs oder den Arbeitsspeicher aktualisieren, gelten die folgenden Regeln:

  • Das allgemeine Verhältnis von vCPU zu Arbeitsspeicher des Clusters muss immer zwischen 1:1 und 1:8 liegen.

  • Wenn Sie die Ressourcen herunterskalieren, muss für jeden vorhandenen Broker mindestens 1 vCPU und 1 GiB Arbeitsspeicher vorhanden sein. Die Anzahl der Broker nimmt nie ab.

  • Wenn Sie die Anzahl der Broker erhöhen, darf die durchschnittliche Anzahl von vCPUs und der durchschnittliche Arbeitsspeicher pro Broker im Vergleich zu den Durchschnittswerten vor der Änderung um nicht mehr als 10% sinken.

    Wenn Sie beispielsweise versuchen, einen Cluster von 45 vCPUs (3 Brokern) auf 48 vCPUs (4 Broker) hochzuskalieren, schlägt der Vorgang fehl. Das liegt daran, dass die durchschnittliche vCPU pro Broker von 15 auf 12 sinkt, was einer Reduzierung um 20% entspricht und damit das Limit von 10 % überschreitet.

Weitere Informationen finden Sie unter Clustergröße aktualisieren.

Für die Aktualisierung bestimmter Attribute, z. B. der Anzahl der vCPUs und des Arbeitsspeichers, muss der Dienst den Cluster möglicherweise neu starten. Cluster werden Broker für Broker neu gestartet. Dies führt zu vorübergehenden Fehlern bei Anfragen an einzelne Broker, die jedoch nur von kurzer Dauer sind. Häufig verwendete Clientbibliotheken behandeln solche Fehler automatisch.

Sie können den Clusternamen, den Clusterstandort oder den Verschlüsselungstyp nicht bearbeiten.

Erforderliche Rollen und Berechtigungen zum Bearbeiten eines Clusters

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Aktualisieren eines Clusters benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Aktualisieren eines Clusters erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Aktualisieren eines Clusters erforderlich:

  • Cluster bearbeiten: managedkafka.clusters.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Mit der Rolle „Managed Kafka Cluster Editor“ können Sie keine Themen und Nutzergruppen in Managed Service for Apache Kafka-Clustern erstellen, löschen oder ändern. Außerdem ist damit kein Zugriff auf die Datenebene möglich, um Nachrichten in Clustern zu veröffentlichen oder zu nutzen. Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Cluster bearbeiten

So bearbeiten Sie einen Cluster:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie in der Liste der Cluster auf den Cluster, dessen Eigenschaften Sie bearbeiten möchten.

    Die Seite mit den Clusterdetails wird angezeigt.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Bearbeiten.

  4. Bearbeiten Sie die Eigenschaften nach Bedarf. Die folgenden Eigenschaften eines Clusters können in der Konsole bearbeitet werden:

    • Arbeitsspeicher
    • vCPUs
    • Subnetz
    • Konfiguration für Neuausgleich
    • mTLS-Konfiguration
    • Labels
  5. Klicken Sie auf Speichern.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka clusters update aus:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters. Sie können diesen Wert nicht aktualisieren.
    • LOCATION: Der Standort des Clusters. Sie können diesen Wert nicht aktualisieren.
    • CPU: Die Anzahl der virtuellen CPUs für den Cluster.
    • MEMORY: Die Größe des Arbeitsspeichers für den Cluster. Verwenden Sie die Einheiten „MB“, „MiB“, „GB“, „GiB“, „TB“ oder „TiB“. Beispiel: „10 GiB“.
    • SUBNETS: Die Liste der Subnetze, mit denen eine Verbindung hergestellt werden soll. Trennen Sie mehrere Subnetzwerte durch Kommas.
    • auto-rebalance: Aktiviert den automatischen Ausgleich von Themenpartitionen unter Brokern, wenn sich die Anzahl der CPUs im Cluster ändert. Diese Einstellung ist standardmäßig aktiviert.
    • LABELS: Labels, die dem Cluster zugeordnet werden sollen.
  3. Wenn Sie das Flag --async mit Ihrem Befehl verwenden, sendet das System die Aktualisierungsanfrage und gibt sofort eine Antwort zurück, ohne auf den Abschluss des Vorgangs zu warten. Mit dem Flag --async können Sie mit anderen Aufgaben fortfahren, während die Clusteraktualisierung im Hintergrund erfolgt. Wenn Sie das Flag --async nicht verwenden, wartet das System, bis der Vorgang abgeschlossen ist, bevor eine Antwort zurückgegeben wird. Sie müssen warten, bis der Cluster vollständig aktualisiert wurde, bevor Sie mit anderen Aufgaben fortfahren können.

    REST

    Ersetzen Sie diese Werte in den folgenden Anfragedaten:

    • PROJECT_ID: Ihre Google Cloud Projekt-ID
    • LOCATION: Der Standort des Clusters
    • CLUSTER_ID: Die ID des Clusters
    • UPDATE_MASK: Die Felder, die aktualisiert werden sollen, als durch Kommas getrennte Liste vollständig qualifizierter Namen. Beispiel: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: die Anzahl der vCPUs für den Cluster
    • MEMORY: Die Menge an Arbeitsspeicher für den Cluster in Byte.
    • SUBNET_ID: Subnetz-ID des Subnetzes, mit dem eine Verbindung hergestellt werden soll

    HTTP-Methode und URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    JSON-Text anfordern:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

    Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Fügen Sie im Anfragetext nur die Felder ein, die Sie aktualisieren, wie im Abfrageparameter UPDATE_MASK angegeben. Wenn Sie ein Subnetz hinzufügen möchten, hängen Sie einen neuen Eintrag an networkConfigs an.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.