Connect-Cluster aktualisieren

Sie können einen Connect-Cluster bearbeiten, um Eigenschaften wie die Anzahl der vCPUs, den Arbeitsspeicher, das Netzwerk und die Labels zu aktualisieren.

Wenn Sie einen Connect-Cluster bearbeiten möchten, können Sie die Google Cloud Console, die gcloud CLI, die Clientbibliothek oder die Managed Kafka API verwenden. Sie können die Open-Source-Apache Kafka API nicht verwenden, um einen Connect-Cluster zu aktualisieren.

Hinweise

Nicht alle Eigenschaften eines Connect-Clusters können bearbeitet werden. Sehen Sie sich die Eigenschaften eines Connect-Clusters an, bevor Sie ihn aktualisieren.

Erforderliche Rollen und Berechtigungen zum Bearbeiten eines Connect-Clusters

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connect Cluster Editor (roles/managedkafka.connectClusterEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten eines Connect-Clusters benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten eines Connect-Clusters erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten eines Connect-Clusters erforderlich:

  • Erteilen Sie dem Update eine Connect-Clusterberechtigung für den angegebenen Standort: managedkafka.connectClusters.update
  • Erteilen Sie der Ansicht eine Connect-Clusterberechtigung für den angegebenen Standort. Diese Berechtigung ist nur erforderlich, um einen Connect-Cluster über die Google Cloud Console zu aktualisieren: managedkafka.connectors.list

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Connect-Cluster bearbeiten

Für die Aktualisierung bestimmter Attribute wie CPU und Arbeitsspeicher ist ein Neustart des Clusters erforderlich.

Bei Clusterneustarts bleiben Daten erhalten, die Latenz kann sich jedoch erhöhen. Die anfängliche Anzahl der Worker im Cluster bestimmt die Dauer des Neustarts.

Sie können die folgenden Connect-Cluster-Properties aktualisieren:

Attribut Bearbeitung möglich
vCPUs Ja
Arbeitsspeicher Ja
Netzwerk Ja
Worker-Subnetz Ja
Auflösbare DNS-Domains Ja (hinzufügen/löschen)
Name des Connect-Clusters Nein
Kafka-Cluster Nein
Standort Nein
Labels Ja (Hinzufügen/Bearbeiten/Löschen)
Secrets Ja (hinzufügen/löschen)

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, den Sie aktualisieren möchten.

    Die Seite Clusterdetails verbinden wird angezeigt.

  3. Klicken Sie auf Bearbeiten.

    Die Seite Kafka Connect-Cluster bearbeiten wird angezeigt.

  4. Nehmen Sie die erforderlichen Änderungen an den bearbeitbaren Attributen vor.

  5. Klicken Sie auf Speichern.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud managed-kafka connect-clusters update aus:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    Ersetzen Sie Folgendes:

    • CONNECT_CLUSTER_ID: Die ID oder der Name des Connect-Clusters. Der Name eines Connect-Clusters ist unveränderlich.
    • LOCATION: Der Standort des Connect-Clusters. Der Standort eines Connect-Clusters kann nicht geändert werden.
    • CPU: Die Anzahl der vCPUs für den Connect-Cluster. Der Mindestwert beträgt 3 vCPUs.
    • MEMORY: Die Größe des Arbeitsspeichers für den Connect-Cluster. Verwenden Sie die Einheiten „MB“, „MiB“, „GB“, „GiB“, „TB“ oder „TiB“. Beispiel: „10GiB“ Sie müssen zwischen 1 GiB und 8 GiB pro vCPU bereitstellen.

    • DNS_NAME: DNS-Domainname aus dem Netzwerk des Subnetzes, der für den Connect-Cluster sichtbar gemacht werden soll.
    • LABELS: (Optional) Labels, die dem Cluster zugeordnet werden sollen. Weitere Informationen zum Format von Labels finden Sie unter Labels. Liste der hinzuzufügenden KEY=VALUE-Labelpaare. Schlüssel müssen mit einem Kleinbuchstaben beginnen und dürfen nur Bindestriche (-), Unterstriche (_), Kleinbuchstaben und Zahlen enthalten. Werte dürfen nur Bindestriche (-), Unterstriche (_), Kleinbuchstaben und Zahlen enthalten.
    • SECRET: (Optional) Secrets, die in Worker geladen werden sollen. Es müssen genaue Secret-Versionen aus Secret Manager angegeben werden. Aliase werden nicht unterstützt. In einen Cluster können bis zu 32 Secrets geladen werden. Format: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: Das Worker-Subnetz für den Connect-Cluster. Das Worker-Subnetz muss sich in derselben Region wie der Connect-Cluster befinden.

      Das Format des Subnetzes ist projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

  3. Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.