Atualizar um conector

É possível editar um conector para atualizar a configuração dele, como mudar os temas de leitura ou gravação, modificar transformações de dados ou ajustar as configurações de tratamento de erros.

Para atualizar um conector em um cluster do Connect, use o console Google Cloud , a CLI gcloud, a biblioteca de cliente do Serviço gerenciado para Apache Kafka ou a API Managed Kafka. Não é possível usar a API de código aberto do Apache Kafka para atualizar os conectores.

Antes de começar

Antes de atualizar um conector, revise a configuração atual e entenda o possível impacto das mudanças.

Papéis e permissões necessários para atualizar um conector

Para receber as permissões necessárias para editar um conector, peça ao administrador para conceder a você o papel do IAM de Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor) no projeto que contém o cluster do Connect. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar um conector. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para editar um conector:

  • Conceda a permissão de conector de atualização no cluster pai do Connect: managedkafka.connectors.update
  • Conceda a permissão "list connectors" no cluster pai do Connect: This permission is only required for updating a connector using the Google Cloud console

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre a função de editor do conector gerenciado do Kafka, consulte Funções predefinidas do serviço gerenciado do Google Cloud para Apache Kafka.

Propriedades editáveis de um conector

As propriedades editáveis de um conector dependem do tipo dele. Confira um resumo das propriedades editáveis para os tipos de conectores compatíveis:

Conector de origem do MirrorMaker 2.0

  • Nomes de tópicos ou regex de tópicos separados por vírgulas: os tópicos a serem replicados.

    Para mais informações sobre a propriedade, consulte Nomes de tópicos.

  • Configuração: outras configurações do conector.

    Para mais informações sobre a propriedade, consulte Configuration.

  • Política de reinicialização de tarefas: a política para reiniciar tarefas de conector com falha.

    Para mais informações sobre a propriedade, consulte Política de reinicialização de tarefas.

Conector de coletor do BigQuery

  • Tópicos: os tópicos do Kafka de onde os dados serão transmitidos.

    Para mais informações sobre a propriedade, consulte Temas.

  • Conjunto de dados: o conjunto de dados do BigQuery para armazenar os dados.

    Para mais informações sobre a propriedade, consulte Conjunto de dados.

  • Configuração: outras configurações para o conector.

    Para mais informações sobre a propriedade, consulte Configuration.

  • Política de reinicialização de tarefas: a política para reiniciar tarefas de conector com falha.

    Para mais informações sobre a propriedade, consulte Política de reinicialização de tarefas.

Conector de coletor do Cloud Storage

  • Tópicos: os tópicos do Kafka de onde os dados serão transmitidos.

    Para mais informações sobre a propriedade, consulte Temas.

  • Bucket do Cloud Storage: O bucket do Cloud Storage para armazenar os dados.

    Para mais informações sobre a propriedade, consulte Bucket.

  • Configuração: outras configurações para o conector.

    Para mais informações sobre a propriedade, consulte Configuration.

  • Política de reinicialização de tarefas: a política para reiniciar tarefas de conector com falha.

    Para mais informações sobre a propriedade, consulte Política de reinicialização de tarefas.

Conector de origem do Pub/Sub

  • Assinatura do Pub/Sub: a assinatura do Pub/Sub de onde as mensagens serão recebidas.
  • Tópico do Kafka: o tópico do Kafka para onde as mensagens serão transmitidas.
  • Configuração: outras configurações para o conector. Para mais informações, consulte Configurar o conector.
  • Política de reinicialização de tarefas: a política para reiniciar tarefas de conector com falha. Para mais informações, consulte Política de reinicialização de tarefas.

Conector de coletor do Pub/Sub

  • Tópicos: os tópicos do Kafka de onde as mensagens serão transmitidas.

    Para mais informações sobre a propriedade, consulte Temas.

  • Tópico do Pub/Sub: o tópico do Pub/Sub para onde as mensagens serão enviadas.

    Para mais informações sobre a propriedade, consulte Tópico do Pub/Sub.

  • Configuração: outras configurações para o conector.

    Para mais informações sobre a propriedade, consulte Configuration.

  • Política de reinicialização de tarefas: a política para reiniciar tarefas de conector com falha.

    Para mais informações sobre a propriedade, consulte Política de reinicialização de tarefas.

Atualizar um conector

A atualização de um conector pode causar uma interrupção temporária no fluxo de dados enquanto as mudanças são aplicadas.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique no cluster do Connect que hospeda o conector que você quer atualizar.

    A página Detalhes do cluster de conexão é exibida.

  3. Na guia Recursos, encontre o conector na lista e clique no nome dele.

    Você será redirecionado para a página Detalhes do conector.

  4. Clique em Editar.

  5. Atualize as propriedades obrigatórias do conector. As propriedades disponíveis variam de acordo com o tipo de conector.

  6. Clique em Salvar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Use o comando gcloud managed-kafka connectors update para atualizar um conector:

    É possível atualizar a configuração de um conector usando a flag --configs com pares de chave-valor separados por vírgula ou a flag --config-file com um caminho para um arquivo JSON ou YAML.

    Esta é a sintaxe que usa a flag --configs com pares de chave-valor separados por vírgula.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Esta é a sintaxe que usa a flag --config-file com um caminho para um arquivo JSON ou YAML.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Substitua:

    • CONNECTOR_ID: obrigatório. O ID do conector que você quer atualizar.
    • LOCATION: obrigatório. O local do cluster do Connect que contém o conector.
    • CONNECT_CLUSTER_ID: obrigatório. O ID do cluster do Connect que contém o conector.
    • KEY1=VALUE1,KEY2=VALUE2...: propriedades de configuração separadas por vírgulas a serem atualizadas. Por exemplo, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: o caminho para um arquivo JSON ou YAML que contém as propriedades de configuração a serem atualizadas. Por exemplo, config.json.

    Exemplo de comando usando --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Exemplo de comando usando --config-file. Confira a seguir um exemplo de arquivo chamado update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    Confira a seguir um exemplo de comando que usa o arquivo:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.