Connector aktualisieren

Sie können einen Connector bearbeiten, um seine Konfiguration zu aktualisieren. Dazu können Sie beispielsweise die Themen ändern, aus denen er Daten liest oder in die er Daten schreibt, Datentransformationen anpassen oder Einstellungen für die Fehlerbehandlung ändern.

Wenn Sie einen Connector in einem Connect-Cluster aktualisieren möchten, können Sie die Google Cloud Console, die gcloud CLI, die Managed Service for Apache Kafka-Clientbibliothek oder die Managed Kafka API verwenden. Sie können die Open-Source-Apache Kafka API nicht verwenden, um die Connectors zu aktualisieren.

Hinweise

Bevor Sie einen Connector aktualisieren, sollten Sie die vorhandene Konfiguration prüfen und sich über die möglichen Auswirkungen von Änderungen informieren.

Erforderliche Rollen und Berechtigungen zum Aktualisieren eines Connectors

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für das Projekt mit dem Connect-Cluster zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten eines Connectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten eines Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten eines Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Aktualisieren des Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.update
  • Erteilen Sie die Berechtigung „Connectors auflisten“ für den übergeordneten Connect-Cluster: This permission is only required for updating a connector using the Google Cloud console

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle „Managed Kafka Connector Editor“ finden Sie unter Vordefinierte Rollen für Google Cloud Managed Service for Apache Kafka.

Bearbeitbare Attribute eines Connectors

Welche Eigenschaften eines Connectors bearbeitet werden können, hängt vom Typ des Connectors ab. Hier finden Sie eine Zusammenfassung der bearbeitbaren Attribute für die unterstützten Connector-Typen:

MirrorMaker 2.0-Quellconnector

  • Kommagetrennte Themennamen oder ein Themen-Regex: Die zu replizierenden Themen.

    Weitere Informationen zum Attribut finden Sie unter Themennamen.

  • Konfiguration: Zusätzliche Konfigurationseinstellungen für den Connector.

    Weitere Informationen zum Attribut finden Sie unter Konfiguration.

  • Richtlinie für Task-Neustart: Die Richtlinie für den Neustart fehlgeschlagener Connector-Tasks.

    Weitere Informationen zur Property finden Sie unter Richtlinie zum Neustarten von Aufgaben.

BigQuery-Senken-Connector

  • Themen: Die Kafka-Themen, aus denen Daten gestreamt werden sollen.

    Weitere Informationen zum Attribut finden Sie unter Themen.

  • Dataset: Das BigQuery-Dataset, in dem die Daten gespeichert werden sollen.

    Weitere Informationen zum Attribut finden Sie unter Dataset.

  • Konfiguration: Zusätzliche Konfigurationseinstellungen für den Connector.

    Weitere Informationen zum Attribut finden Sie unter Konfiguration.

  • Richtlinie für Task-Neustart: Die Richtlinie für den Neustart fehlgeschlagener Connector-Tasks.

    Weitere Informationen zur Eigenschaft finden Sie unter Richtlinie zum Neustarten von Aufgaben.

Cloud Storage-Senken-Connector

  • Themen: Die Kafka-Themen, aus denen Daten gestreamt werden sollen.

    Weitere Informationen zum Attribut finden Sie unter Themen.

  • Cloud Storage-Bucket: Der Cloud Storage-Bucket, in dem die Daten gespeichert werden.

    Weitere Informationen zur Property finden Sie unter Bucket.

  • Konfiguration: Zusätzliche Konfigurationseinstellungen für den Connector.

    Weitere Informationen zum Attribut finden Sie unter Konfiguration.

  • Richtlinie für Task-Neustart: Die Richtlinie für den Neustart fehlgeschlagener Connector-Tasks.

    Weitere Informationen zur Property finden Sie unter Richtlinie zum Neustarten von Aufgaben.

Pub/Sub-Quell-Connector

  • Pub/Sub-Abo: Das Pub/Sub-Abo, aus dem Nachrichten empfangen werden sollen.
  • Kafka-Thema: Das Kafka-Thema, in das Nachrichten gestreamt werden sollen.
  • Konfiguration: Zusätzliche Konfigurationseinstellungen für den Connector. Weitere Informationen finden Sie unter Connector konfigurieren.
  • Richtlinie für Task-Neustart: Die Richtlinie für den Neustart fehlgeschlagener Connector-Tasks. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

Pub/Sub-Senken-Connector

  • Themen: Die Kafka-Themen, aus denen Nachrichten gestreamt werden sollen.

    Weitere Informationen zum Attribut

  • Pub/Sub-Thema: Das Pub/Sub-Thema, an das Nachrichten gesendet werden sollen.

    Weitere Informationen zur Property finden Sie unter Pub/Sub-Thema.

  • Konfiguration: Zusätzliche Konfigurationseinstellungen für den Connector.

    Weitere Informationen zum Attribut finden Sie unter Konfiguration.

  • Richtlinie für Task-Neustart: Die Richtlinie für den Neustart fehlgeschlagener Connector-Tasks.

    Weitere Informationen zur Property finden Sie unter Richtlinie zum Neustarten von Aufgaben.

Connector aktualisieren

Das Aktualisieren eines Connectors kann zu einer vorübergehenden Unterbrechung des Datenflusses führen, während die Änderungen angewendet werden.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, in dem sich der Connector befindet, den Sie aktualisieren möchten.

    Die Seite Clusterdetails verbinden wird angezeigt.

  3. Suchen Sie auf dem Tab Ressourcen in der Liste nach dem Connector und klicken Sie auf seinen Namen.

    Sie werden zur Seite Connector-Details weitergeleitet.

  4. Klicken Sie auf Bearbeiten.

  5. Aktualisieren Sie die erforderlichen Attribute für den Connector. Die verfügbaren Eigenschaften variieren je nach Connectortyp.

  6. Klicken Sie auf Speichern.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Verwenden Sie den Befehl gcloud managed-kafka connectors update, um einen Connector zu aktualisieren:

    Sie können die Konfiguration eines Connectors entweder mit dem Flag --configs und durch Kommas getrennten Schlüssel/Wert-Paaren oder mit dem Flag --config-file und einem Pfad zu einer JSON- oder YAML-Datei aktualisieren.

    Hier ist die Syntax, bei der das Flag --configs mit durch Kommas getrennten Schlüssel/Wert-Paaren verwendet wird.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Hier sehen Sie die Syntax, bei der das Flag --config-file mit einem Pfad zu einer JSON- oder YAML-Datei verwendet wird.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: erforderlich. Die ID des Connectors, den Sie aktualisieren möchten.
    • LOCATION: erforderlich. Der Standort des Connect-Clusters, der den Connector enthält.
    • CONNECT_CLUSTER_ID: erforderlich. Die ID des Connect-Clusters, der den Connector enthält.
    • KEY1=VALUE1,KEY2=VALUE2...: Durch Kommas getrennte Konfigurationseigenschaften, die aktualisiert werden sollen. Beispiel: tasks.max=2,value.converter.schemas.enable=true
    • PATH_TO_CONFIG_FILE: Der Pfad zu einer JSON- oder YAML-Datei, die die zu aktualisierenden Konfigurationseigenschaften enthält. Beispiel: config.json

    Beispielbefehl mit --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Beispiel für einen Befehl mit --config-file. Das Folgende ist eine Beispieldatei mit dem Namen update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    Hier ist ein Beispielbefehl, der die Datei verwendet:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.