Tool: update_connector
Update an existing Google Cloud Managed Service for Apache Kafka Connect connector.
The configs field can be updated. Use the get_connector method first to retrieve the connector's current configuration as a baseline for the user to edit, and use the example configurations provided below for reference.
- configs: Key-value pairs for connector properties. The following connectors support configuration updates:
- MirrorMaker 2.0 Source connector
- Example Configuration:
connector.class (required): "org.apache.kafka.connect.mirror.MirrorSourceConnector"
name: "MM2_CONNECTOR_ID"
source.cluster.alias (required): "source"
target.cluster.alias (required): "target"
topics (required): "TOPIC_NAME"
source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
offset-syncs.topic.replication.factor: "1"
source.cluster.security.protocol: "SASL_SSL"
source.cluster.sasl.mechanism: "OAUTHBEARER"
source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
target.cluster.security.protocol: "SASL_SSL"
target.cluster.sasl.mechanism: "OAUTHBEARER"
target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
- MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Source connector.
- TOPIC_NAME: The name of the Kafka topic(s) to mirror.
- SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
- TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
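Expressed as a configs map, the MirrorMaker 2.0 Source example above might look like the following Python sketch. The bootstrap endpoints and topic name are illustrative placeholders, and the SASL properties are omitted for brevity:

```python
# Sketch: a configs map for a MirrorMaker 2.0 Source connector update.
# The endpoint and topic values below are illustrative placeholders,
# not real clusters.
mm2_source_configs = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "name": "my-mm2-source",  # MM2_CONNECTOR_ID
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "topics": "orders",  # TOPIC_NAME
    "source.cluster.bootstrap.servers": "bootstrap.source.example.com:9092",
    "target.cluster.bootstrap.servers": "bootstrap.target.example.com:9092",
    "offset-syncs.topic.replication.factor": "1",
}

# Every value in the configs map is a string, including numeric settings.
assert all(isinstance(v, str) for v in mm2_source_configs.values())
```

Note that numeric properties such as the replication factor are still passed as strings, matching the key/value map shape in the input schema.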
- MirrorMaker 2.0 Checkpoint connector
- Example Configuration:
connector.class (required): "org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
name: "MM2_CONNECTOR_ID"
source.cluster.alias (required): "source"
target.cluster.alias (required): "target"
consumer-groups (required): "CONSUMER_GROUP_NAME"
source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
source.cluster.security.protocol: "SASL_SSL"
source.cluster.sasl.mechanism: "OAUTHBEARER"
source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
target.cluster.security.protocol: "SASL_SSL"
target.cluster.sasl.mechanism: "OAUTHBEARER"
target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
- MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Checkpoint connector.
- CONSUMER_GROUP_NAME: The name of the consumer group to use for the checkpoint connector.
- SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
- TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
- MirrorMaker 2.0 Heartbeat connector
- Example Configuration:
connector.class (required): "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
name: "MM2_CONNECTOR_ID"
source.cluster.alias (required): "source"
target.cluster.alias (required): "target"
consumer-groups (required): "CONSUMER_GROUP_NAME"
source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
source.cluster.security.protocol: "SASL_SSL"
source.cluster.sasl.mechanism: "OAUTHBEARER"
source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
target.cluster.security.protocol: "SASL_SSL"
target.cluster.sasl.mechanism: "OAUTHBEARER"
target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
- MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Heartbeat connector.
- CONSUMER_GROUP_NAME: The name of the consumer group to use for the heartbeat connector.
- SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
- TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
- BigQuery Sink connector
- Example Configuration:
name: "BQ_SINK_CONNECTOR_ID"
project (required): "GCP_PROJECT_ID"
topics: "TOPIC_ID"
tasks.max: "3"
connector.class (required): "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
defaultDataset (required): "BQ_DATASET_ID"
- Replace with these:
- BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. The name of a connector is immutable.
- GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.
- TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.
- BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.
- Cloud Storage Sink connector
- Example Configuration:
name: "GCS_SINK_CONNECTOR_ID"
connector.class (required): "io.aiven.kafka.connect.gcs.GcsSinkConnector"
tasks.max: "1"
topics (required): "TOPIC_ID"
gcs.bucket.name (required): "GCS_BUCKET_NAME"
gcs.credentials.default: "true"
format.output.type: "json"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
- Replace with these:
- GCS_SINK_CONNECTOR_ID: The ID or name of the Cloud Storage Sink connector. The name of a connector is immutable.
- TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the Cloud Storage Sink connector.
- GCS_BUCKET_NAME: The name of the Cloud Storage bucket that acts as a sink for the pipeline.
- Pub/Sub Source connector
- Example Configuration:
connector.class (required): "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project (required): "PROJECT_ID"
cps.subscription (required): "PUBSUB_SUBSCRIPTION_ID"
kafka.topic (required): "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
- Replace with these:
- PROJECT_ID: The ID of the Google Cloud project where the Pub/Sub subscription resides.
- PUBSUB_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription to pull data from.
- KAFKA_TOPIC_ID: The ID of the Kafka topic where data is written.
- Pub/Sub Sink connector
- Example Configuration:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
name: "CPS_SINK_CONNECTOR_ID"
tasks.max: "1"
topics: "TOPIC_ID"
value.converter: "org.apache.kafka.connect.storage.StringConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
cps.topic: "CPS_TOPIC_ID"
cps.project: "GCP_PROJECT_ID"
- Replace with these:
- CPS_SINK_CONNECTOR_ID: The ID or name of the Pub/Sub Sink connector. The name of a connector is immutable.
- TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which data is read by the Pub/Sub Sink connector.
- CPS_TOPIC_ID: The ID of the Pub/Sub topic to which data is published.
- GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub topic resides.
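An update does not have to resend the whole configuration: keys listed under configs are changed, and fieldsToClear removes fields entirely. A Python sketch of the arguments object for a partial Pub/Sub Sink connector update follows; the project, location, cluster, and connector IDs are illustrative placeholders, and the cleared field name is hypothetical:

```python
# Sketch: arguments for a partial update_connector call. Only "tasks.max"
# is changed; "fieldsToClear" drops a field from the connector. All
# resource IDs below are illustrative placeholders.
arguments = {
    "name": "projects/my-project/locations/us-central1/"
            "connectClusters/my-cluster/connectors/my-cps-sink",
    "configs": {
        "tasks.max": "2",  # scale the connector to two tasks
    },
    # Hypothetical field name, shown only to illustrate fieldsToClear.
    "fieldsToClear": ["some.optional.property"],
}
```

Because this is a partial update, properties not mentioned in configs or fieldsToClear are left as they are on the connector.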
- task_restart_policy (optional): A policy that specifies how to restart failed connector tasks. If not set, failed tasks won't be restarted.
  - minimum_backoff (optional): The minimum amount of time to wait before retrying a failed task (e.g., "60s"). Defaults to 60 seconds.
  - maximum_backoff (optional): The maximum amount of time to wait before retrying a failed task (e.g., "43200s" for 12 hours). Defaults to 12 hours.
  - task_retry_disabled (optional): If true, task retry is disabled.
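The backoff values are duration strings: a number of seconds with up to nine fractional digits, ending in "s". A minimal helper to produce that form, as a sketch:

```python
# Sketch: format a backoff value as the duration string the API expects
# (seconds, up to nine fractional digits, ending with "s").
def to_duration_string(seconds: float) -> str:
    # Render with nine fractional digits, then trim trailing zeros
    # and a dangling decimal point.
    text = f"{seconds:.9f}".rstrip("0").rstrip(".")
    return text + "s"

print(to_duration_string(60))     # default minimum backoff -> "60s"
print(to_duration_string(43200))  # default maximum backoff -> "43200s"
print(to_duration_string(3.5))    # fractional seconds -> "3.5s"
```

The helper name is illustrative; any formatting that yields strings like "60s" or "3.5s" is acceptable to the schema described below.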
Important Notes:
- When calling update_connector, you must provide the name of the connector to update, formatted as
projects/{project}/locations/{location}/connectClusters/{connect_cluster_id}/connectors/{connector_id}.
The following sample demonstrates how to use curl to invoke the update_connector MCP tool.
```shell
curl --location 'https://managedkafka.googleapis.com/mcp' \
  --header 'content-type: application/json' \
  --header 'accept: application/json, text/event-stream' \
  --data '{
    "method": "tools/call",
    "params": {
      "name": "update_connector",
      "arguments": {
        // provide these details according to the tool's MCP specification
      }
    },
    "jsonrpc": "2.0",
    "id": 1
  }'
```
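The request body is a JSON-RPC 2.0 tools/call envelope, and it can be assembled programmatically. This Python sketch only builds the body; the resource IDs are illustrative placeholders, and the authentication headers a real request needs are omitted:

```python
import json

# Sketch: build the JSON-RPC 2.0 envelope for a tools/call invocation of
# update_connector. The project, location, cluster, and connector IDs are
# illustrative placeholders.
connector_name = (
    "projects/my-project/locations/us-central1/"
    "connectClusters/my-cluster/connectors/my-connector"
)
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "update_connector",
        "arguments": {
            "name": connector_name,
            "configs": {"tasks.max": "3"},
        },
    },
}
body = json.dumps(payload)  # serialized request body for the POST
```

The serialized body corresponds to the --data argument in the curl sample above.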
Input Schema
Request message for UpdateConnector.
UpdateConnectorRequest
| JSON representation |
|---|
{ "name": string, "configs": { string: string, ... }, "taskRestartPolicyMinimumBackoff": string, "taskRestartPolicyMaximumBackoff": string, "fieldsToClear": [ string ], // Union field restart_policy can be only one of the following: "taskRestartPolicyTaskRetryDisabled": boolean // End of list of possible types for union field restart_policy. } |
| Fields | |
|---|---|
| name | Required. The name of the connector to update. Format: projects/{project}/locations/{location}/connectClusters/{connect_cluster_id}/connectors/{connector_id} |
| configs | Optional. Connector config as keys/values. An object containing a list of "key": value pairs. |
| taskRestartPolicyMinimumBackoff | Optional. The minimum amount of time to wait before retrying a failed task. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
| taskRestartPolicyMaximumBackoff | Optional. The maximum amount of time to wait before retrying a failed task. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
| fieldsToClear[] | Optional. Fields to clear in the connector update. |

Union field restart_policy. A policy that specifies how to restart failed connector tasks. If not set, failed tasks won't be restarted. restart_policy can be only one of the following:

| taskRestartPolicyTaskRetryDisabled | Optional. If true, task retry is disabled. |
ConfigsEntry
| JSON representation |
|---|
{ "key": string, "value": string } |
| Fields | |
|---|---|
| key | |
| value | |
Duration
| JSON representation |
|---|
{ "seconds": string, "nanos": integer } |
| Fields | |
|---|---|
| seconds | Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years |
| nanos | Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive. |
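A Duration object with seconds and nanos maps to the same duration-string form used by the backoff fields. A sketch of that conversion (negative sub-second durations are not handled, to keep the example short):

```python
# Sketch: convert a Duration object ({"seconds", "nanos"}) to the
# "3.5s"-style string form used by the backoff fields. Negative
# sub-second durations are deliberately not handled here.
def duration_to_string(seconds: int, nanos: int = 0) -> str:
    if nanos == 0:
        return f"{seconds}s"
    # Nine fractional digits, with trailing zeros trimmed.
    frac = f"{abs(nanos):09d}".rstrip("0")
    return f"{seconds}.{frac}s"

print(duration_to_string(60))             # "60s"
print(duration_to_string(3, 500_000_000)) # "3.5s"
```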
Output Schema
A Kafka Connect connector in a given ConnectCluster.
Connector
| JSON representation |
|---|
{ "name": string, "configs": { string: string, ... }, "state": enum (State), // Union field restart_policy can be only one of the following: "taskRestartPolicy": { object (TaskRetryPolicy) } // End of list of possible types for union field restart_policy. } |
| Fields | |
|---|---|
| name | Identifier. The name of the connector. Structured like: projects/{project}/locations/{location}/connectClusters/{connect_cluster}/connectors/{connector} |
| configs | Optional. Connector config as keys/values. The keys of the map are connector property names. An object containing a list of "key": value pairs. |
| state | Output only. The current state of the connector. |

Union field restart_policy. A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted. restart_policy can be only one of the following:

| taskRestartPolicy | Optional. Restarts the individual tasks of a Connector. |
TaskRetryPolicy
| JSON representation |
|---|
{ "minimumBackoff": string, "maximumBackoff": string, "taskRetryDisabled": boolean } |
| Fields | |
|---|---|
| minimumBackoff | Optional. The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
| maximumBackoff | Optional. The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
| taskRetryDisabled | Optional. If true, task retry is disabled. |
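With retries enabled, the delay between restart attempts stays within the minimumBackoff and maximumBackoff bounds. The doubling schedule in this sketch is an illustrative assumption, not documented service behavior; only the two bounds come from the schema above:

```python
# Sketch: an exponential backoff schedule bounded by minimumBackoff and
# maximumBackoff (in seconds). The doubling between attempts is an
# illustrative assumption; the API only documents the two bounds.
def backoff_delays(minimum: float, maximum: float, retries: int) -> list:
    delays = []
    delay = minimum
    for _ in range(retries):
        delays.append(delay)
        delay = min(delay * 2, maximum)  # never exceed the upper bound
    return delays

print(backoff_delays(60, 43200, 5))  # starts at 60s, doubles each retry
```

With the defaults (60s minimum, 12-hour maximum), every delay in such a schedule falls between the two bounds.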
Duration
| JSON representation |
|---|
{ "seconds": string, "nanos": integer } |
| Fields | |
|---|---|
| seconds | Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years |
| nanos | Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive. |
ConfigsEntry
| JSON representation |
|---|
{ "key": string, "value": string } |
| Fields | |
|---|---|
| key | |
| value | |
Tool Annotations
Destructive Hint: ✅ | Idempotent Hint: ✅ | Read Only Hint: ❌ | Open World Hint: ❌