Google Cloud Managed Service for Apache Kafka-Cluster aktualisieren

Sie können einen Managed Service for Apache Kafka-Cluster bearbeiten, um Eigenschaften wie die Clustergröße (Anzahl der vCPUs und Arbeitsspeicher), die Liste der verbundenen Subnetze, die Konfiguration für das automatische Neuausgleichen und die mTLS-Konfiguration zu aktualisieren.

Sie können einen Cluster über die Google Cloud -Konsole, die Google Cloud CLI, die Clientbibliothek oder die Managed Kafka API bearbeiten. Sie können die Open-Source-Apache Kafka API nicht verwenden, um einen Cluster zu aktualisieren.

Für die Aktualisierung bestimmter Attribute, z. B. der vCPU-Anzahl und des Arbeitsspeichers, muss der Dienst den Cluster möglicherweise neu starten. Der Cluster wird Broker für Broker neu gestartet. Bei diesem Vorgang können Anfragen an einzelne Broker fehlschlagen. Diese Fehler sind jedoch vorübergehend. Häufig verwendete Clientbibliotheken verarbeiten diese Fehler automatisch.

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Aktualisieren eines Clusters benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Aktualisieren eines Clusters erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Aktualisieren eines Clusters erforderlich:

  • Cluster bearbeiten: managedkafka.clusters.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Größe eines Clusters anpassen

Wenn Sie die Anzahl der vCPUs oder den Arbeitsspeicher eines Clusters aktualisieren, gelten die folgenden Regeln:

  • Das allgemeine Verhältnis von vCPU zu Arbeitsspeicher des Clusters muss immer zwischen 1:1 und 1:8 liegen.

  • Wenn Sie die Ressourcen herunterskalieren, muss für jeden vorhandenen Broker mindestens 1 vCPU und 1 GiB Arbeitsspeicher vorhanden sein. Die Anzahl der Broker wird nie verringert.

  • Wenn Sie die Anzahl der Broker erhöhen, darf die durchschnittliche Anzahl von vCPUs und der durchschnittliche Arbeitsspeicher pro Broker im Vergleich zu den Durchschnittswerten vor der Aktualisierung um höchstens 10% sinken.

    Wenn Sie beispielsweise versuchen, einen Cluster von 45 vCPUs (3 Brokern) auf 48 vCPUs (4 Broker) hochzuskalieren, schlägt der Vorgang fehl. Das liegt daran, dass die durchschnittliche vCPU pro Broker von 15 auf 12 sinkt, was einer Reduzierung um 20% entspricht und damit das Limit von 10 % überschreitet.

Weitere Informationen finden Sie unter Clustergröße aktualisieren.

Cluster bearbeiten

So bearbeiten Sie einen Cluster:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie in der Liste der Cluster auf den Cluster, dessen Eigenschaften Sie bearbeiten möchten.

    Die Cluster-Detailseite wird angezeigt.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Bearbeiten.

  4. Bearbeiten Sie die Eigenschaften nach Bedarf. Die folgenden Eigenschaften eines Clusters können in der Konsole bearbeitet werden:

    • Arbeitsspeicher
    • vCPUs
    • Subnetz
    • Konfiguration für Neuausgleich
    • mTLS-Konfiguration
    • Labels
  5. Klicken Sie auf Speichern.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka clusters update aus:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters. Sie können diesen Wert nicht aktualisieren.
    • LOCATION: Der Standort des Clusters. Sie können diesen Wert nicht aktualisieren.
    • CPU: Die Anzahl der virtuellen CPUs für den Cluster.
    • MEMORY: Die Größe des Arbeitsspeichers für den Cluster. Verwenden Sie die Einheiten „MB“, „MiB“, „GB“, „GiB“, „TB“ oder „TiB“. Beispiel: „10 GiB“.
    • SUBNETS: Die Liste der Subnetze, mit denen eine Verbindung hergestellt werden soll. Trennen Sie mehrere Subnetzwerte durch Kommas.
    • auto-rebalance: Aktiviert den automatischen Ausgleich von Themenpartitionen unter Brokern, wenn sich die Anzahl der CPUs im Cluster ändert. Diese Einstellung ist standardmäßig aktiviert.
    • LABELS: Labels, die dem Cluster zugeordnet werden sollen.

Wenn Sie das Flag --async mit Ihrem Befehl verwenden, sendet das System die Aktualisierungsanfrage und gibt sofort eine Antwort zurück, ohne auf den Abschluss des Vorgangs zu warten. Mit dem Flag --async können Sie mit anderen Aufgaben fortfahren, während die Clusteraktualisierung im Hintergrund erfolgt. Wenn Sie das Flag --async nicht verwenden, wartet das System, bis der Vorgang abgeschlossen ist, bevor eine Antwort zurückgegeben wird. Sie müssen warten, bis der Cluster vollständig aktualisiert wurde, bevor Sie mit anderen Aufgaben fortfahren können.

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: Ihre Google Cloud Projekt-ID
  • LOCATION: Der Standort des Clusters
  • CLUSTER_ID: Die ID des Clusters
  • UPDATE_MASK: Die Felder, die aktualisiert werden sollen, als durch Kommas getrennte Liste vollständig qualifizierter Namen. Beispiel: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: die Anzahl der vCPUs für den Cluster
  • MEMORY: Die Menge an Arbeitsspeicher für den Cluster in Byte.
  • SUBNET_ID: Subnetz-ID des Subnetzes, mit dem eine Verbindung hergestellt werden soll

HTTP-Methode und URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

JSON-Text anfordern:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Fügen Sie im Anfragetext nur die Felder ein, die Sie aktualisieren, wie im Abfrageparameter UPDATE_MASK angegeben. Wenn Sie ein Subnetz hinzufügen möchten, hängen Sie einen neuen Eintrag an networkConfigs an.

Go

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung zur Einrichtung von Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Beschränkungen

Nachdem Sie einen Managed Service for Apache Kafka-Cluster erstellt haben, können Sie die folgenden Eigenschaften nicht mehr aktualisieren:

  • Der Clustername
  • Der Clusterstandort
  • Der Verschlüsselungstyp

Sie können den Verschlüsselungstyp zwar nicht ändern, aber Verschlüsselungsschlüssel rotieren.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder ihrer Tochtergesellschaften in den USA und/oder anderen Ländern.