Google Cloud Managed Service for Apache Kafka-Cluster aktualisieren

Sie können einen Managed Service for Apache Kafka-Cluster bearbeiten, um Attribute wie die Clustergröße (Anzahl der vCPUs und Arbeitsspeicher), die Liste der verbundenen Subnetze, die Konfiguration für den automatischen Neuausgleich und die mTLS-Konfiguration zu aktualisieren.

Zum Bearbeiten eines Clusters können Sie die Google Cloud Console, die Google Cloud CLI, die Clientbibliothek oder die Managed Kafka API verwenden. Sie können die Open-Source-Apache Kafka API nicht verwenden, um einen Cluster zu aktualisieren.

Für die Aktualisierung bestimmter Attribute wie die Anzahl der vCPUs und der Arbeitsspeicher muss der Dienst den Cluster möglicherweise neu starten. Der Cluster wird jeweils Broker für Broker neu gestartet. Während dieses Vorgangs können Anfragen an einzelne Broker fehlschlagen, diese Fehler sind jedoch vorübergehend. Häufig verwendete Clientbibliotheken verarbeiten diese Fehler automatisch.

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor)“ für das Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Aktualisieren eines Clusters benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Aktualisieren eines Clusters erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um einen Cluster zu aktualisieren:

  • Cluster bearbeiten: managedkafka.clusters.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Mit der Rolle „Managed Kafka Cluster Editor“ können Sie keine Themen und Consumer-Gruppen in Managed Service for Apache Kafka-Clustern erstellen, löschen oder ändern. Außerdem ist kein Zugriff auf die Datenebene möglich, um Nachrichten in Clustern zu veröffentlichen oder zu nutzen. Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Größe eines Clusters anpassen

Wenn Sie die Anzahl der vCPUs oder den Arbeitsspeicher eines Clusters aktualisieren, gelten die folgenden Regeln:

  • Das Verhältnis von vCPUs zu Arbeitsspeicher des Clusters muss immer zwischen 1:1 und 1:8 liegen.

  • Wenn Sie die Größe verringern, muss für jeden vorhandenen Broker mindestens 1 vCPU und 1 GiB Arbeitsspeicher vorhanden sein. Die Anzahl der Broker wird nie verringert.

  • Wenn Sie die Größe erhöhen und die Änderung dazu führt, dass neue Broker hinzugefügt werden, darf die durchschnittliche Anzahl von vCPUs und der durchschnittliche Arbeitsspeicher pro Broker im Vergleich zu den Durchschnittswerten vor der Aktualisierung um nicht mehr als 10% sinken.

    Wenn Sie beispielsweise versuchen, einen Cluster von 45 vCPUs (3 Broker) auf 48 vCPUs (4 Broker) zu erhöhen, schlägt der Vorgang fehl. Das liegt daran, dass die durchschnittliche Anzahl von vCPUs pro Broker von 15 auf 12 sinkt, was einer Reduzierung um 20% entspricht und damit das Limit von 10 % überschreitet.

Weitere Informationen finden Sie unter Clustergröße aktualisieren.

Cluster bearbeiten

So bearbeiten Sie einen Cluster:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie in der Liste der Cluster auf den Cluster, dessen Attribute Sie bearbeiten möchten.

    Die Seite mit den Clusterdetails wird angezeigt.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Bearbeiten.

  4. Bearbeiten Sie die Attribute nach Bedarf. Die folgenden Attribute eines Clusters können über die Console bearbeitet werden:

    • Arbeitsspeicher
    • vCPUs
    • Subnetz
    • Konfiguration für Neuausgleich
    • mTLS-Konfiguration
    • Labels
  5. Klicken Sie auf Speichern.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den gcloud managed-kafka clusters update Befehl aus:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters. Dieser Wert kann nicht aktualisiert werden.
    • LOCATION: Der Standort des Clusters. Dieser Wert kann nicht aktualisiert werden.
    • CPU: Die Anzahl der virtuellen CPUs für den Cluster.
    • MEMORY: Die Größe des Arbeitsspeichers für den Cluster. Verwenden Sie die Einheiten „MB“, „MiB“, „GB“, „GiB“, „TB“ oder „TiB“. Beispiel: „10GiB“.
    • SUBNETS: Die Liste der Subnetze, mit denen eine Verbindung hergestellt werden soll. Trennen Sie mehrere Subnetzwerte durch Kommas.
    • auto-rebalance: Aktiviert den automatischen Neuausgleich von Themenpartitionen unter Brokern, wenn sich die Anzahl der CPUs im Cluster ändert. Diese Option ist standardmäßig aktiviert.
    • LABELS: Labels, die dem Cluster zugeordnet werden sollen.

Wenn Sie das Flag --async mit Ihrem Befehl verwenden, sendet das System die Aktualisierungsanfrage und gibt sofort eine Antwort zurück, ohne auf den Abschluss des Vorgangs zu warten. Mit dem Flag --async können Sie mit anderen Aufgaben fortfahren, während die Clusteraktualisierung im Hintergrund erfolgt. Wenn Sie das Flag --async nicht verwenden, wartet das System, bis der Vorgang abgeschlossen ist, bevor es eine Antwort zurückgibt. Sie müssen warten, bis der Cluster vollständig aktualisiert wurde, bevor Sie mit anderen Aufgaben fortfahren können.

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: Ihre Google Cloud Projekt-ID
  • LOCATION: Der Standort des Clusters
  • CLUSTER_ID: Die ID des Clusters
  • UPDATE_MASK: Die Felder, die aktualisiert werden sollen, als durch Kommas getrennte Liste voll qualifizierter Namen. Beispiel: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: Die Anzahl der vCPUs für den Cluster
  • MEMORY: Die Größe des Arbeitsspeichers für den Cluster in Byte
  • SUBNET_ID: Die Subnetz-ID des Subnetzes, mit dem eine Verbindung hergestellt werden soll

HTTP-Methode und URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

JSON-Text anfordern:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Nehmen Sie in den Anfragetext nur die Felder auf, die Sie aktualisieren, wie im UPDATE_MASK Abfrageparameter angegeben. Wenn Sie ein Subnetz hinzufügen möchten, fügen Sie networkConfigs einen neuen Eintrag hinzu.

Go

Folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Java API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Python API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Beschränkungen

Nachdem Sie einen Managed Service for Apache Kafka-Cluster erstellt haben, können Sie die folgenden Attribute nicht mehr aktualisieren:

  • Der Clustername
  • Der Clusterstandort
  • Der Verschlüsselungstyp

Sie können den Verschlüsselungstyp nicht ändern, aber Sie können Verschlüsselungsschlüssel rotieren.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder ihrer Tochtergesellschaften in den USA und/oder anderen Ländern.