Tool: update_connector
Update an existing Google Cloud Managed Service for Apache Kafka Connect connector. To update a connector, please provide the Project ID, Location, Connect Cluster ID, and Connector ID.
The following connector fields can be updated. The agent can first call the get_connector method to retrieve the current connector configuration as a baseline for the user to edit. Use the example configurations provided below for reference.
- connector.configs: Key-value pairs for connector properties. The following connectors support configuration updates:
- MirrorMaker 2.0 Source connector
- Example Configuration:
    connector.class (required): "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias (required): "source"
    target.cluster.alias (required): "target"
    topics (required): "TOPIC_NAME"
    source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
    - MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Source connector.
    - TOPIC_NAME: The name of the Kafka topic(s) to mirror.
    - SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
    - TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
- MirrorMaker 2.0 Checkpoint connector
- Example Configuration:
    connector.class (required): "org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias (required): "source"
    target.cluster.alias (required): "target"
    consumer-groups (required): "CONSUMER_GROUP_NAME"
    source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
    - MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Checkpoint connector.
    - CONSUMER_GROUP_NAME: The name of the consumer group to use for the checkpoint connector.
    - SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
    - TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
- MirrorMaker 2.0 Heartbeat connector
- Example Configuration:
    connector.class (required): "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias (required): "source"
    target.cluster.alias (required): "target"
    consumer-groups (required): "CONSUMER_GROUP_NAME"
    source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
- Replace with these:
    - MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Heartbeat connector.
    - CONSUMER_GROUP_NAME: The name of the consumer group to use for the heartbeat connector.
    - SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
    - TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
- BigQuery Sink connector
- Example Configuration:
    name: "BQ_SINK_CONNECTOR_ID"
    project (required): "GCP_PROJECT_ID"
    topics: "TOPIC_ID"
    tasks.max: "3"
    connector.class (required): "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset (required): "BQ_DATASET_ID"
- Replace with these:
    - BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. The name of a connector is immutable.
    - GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.
    - TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.
    - BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.
- Cloud Storage Sink connector
- Example Configuration:
    name: "GCS_SINK_CONNECTOR_ID"
    connector.class (required): "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics (required): "TOPIC_ID"
    gcs.bucket.name (required): "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
- Replace with these:
    - GCS_SINK_CONNECTOR_ID: The ID or name of the Cloud Storage Sink connector. The name of a connector is immutable.
    - TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the Cloud Storage Sink connector.
    - GCS_BUCKET_NAME: The name of the Cloud Storage bucket that acts as a sink for the pipeline.
- Pub/Sub Source connector
- Example Configuration:
    connector.class (required): "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    cps.project (required): "PROJECT_ID"
    cps.subscription (required): "PUBSUB_SUBSCRIPTION_ID"
    kafka.topic (required): "KAFKA_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    tasks.max: "3"
- Replace with these:
    - PROJECT_ID: The ID of the Google Cloud project where the Pub/Sub subscription resides.
    - PUBSUB_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription to pull data from.
    - KAFKA_TOPIC_ID: The ID of the Kafka topic where data is written.
- Pub/Sub Sink connector
- Example Configuration:
    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
- Replace with these:
    - CPS_SINK_CONNECTOR_ID: The ID or name of the Pub/Sub Sink connector. The name of a connector is immutable.
    - TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which data is read by the Pub/Sub Sink connector.
    - CPS_TOPIC_ID: The ID of the Pub/Sub topic to which data is published.
    - GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub topic resides.
- connector.task_restart_policy: A policy that specifies how to restart failed connector tasks.
- Minimum Backoff: The minimum amount of time to wait before retrying a failed task (e.g., "60s").
- Maximum Backoff: The maximum amount of time to wait before retrying a failed task (e.g., "43200s" for 12 hours).
- Task Retry Disabled: If true, task retry is disabled.
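The read-modify-write flow described above (fetch the current configuration, edit it, then derive the update mask from what changed) can be sketched as follows. This is a minimal sketch: the baseline dictionary and resource names are illustrative, borrowed from the BigQuery Sink example, not output from a real get_connector call.

```python
import json

# Hypothetical baseline as returned by get_connector (values are illustrative).
current = {
    "name": "projects/my-project/locations/us-central1/connectClusters/my-cluster/connectors/bq-sink",
    "configs": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "3",
        "defaultDataset": "my_dataset",
    },
}

# Edit the baseline: scale the connector up.
desired = json.loads(json.dumps(current))  # cheap deep copy
desired["configs"]["tasks.max"] = "6"

# Only top-level fields that actually changed need to appear in the mask.
changed = [k for k in desired if desired[k] != current.get(k)]
update_mask = ",".join(changed)  # here: "configs"
```

Sending a minimal mask rather than `*` avoids accidentally clobbering fields (such as the restart policy) that the caller did not intend to touch.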
Important Notes:
- The `UpdateConnectorRequest` requires the following parameters:
  - update_mask: A field mask used to specify the fields to be overwritten. For example, to update `configs` and `task_restart_policy.minimum_backoff`, the mask would be `"configs,task_restart_policy.minimum_backoff"`. A value of `*` will overwrite all fields.
  - connector: The connector configuration.
  - connector.name: The name of the connector to be updated, in the format `projects/{project}/locations/{location}/connectClusters/{connect_cluster_id}/connectors/{connector_id}`.
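The field-mask behavior can be modeled with a short sketch. This is a simplified illustration of top-level mask semantics only (the hypothetical `apply_update_mask` helper is not part of the API, and the real service also resolves nested paths such as `task_restart_policy.minimum_backoff`):

```python
def apply_update_mask(current: dict, patch: dict, update_mask: str) -> dict:
    """Overwrite only the fields named in the mask; '*' overwrites everything."""
    if update_mask == "*":
        return dict(patch)
    result = dict(current)
    for path in update_mask.split(","):
        field = path.strip()
        if field in patch:
            result[field] = patch[field]
    return result

current = {"configs": {"tasks.max": "1"},
           "taskRestartPolicy": {"minimumBackoff": "60s"}}
patch = {"configs": {"tasks.max": "3"},
         "taskRestartPolicy": {"minimumBackoff": "120s"}}

# Only configs is overwritten; taskRestartPolicy keeps its old value.
updated = apply_update_mask(current, patch, "configs")
```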
The following sample demonstrates how to use curl to invoke the update_connector MCP tool.
Curl Request:

    curl --location 'https://managedkafka.googleapis.com/mcp' \
      --header 'content-type: application/json' \
      --header 'accept: application/json, text/event-stream' \
      --data '{
        "method": "tools/call",
        "params": {
          "name": "update_connector",
          "arguments": {
            // provide these details according to the tool's MCP specification
          }
        },
        "jsonrpc": "2.0",
        "id": 1
      }'
Input Schema
Request for UpdateConnector.
UpdateConnectorRequest
| JSON representation |
|---|
{
  "updateMask": string,
  "connector": {
    object (Connector)
  }
}
| Fields | |
|---|---|
updateMask |
Required. Field mask is used to specify the fields to be overwritten in the connector resource by the update. The fields specified in the update_mask are relative to the resource, not the full request. A field will be overwritten if it is in the mask. The mask is required and a value of * will update all fields. This is a comma-separated list of fully qualified names of fields. Example: "configs,task_restart_policy.minimum_backoff" |
connector |
Required. The connector to update. Its name field must be populated. |
FieldMask
| JSON representation |
|---|
{ "paths": [ string ] } |
| Fields | |
|---|---|
paths[] |
The set of field mask paths. |
Connector
| JSON representation |
|---|
{ "name": string, "configs": { string: string, ... }, "state": enum (State), "taskRestartPolicy": { object (TaskRetryPolicy) } } |
| Fields | |
|---|---|
name |
Identifier. The name of the connector. Structured like: projects/{project}/locations/{location}/connectClusters/{connect_cluster}/connectors/{connector} |
configs |
Optional. Connector config as keys/values. The keys of the map are connector property names, for example: connector.class, tasks.max. An object containing a list of "key": value pairs. |
state |
Output only. The current state of the connector. |
Union field restart_policy. A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted. restart_policy can be only one of the following: |
taskRestartPolicy |
Optional. Restarts the individual tasks of a Connector. |
TaskRetryPolicy
| JSON representation |
|---|
{ "minimumBackoff": string, "maximumBackoff": string, "taskRetryDisabled": boolean } |
| Fields | |
|---|---|
minimumBackoff |
Optional. The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
maximumBackoff |
Optional. The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
taskRetryDisabled |
Optional. If true, task retry is disabled. |
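Putting the three fields together, a restart policy payload might look like the sketch below. The backoff values mirror the examples given earlier ("60s" minimum, "43200s" maximum); the `parse_duration_seconds` helper is illustrative, not part of any client library:

```python
def parse_duration_seconds(d: str) -> float:
    # Durations are strings like "60s" or "43200s", ending with 's'.
    assert d.endswith("s"), "duration must end with 's'"
    return float(d[:-1])

task_restart_policy = {
    "minimumBackoff": "60s",     # retry no sooner than 1 minute
    "maximumBackoff": "43200s",  # retry no later than 12 hours
    "taskRetryDisabled": False,
}

# Sanity-check the bounds before sending the update.
assert (parse_duration_seconds(task_restart_policy["minimumBackoff"])
        <= parse_duration_seconds(task_restart_policy["maximumBackoff"]))
```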
Duration
| JSON representation |
|---|
{ "seconds": string, "nanos": integer } |
| Fields | |
|---|---|
seconds |
Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years |
nanos |
Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero value for the nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive. |
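The relationship between the string form used in TaskRetryPolicy (e.g. "3.5s") and the seconds/nanos pair above can be sketched as follows. A minimal sketch for non-negative durations only; the `duration_to_proto` helper is illustrative and ignores the negative-duration sign rules described above:

```python
def duration_to_proto(d: str) -> dict:
    """Split a duration string like '3.5s' into seconds and nanos fields."""
    assert d.endswith("s"), "duration must end with 's'"
    value = d[:-1]
    if "." in value:
        whole, frac = value.split(".")
        # Pad the fraction to nine digits of nanosecond resolution.
        nanos = int((frac + "000000000")[:9])
    else:
        whole, nanos = value, 0
    return {"seconds": int(whole), "nanos": nanos}

duration_to_proto("3.5s")  # {'seconds': 3, 'nanos': 500000000}
duration_to_proto("60s")   # {'seconds': 60, 'nanos': 0}
```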
ConfigsEntry
| JSON representation |
|---|
{ "key": string, "value": string } |
| Fields | |
|---|---|
key |
|
value |
|
Output Schema
A Kafka Connect connector in a given ConnectCluster.
Connector
| JSON representation |
|---|
{ "name": string, "configs": { string: string, ... }, "state": enum (State), "taskRestartPolicy": { object (TaskRetryPolicy) } } |
| Fields | |
|---|---|
name |
Identifier. The name of the connector. Structured like: projects/{project}/locations/{location}/connectClusters/{connect_cluster}/connectors/{connector} |
configs |
Optional. Connector config as keys/values. The keys of the map are connector property names, for example: connector.class, tasks.max. An object containing a list of "key": value pairs. |
state |
Output only. The current state of the connector. |
Union field restart_policy. A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted. restart_policy can be only one of the following: |
taskRestartPolicy |
Optional. Restarts the individual tasks of a Connector. |
TaskRetryPolicy
| JSON representation |
|---|
{ "minimumBackoff": string, "maximumBackoff": string, "taskRetryDisabled": boolean } |
| Fields | |
|---|---|
minimumBackoff |
Optional. The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
maximumBackoff |
Optional. The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay. A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". |
taskRetryDisabled |
Optional. If true, task retry is disabled. |
Duration
| JSON representation |
|---|
{ "seconds": string, "nanos": integer } |
| Fields | |
|---|---|
seconds |
Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years |
nanos |
Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero value for the nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive. |
ConfigsEntry
| JSON representation |
|---|
{ "key": string, "value": string } |
| Fields | |
|---|---|
key |
|
value |
|
Tool Annotations
Destructive Hint: ✅ | Idempotent Hint: ✅ | Read Only Hint: ❌ | Open World Hint: ❌