Actualiza un conector

Puedes editar un conector para actualizar su configuración, por ejemplo, cambiar los temas desde los que lee o en los que escribe, modificar las transformaciones de datos o ajustar la configuración del manejo de errores.

Para actualizar un conector en un clúster de Connect, puedes usar la consola de Google Cloud , gcloud CLI, la biblioteca cliente de Managed Service for Apache Kafka o la API de Managed Kafka. No puedes usar la API de Apache Kafka de código abierto para actualizar los conectores.

Antes de comenzar

Antes de actualizar un conector, revisa su configuración existente y comprende el posible impacto de los cambios que realices.

Roles y permisos obligatorios para actualizar un conector

Para obtener los permisos que necesitas para editar un conector, pídele a tu administrador que te otorgue el rol de IAM de editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en el proyecto que contiene el clúster de Connect. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para editar un conector. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para editar un conector:

  • Otorga el permiso del conector de actualización en el clúster principal de Connect: managedkafka.connectors.update
  • Otorga el permiso de conectores de lista en el clúster principal de Connect: This permission is only required for updating a connector using the Google Cloud console

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de editor de conectores de Kafka administrado, consulta Roles predefinidos de Google Cloud Managed Service para Apache Kafka.

Propiedades editables de un conector

Las propiedades editables de un conector dependen de su tipo. A continuación, se incluye un resumen de las propiedades editables para los tipos de conectores compatibles:

Conector de origen de MirrorMaker 2.0

  • Nombres de temas o regex de temas separados por comas: Son los temas que se replicarán.

    Para obtener más información sobre la propiedad, consulta Nombres de temas.

  • Configuración: Son parámetros de configuración adicionales para el conector.

    Para obtener más información sobre la propiedad, consulta Configuración.

  • Política de reinicio de tareas: Es la política para reiniciar las tareas del conector que fallaron.

    Para obtener más información sobre la propiedad, consulta Política de reinicio de tareas.

Conector de receptor de BigQuery

  • Temas: Son los temas de Kafka desde los que se transmiten datos.

    Para obtener más información sobre la propiedad, consulta Topics.

  • Conjunto de datos: Es el conjunto de datos de BigQuery en el que se almacenarán los datos.

    Para obtener más información sobre la propiedad, consulta Dataset.

  • Configuración: Son parámetros de configuración adicionales para el conector.

    Para obtener más información sobre la propiedad, consulta Configuración.

  • Política de reinicio de tareas: Es la política para reiniciar las tareas del conector con errores.

    Para obtener más información sobre la propiedad, consulta Política de reinicio de tareas.

Conector de receptor de Cloud Storage

  • Temas: Son los temas de Kafka desde los que se transmiten datos.

    Para obtener más información sobre la propiedad, consulta Topics.

  • Bucket de Cloud Storage: Es el bucket de Cloud Storage en el que se almacenarán los datos.

    Para obtener más información sobre la propiedad, consulta Bucket.

  • Configuración: Son parámetros de configuración adicionales para el conector.

    Para obtener más información sobre la propiedad, consulta Configuración.

  • Política de reinicio de tareas: Es la política para reiniciar las tareas del conector que fallaron.

    Para obtener más información sobre la propiedad, consulta Política de reinicio de tareas.

Conector de fuente de Pub/Sub

  • Suscripción a Pub/Sub: Es la suscripción a Pub/Sub desde la que se reciben mensajes.
  • Tema de Kafka: Es el tema de Kafka al que se transmitirán los mensajes.
  • Configuración: Son parámetros de configuración adicionales para el conector. Para obtener más información, consulta Configura el conector.
  • Política de reinicio de tareas: Es la política para reiniciar las tareas del conector que fallaron. Para obtener más información, consulta la política de reinicio de tareas.

Conector de receptor de Pub/Sub

  • Temas: Son los temas de Kafka desde los que se transmiten mensajes.

    Para obtener más información sobre la propiedad, consulta Topics.

  • Tema de Pub/Sub: Es el tema de Pub/Sub al que se enviarán los mensajes.

    Para obtener más información sobre la propiedad, consulta Tema de Pub/Sub.

  • Configuración: Son parámetros de configuración adicionales para el conector.

    Para obtener más información sobre la propiedad, consulta Configuración.

  • Política de reinicio de tareas: Es la política para reiniciar las tareas del conector con errores.

    Para obtener más información sobre la propiedad, consulta Política de reinicio de tareas.

Actualiza un conector

La actualización de un conector puede provocar una interrupción temporal en el flujo de datos mientras se aplican los cambios.

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect que aloja el conector que quieres actualizar.

    Se muestra la página Detalles de conexión del clúster.

  3. En la pestaña Resources, busca el conector en la lista y haz clic en su nombre.

    Se te redireccionará a la página Detalles del conector.

  4. Haz clic en Editar.

  5. Actualiza las propiedades requeridas para el conector. Las propiedades disponibles varían según el tipo de conector.

  6. Haz clic en Guardar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Usa el comando gcloud managed-kafka connectors update para actualizar un conector:

    Puedes actualizar la configuración de un conector con la marca --configs con pares clave-valor separados por comas o la marca --config-file con una ruta de acceso a un archivo JSON o YAML.

    Esta es la sintaxis que usa la marca --configs con pares clave-valor separados por comas.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Esta es la sintaxis que usa la marca --config-file con una ruta de acceso a un archivo JSON o YAML.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Reemplaza lo siguiente:

    • CONNECTOR_ID: Obligatorio. ID del conector que deseas actualizar.
    • LOCATION: Obligatorio. Ubicación del clúster de Connect que contiene el conector.
    • CONNECT_CLUSTER_ID: Obligatorio. ID del clúster de Connect que contiene el conector.
    • KEY1=VALUE1,KEY2=VALUE2...: Son las propiedades de configuración separadas por comas que se actualizarán. Por ejemplo, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: Es la ruta de acceso a un archivo JSON o YAML que contiene las propiedades de configuración que se actualizarán. Por ejemplo, config.json.

    Comando de ejemplo con --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Ejemplo de comando que usa --config-file. A continuación, se muestra un archivo de ejemplo llamado update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    A continuación, se muestra un ejemplo de un comando que usa el archivo:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.