Aggiorna un connettore

Puoi modificare un connettore per aggiornarne la configurazione, ad esempio modificando gli argomenti da cui legge o a cui scrive, modificando le trasformazioni dei dati o regolando le impostazioni di gestione degli errori.

Per aggiornare un connettore in un cluster di connessione, puoi utilizzare la console Google Cloud , gcloud CLI, la libreria client Managed Service per Apache Kafka o l'API Managed Kafka. Non puoi utilizzare l'API Apache Kafka open source per aggiornare i connettori.

Prima di iniziare

Prima di aggiornare un connettore, esamina la sua configurazione esistente e comprendi il potenziale impatto di eventuali modifiche apportate.

Ruoli e autorizzazioni richiesti per aggiornare un connettore

Per ottenere le autorizzazioni necessarie per modificare un connettore, chiedi all'amministratore di concederti il ruolo IAM Editor connettore Kafka gestito (roles/managedkafka.connectorEditor) sul progetto contenente il cluster Connect. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare un connettore. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per modificare un connettore sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione di aggiornamento del connettore sul cluster di connessione principale: managedkafka.connectors.update
  • Concedi l'autorizzazione per elencare i connettori nel cluster di connessione principale: This permission is only required for updating a connector using the Google Cloud console

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per maggiori informazioni sul ruolo Editor connettore Kafka gestito, consulta la sezione Ruoli predefiniti di Google Cloud Managed Service per Apache Kafka.

Proprietà modificabili di un connettore

Le proprietà modificabili di un connettore dipendono dal suo tipo. Ecco un riepilogo delle proprietà modificabili per i tipi di connettore supportati:

Connettore di origine MirrorMaker 2.0

  • Nomi o regex degli argomenti separati da virgole: gli argomenti da replicare.

    Per saperne di più sulla proprietà, vedi Nomi degli argomenti.

  • Configurazione: impostazioni di configurazione aggiuntive per il connettore.

    Per saperne di più sulla proprietà, consulta la sezione Configurazione.

  • Policy di riavvio attività: la policy per il riavvio delle attività del connettore non riuscite.

    Per saperne di più sulla proprietà, consulta le norme di riavvio delle attività.

Connettore di sink BigQuery

  • Argomenti: gli argomenti Kafka da cui trasmettere i dati.

    Per saperne di più sulla proprietà, consulta Argomenti.

  • Set di dati: il set di dati BigQuery in cui archiviare i dati.

    Per saperne di più sulla proprietà, vedi Dataset.

  • Configurazione: impostazioni di configurazione aggiuntive per il connettore.

    Per saperne di più sulla proprietà, consulta la sezione Configurazione.

  • Policy di riavvio attività: la policy per il riavvio delle attività del connettore non riuscite.

    Per saperne di più sulla proprietà, consulta Policy di riavvio dell'attività.

Connettore di sink Cloud Storage

  • Argomenti: gli argomenti Kafka da cui trasmettere i dati.

    Per saperne di più sulla proprietà, consulta Argomenti.

  • Bucket Cloud Storage: Il bucket Cloud Storage in cui archiviare i dati.

    Per ulteriori informazioni sulla proprietà, vedi Bucket.

  • Configurazione: impostazioni di configurazione aggiuntive per il connettore.

    Per saperne di più sulla proprietà, consulta Configurazione.

  • Policy di riavvio attività: la policy per il riavvio delle attività del connettore non riuscite.

    Per saperne di più sulla proprietà, consulta Policy di riavvio dell'attività.

Connettore di origine Pub/Sub

  • Sottoscrizione Pub/Sub: la sottoscrizione Pub/Sub da cui ricevere i messaggi.
  • Argomento Kafka: l'argomento Kafka a cui trasmettere i messaggi.
  • Configurazione: impostazioni di configurazione aggiuntive per il connettore. Per saperne di più, consulta Configura il connettore.
  • Policy di riavvio attività: la policy per il riavvio delle attività del connettore non riuscite. Per saperne di più, consulta le norme sul riavvio delle attività.

Connettore di sink Pub/Sub

  • Argomenti: gli argomenti Kafka da cui trasmettere i messaggi.

    Per saperne di più sulla proprietà, consulta Argomenti.

  • Argomento Pub/Sub: l'argomento Pub/Sub a cui inviare i messaggi.

    Per ulteriori informazioni sulla proprietà, consulta Argomento Pub/Sub.

  • Configurazione: impostazioni di configurazione aggiuntive per il connettore.

    Per saperne di più sulla proprietà, consulta Configurazione.

  • Policy di riavvio attività: la policy per il riavvio delle attività del connettore non riuscite.

    Per saperne di più sulla proprietà, consulta Policy di riavvio dell'attività.

Aggiorna un connettore

L'aggiornamento di un connettore potrebbe causare un'interruzione temporanea del flusso di dati durante l'applicazione delle modifiche.

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster Connect che ospita il connettore da aggiornare.

    Viene visualizzata la pagina Dettagli cluster di connessione.

  3. Nella scheda Risorse, trova il connettore nell'elenco e fai clic sul suo nome.

    Viene visualizzata la pagina Dettagli connettore.

  4. Fai clic su Modifica.

  5. Aggiorna le proprietà richieste per il connettore. Le proprietà disponibili variano a seconda del tipo di connettore.

  6. Fai clic su Salva.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Utilizza il comando gcloud managed-kafka connectors update per aggiornare un connettore:

    Puoi aggiornare la configurazione di un connettore utilizzando il flag --configs con coppie chiave-valore separate da virgole o il flag --config-file con un percorso a un file JSON o YAML.

    Ecco la sintassi che utilizza il flag --configs con coppie chiave-valore separate da virgole.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Ecco la sintassi che utilizza il flag --config-file con un percorso a un file JSON o YAML.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: obbligatorio. L'ID del connettore che vuoi aggiornare.
    • LOCATION: obbligatorio. La posizione del cluster Connect contenente il connettore.
    • CONNECT_CLUSTER_ID: obbligatorio. L'ID del cluster Connect contenente il connettore.
    • KEY1=VALUE1,KEY2=VALUE2...: Proprietà di configurazione da aggiornare separate da virgole. Ad esempio, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: il percorso di un file JSON o YAML contenente le proprietà di configurazione da aggiornare. Ad esempio, config.json.

    Comando di esempio che utilizza --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Comando di esempio che utilizza --config-file. Di seguito è riportato un file di esempio denominato update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    Di seguito è riportato un comando di esempio che utilizza il file:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.