Mettre à jour un connecteur

Vous pouvez modifier un connecteur pour mettre à jour sa configuration, par exemple en modifiant les thèmes à partir desquels il lit ou dans lesquels il écrit, en modifiant les transformations de données ou en ajustant les paramètres de gestion des exceptions.

Pour mettre à jour un connecteur dans un cluster Connect, vous pouvez utiliser la console Google Cloud , la gcloud CLI, la bibliothèque cliente Managed Service pour Apache Kafka ou l'API Managed Kafka. Vous ne pouvez pas utiliser l'API Apache Kafka Open Source pour mettre à jour les connecteurs.

Avant de commencer

Avant de mettre à jour un connecteur, examinez sa configuration existante et comprenez l'impact potentiel des modifications que vous apportez.

Rôles et autorisations requis pour mettre à jour un connecteur

Pour obtenir les autorisations nécessaires pour modifier un connecteur, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur le projet contenant le cluster Connect. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour modifier un connecteur. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour modifier un connecteur :

  • Accordez l'autorisation de mise à jour du connecteur sur le cluster Connect parent : managedkafka.connectors.update
  • Accordez l'autorisation "Lister les connecteurs" sur le cluster Connect parent : This permission is only required for updating a connector using the Google Cloud console

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle d'éditeur de connecteurs Kafka gérés, consultez Rôles prédéfinis de Google Cloud Managed Service pour Apache Kafka.

Propriétés modifiables d'un connecteur

Les propriétés modifiables d'un connecteur dépendent de son type. Voici un récapitulatif des propriétés modifiables pour les types de connecteurs compatibles :

Connecteur source MirrorMaker 2.0

  • Noms de sujets ou expressions régulières de sujets (séparés par une virgule) : sujets à répliquer.

    Pour en savoir plus sur la propriété, consultez Noms de sujets.

  • Configuration : paramètres de configuration supplémentaires pour le connecteur.

    Pour en savoir plus sur la propriété, consultez Configuration.

  • Règle de redémarrage des tâches : règle de redémarrage des tâches de connecteur ayant échoué.

    Pour en savoir plus sur la propriété, consultez Stratégie de redémarrage des tâches.

Connecteur de récepteur BigQuery

  • Sujets : sujets Kafka à partir desquels diffuser les données.

    Pour en savoir plus sur la propriété, consultez Rubriques.

  • Ensemble de données : ensemble de données BigQuery dans lequel stocker les données.

    Pour en savoir plus sur la propriété, consultez Ensemble de données.

  • Configuration : paramètres de configuration supplémentaires pour le connecteur.

    Pour en savoir plus sur la propriété, consultez Configuration.

  • Règle de redémarrage des tâches : règle de redémarrage des tâches de connecteur ayant échoué.

    Pour en savoir plus sur la propriété, consultez Règle de redémarrage des tâches.

Connecteur de récepteur Cloud Storage

  • Sujets : sujets Kafka à partir desquels diffuser les données.

    Pour en savoir plus sur la propriété, consultez Rubriques.

  • Bucket Cloud Storage : bucket Cloud Storage dans lequel stocker les données.

    Pour en savoir plus sur la propriété, consultez Bucket.

  • Configuration : paramètres de configuration supplémentaires pour le connecteur.

    Pour en savoir plus sur la propriété, consultez Configuration.

  • Règle de redémarrage des tâches : règle de redémarrage des tâches de connecteur ayant échoué.

    Pour en savoir plus sur la propriété, consultez Stratégie de redémarrage des tâches.

Connecteur source Pub/Sub

  • Abonnement Pub/Sub : abonnement Pub/Sub à partir duquel recevoir les messages.
  • Sujet Kafka : sujet Kafka vers lequel diffuser les messages.
  • Configuration : paramètres de configuration supplémentaires pour le connecteur. Pour en savoir plus, consultez Configurer le connecteur.
  • Règle de redémarrage des tâches : règle de redémarrage des tâches de connecteur ayant échoué. Pour en savoir plus, consultez la section Règle de redémarrage des tâches.

Connecteur de récepteur Pub/Sub

  • Sujets : sujets Kafka à partir desquels diffuser les messages.

    Pour en savoir plus sur la propriété, consultez Rubriques.

  • Sujet Pub/Sub : sujet Pub/Sub auquel envoyer les messages.

    Pour en savoir plus sur la propriété, consultez Sujet Pub/Sub.

  • Configuration : paramètres de configuration supplémentaires pour le connecteur.

    Pour en savoir plus sur la propriété, consultez Configuration.

  • Règle de redémarrage des tâches : règle de redémarrage des tâches de connecteur ayant échoué.

    Pour en savoir plus sur la propriété, consultez Stratégie de redémarrage des tâches.

Mettre à jour un connecteur

La mise à jour d'un connecteur peut entraîner une interruption temporaire du flux de données pendant l'application des modifications.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect qui héberge le connecteur à mettre à jour.

    La page Connecter les détails du cluster s'affiche.

  3. Dans l'onglet Ressources, recherchez le connecteur dans la liste, puis cliquez sur son nom.

    Vous êtes redirigé vers la page Informations sur le connecteur.

  4. Cliquez sur Modifier.

  5. Mettez à jour les propriétés requises pour le connecteur. Les propriétés disponibles varient en fonction du type de connecteur.

  6. Cliquez sur Enregistrer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connectors update pour mettre à jour un connecteur :

    Vous pouvez mettre à jour la configuration d'un connecteur à l'aide de l'indicateur --configs avec des paires clé/valeur séparées par des virgules ou de l'indicateur --config-file avec un chemin d'accès à un fichier JSON ou YAML.

    Voici la syntaxe qui utilise l'indicateur --configs avec des paires clé-valeur séparées par des virgules.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Voici la syntaxe qui utilise l'indicateur --config-file avec un chemin d'accès à un fichier JSON ou YAML.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : valeur obligatoire. ID du connecteur que vous souhaitez mettre à jour.
    • LOCATION : valeur obligatoire. Emplacement du cluster Connect contenant le connecteur.
    • CONNECT_CLUSTER_ID : valeur obligatoire. ID du cluster Connect contenant le connecteur.
    • KEY1=VALUE1,KEY2=VALUE2... : propriétés de configuration à mettre à jour, séparées par une virgule. Exemple : tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE : chemin d'accès à un fichier JSON ou YAML contenant les propriétés de configuration à mettre à jour. Exemple : config.json.

    Exemple de commande utilisant --configs :

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Exemple de commande utilisant --config-file. Voici un exemple de fichier nommé update_config.yaml :

    tasks.max: 3
    topic: updated-test-topic
    

    Voici un exemple de commande qui utilise le fichier :

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.