Update a connector

You can edit a connector to update its configuration, such as changing the topics it reads from or writes to, modifying data transformations, or adjusting error handling settings.

To update a connector in a Connect cluster, you can use the Google Cloud console, the gcloud CLI, the Managed Service for Apache Kafka client library, or the Managed Kafka API. You can't use the open source Apache Kafka API to update connectors.

Before you begin

Before updating a connector, review its existing configuration and understand the potential impact of any changes you make.

Required roles and permissions to update a connector

To get the permissions that you need to edit a connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to edit a connector. The exact permissions that are required are listed in the following Required permissions section:

Required permissions

The following permissions are required to edit a connector:

  • managedkafka.connectors.update on the parent Connect cluster, to update the connector.
  • managedkafka.connectors.list on the parent Connect cluster, to list connectors. This permission is required only when you update a connector by using the Google Cloud console.

You might also be able to get these permissions with custom roles or other predefined roles.
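
The difference between the two permissions can be expressed as a small check. The following Python sketch is illustrative only: the helper name missing_permissions is hypothetical, and the list permission name is an assumption made by analogy with the update permission.

```python
UPDATE_PERMISSION = "managedkafka.connectors.update"
# Assumption: the list permission is named by analogy with the update permission.
LIST_PERMISSION = "managedkafka.connectors.list"


def missing_permissions(granted: set[str], using_console: bool) -> set[str]:
    """Return the permissions still needed to update a connector."""
    required = {UPDATE_PERMISSION}
    if using_console:
        # Listing connectors is only needed for the Google Cloud console flow.
        required.add(LIST_PERMISSION)
    return required - granted


# A principal with only the update permission is covered for the CLI,
# but not for the console.
print(missing_permissions({UPDATE_PERMISSION}, using_console=False))
print(missing_permissions({UPDATE_PERMISSION}, using_console=True))
```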

For more information about the Managed Kafka Connector Editor role, see Google Cloud Managed Service for Apache Kafka predefined roles.

Editable properties of a connector

The editable properties of a connector depend on its type. Here's a summary of the editable properties for the supported connector types:

MirrorMaker 2.0 Source connector

  • Comma-separated topic names or topic regex: The topics to be replicated.

    For more information about the property, see Topic names.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

BigQuery Sink connector

  • Topics: The Kafka topics from which to stream data.

    For more information about the property, see Topics.

  • Dataset: The BigQuery dataset to store the data.

    For more information about the property, see Dataset.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

Cloud Storage Sink connector

  • Topics: The Kafka topics from which to stream data.

    For more information about the property, see Topics.

  • Cloud Storage bucket: The Cloud Storage bucket to store the data.

    For more information about the property, see Bucket.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.

Pub/Sub Source connector

  • Pub/Sub subscription: The Pub/Sub subscription from which to receive messages.
  • Kafka topic: The Kafka topic to which to stream messages.
  • Configuration: Additional configuration settings for the connector. For more information, see Configure the connector.
  • Task restart policy: The policy for restarting failed connector tasks. For more information, see Task restart policy.

Pub/Sub Sink connector

  • Topics: The Kafka topics from which to stream messages.

    For more information about the property, see Topics.

  • Pub/Sub topic: The Pub/Sub topic to which to send messages.

    For more information about the property, see Pub/Sub topic.

  • Configuration: Additional configuration settings for the connector.

    For more information about the property, see Configuration.

  • Task restart policy: The policy for restarting failed connector tasks.

    For more information about the property, see Task restart policy.
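
The summary above can be kept as a lookup table when you want to check a planned change against the connector type before applying it. This Python sketch is only an illustration: the property names are the labels used in this section, not configuration keys, and the helper name non_editable is hypothetical.

```python
# Editable properties per connector type, using the labels from the lists above.
EDITABLE_PROPERTIES = {
    "MirrorMaker 2.0 Source": {
        "Comma-separated topic names or topic regex",
        "Configuration",
        "Task restart policy",
    },
    "BigQuery Sink": {"Topics", "Dataset", "Configuration", "Task restart policy"},
    "Cloud Storage Sink": {
        "Topics",
        "Cloud Storage bucket",
        "Configuration",
        "Task restart policy",
    },
    "Pub/Sub Source": {
        "Pub/Sub subscription",
        "Kafka topic",
        "Configuration",
        "Task restart policy",
    },
    "Pub/Sub Sink": {"Topics", "Pub/Sub topic", "Configuration", "Task restart policy"},
}


def non_editable(connector_type: str, requested: list[str]) -> list[str]:
    """Return the requested properties that are not listed as editable."""
    return sorted(set(requested) - EDITABLE_PROPERTIES[connector_type])


print(non_editable("BigQuery Sink", ["Dataset", "Pub/Sub topic"]))
```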

Update a connector

Updating a connector may cause a temporary interruption in data flow while the changes are applied.

Console

  1. In the Google Cloud console, go to the Connect Clusters page.

  2. Click the Connect cluster that hosts the connector you want to update.

    The Connect cluster details page is displayed.

  3. On the Resources tab, find the connector in the list and click its name.

    You are redirected to the Connector details page.

  4. Click Edit.

  5. Update the required properties for the connector. The available properties vary depending on the connector type.

  6. Click Save.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Use the gcloud managed-kafka connectors update command to update a connector.

    You can update a connector's configuration using either the --configs flag with comma-separated key-value pairs or the --config-file flag with a path to a JSON or YAML file.

    Here is the syntax that uses the --configs flag with comma-separated key-value pairs.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Here is the syntax that uses the --config-file flag with a path to a JSON or YAML file.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Replace the following:

    • CONNECTOR_ID: Required. The ID of the connector you want to update.
    • LOCATION: Required. The location of the Connect cluster containing the connector.
    • CONNECT_CLUSTER_ID: Required. The ID of the Connect cluster containing the connector.
    • KEY1=VALUE1,KEY2=VALUE2...: Comma-separated configuration properties to update. For example, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: The path to a JSON or YAML file containing the configuration properties to update. For example, config.json.

    Example command using --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
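
When the properties live in a script rather than on the command line, the --configs value can be assembled from a mapping. The following Python sketch is illustrative only: the helper name format_configs is hypothetical, and it assumes that keys and values contain no commas or equals signs that would need escaping.

```python
def format_configs(configs: dict[str, str]) -> str:
    """Join configuration properties into KEY1=VALUE1,KEY2=VALUE2 form."""
    for key, value in configs.items():
        # Reject characters that would make the flag value ambiguous.
        if "," in key or "=" in key or "," in value:
            raise ValueError(f"unsupported character in property {key!r}")
    return ",".join(f"{key}={value}" for key, value in configs.items())


flag = "--configs=" + format_configs(
    {"tasks.max": "2", "value.converter.schemas.enable": "true"}
)
print(flag)  # --configs=tasks.max=2,value.converter.schemas.enable=true
```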
    

    Example command using --config-file. The following is a sample file that is named update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    The following is a sample command that uses the file:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
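
The same update can be prepared with a JSON file. The following Python sketch writes the properties to a temporary update_config.json file and builds the matching argument list. It assumes that the JSON file uses the same flat key-value shape as the YAML sample; the helper name build_update_command is hypothetical, and the sketch only prints the command instead of running it.

```python
import json
import tempfile
from pathlib import Path


def build_update_command(connector_id, location, connect_cluster_id, config_file):
    """Return the gcloud argument list for a file-based connector update."""
    return [
        "gcloud", "managed-kafka", "connectors", "update", connector_id,
        f"--location={location}",
        f"--connect-cluster={connect_cluster_id}",
        f"--config-file={config_file}",
    ]


# Flat key-value properties, mirroring the update_config.yaml sample.
properties = {"tasks.max": 3, "topic": "updated-test-topic"}

# Write the file to a temporary directory so the sketch leaves no stray files.
config_path = Path(tempfile.mkdtemp()) / "update_config.json"
config_path.write_text(json.dumps(properties, indent=2))

command = build_update_command(
    "test-connector", "us-central1", "test-connect-cluster", config_path
)
print(" ".join(command))
```

To run the command from the script, you could pass the list to subprocess.run(command, check=True) in an environment where the gcloud CLI is installed and authenticated.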
    

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "my-connector"
	// config := map[string]string{"tasks.max": "6"}
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
	connector := &managedkafkapb.Connector{
		Name:    connectorPath,
		Configs: config,
	}
	paths := []string{"configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConnectorRequest{
		UpdateMask: updateMask,
		Connector:  connector,
	}
	resp, err := client.UpdateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String connectorId = "my-connector";
    // The new value for the 'tasks.max' configuration.
    String maxTasks = "5";
    updateConnector(projectId, region, clusterId, connectorId, maxTasks);
  }

  public static void updateConnector(
      String projectId, String region, String clusterId, String connectorId, String maxTasks)
      throws IOException {
    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      Map<String, String> configMap = new HashMap<>();
      configMap.put("tasks.max", maxTasks);

      Connector connector =
          Connector.newBuilder()
              .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
              .putAllConfigs(configMap)
              .build();

      // The field mask specifies which fields to update. Here, we update the 'configs' field.
      FieldMask updateMask = FieldMask.newBuilder().addPaths("configs").build();

      // This operation is handled synchronously.
      Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
      System.out.printf("Updated connector: %s\n", updatedConnector.getName());
      System.out.println(updatedConnector.getAllFields());

    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# connector_id = "my-connector"
# configs = {
#     "tasks.max": "6",
#     "value.converter.schemas.enable": "true"
# }

connect_client = ManagedKafkaConnectClient()

connector = Connector()
connector.name = connect_client.connector_path(
    project_id, region, connect_cluster_id, connector_id
)
connector.configs = configs
update_mask = field_mask_pb2.FieldMask()
# The mask path must match the Connector field being updated, which is "configs".
update_mask.paths.append("configs")

# For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
request = managedkafka_v1.UpdateConnectorRequest(
    update_mask=update_mask,
    connector=connector,
)

try:
    # The update is handled synchronously and returns the updated connector.
    response = connect_client.update_connector(request=request)
    print("Updated connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

You can also update the connector's task restart policy without
including the configuration, by using the `--task-restart-min-backoff`
and `--task-restart-max-backoff` flags. For example:

```sh
gcloud managed-kafka connectors update test-connector \
  --location=us-central1 \
  --connect-cluster=test-connect-cluster \
  --task-restart-min-backoff="60s" \
  --task-restart-max-backoff="90s"
```

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.