MCP Tools Reference: managedkafka

Tool: update_connector

Update an existing Google Cloud Managed Service for Apache Kafka Connect connector. To update a connector, please provide the Project ID, Location, Connect Cluster ID, and Connector ID.

The following connector fields can be updated. The agent can first use the get_connector method to get the current connector configuration, to provide a baseline configuration for the user to edit. Use the example configurations provided below for reference.

  • connector.configs: Key-value pairs for connector properties. The following connectors support configuration updates:
    • MirrorMaker 2.0 Source connector
      • Example Configuration:
        • connector.class (required): "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        • name: "MM2_CONNECTOR_ID"
        • source.cluster.alias (required): "source"
        • target.cluster.alias (required): "target"
        • topics (required): "TOPIC_NAME"
        • source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
        • target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
        • offset-syncs.topic.replication.factor: "1"
        • source.cluster.security.protocol: "SASL_SSL"
        • source.cluster.sasl.mechanism: "OAUTHBEARER"
        • source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
        • source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
        • target.cluster.security.protocol: "SASL_SSL"
        • target.cluster.sasl.mechanism: "OAUTHBEARER"
        • target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
        • target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      • Replace with these:
        • MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Source connector.
        • TOPIC_NAME: The name of the Kafka topic(s) to mirror.
        • SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
        • TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
    • MirrorMaker 2.0 Checkpoint connector
      • Example Configuration:
        • connector.class (required): "org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
        • name: "MM2_CONNECTOR_ID"
        • source.cluster.alias (required): "source"
        • target.cluster.alias (required): "target"
        • consumer-groups (required): "CONSUMER_GROUP_NAME"
        • source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
        • target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
        • source.cluster.security.protocol: "SASL_SSL"
        • source.cluster.sasl.mechanism: "OAUTHBEARER"
        • source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
        • source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
        • target.cluster.security.protocol: "SASL_SSL"
        • target.cluster.sasl.mechanism: "OAUTHBEARER"
        • target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
        • target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      • Replace with these:
        • MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Source connector.
        • CONSUMER_GROUP_NAME: The name of the consumer group to use for the checkpoint connector.
        • SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
        • TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
    • MirrorMaker 2.0 Heartbeat connector
      • Example Configuration:
        • connector.class (required): "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
        • name: "MM2_CONNECTOR_ID"
        • source.cluster.alias (required): "source"
        • target.cluster.alias (required): "target"
        • consumer-groups (required): "CONSUMER_GROUP_NAME"
        • source.cluster.bootstrap.servers (required): "SOURCE_CLUSTER_DNS"
        • target.cluster.bootstrap.servers (required): "TARGET_CLUSTER_DNS"
        • source.cluster.security.protocol: "SASL_SSL"
        • source.cluster.sasl.mechanism: "OAUTHBEARER"
        • source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
        • source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
        • target.cluster.security.protocol: "SASL_SSL"
        • target.cluster.sasl.mechanism: "OAUTHBEARER"
        • target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
        • target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      • Replace with these:
        • MM2_CONNECTOR_ID: The ID or name of the MirrorMaker 2.0 Heartbeat connector.
        • CONSUMER_GROUP_NAME: The name of the consumer group to use for the heartbeat connector.
        • SOURCE_CLUSTER_DNS: The DNS endpoint for the source Kafka cluster.
        • TARGET_CLUSTER_DNS: The DNS endpoint for the target Kafka cluster.
    • BigQuery Sink connector
      • Example Configuration:
        • name: "BQ_SINK_CONNECTOR_ID"
        • project (required): "GCP_PROJECT_ID"
        • topics: "TOPIC_ID"
        • tasks.max: "3"
        • connector.class (required): "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        • key.converter: "org.apache.kafka.connect.storage.StringConverter"
        • value.converter: "org.apache.kafka.connect.json.JsonConverter"
        • value.converter.schemas.enable: "false"
        • defaultDataset (required): "BQ_DATASET_ID"
      • Replace with these:
        • BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. The name of a connector is immutable.
        • GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.
        • TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.
        • BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.
    • Cloud Storage Sink connector
      • Example Configuration:
        • name: "GCS_SINK_CONNECTOR_ID"
        • connector.class (required): "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        • tasks.max: "1"
        • topics (required): "TOPIC_ID"
        • gcs.bucket.name (required): "GCS_BUCKET_NAME"
        • gcs.credentials.default: "true"
        • format.output.type: "json"
        • value.converter: "org.apache.kafka.connect.json.JsonConverter"
        • value.converter.schemas.enable: "false"
        • key.converter: "org.apache.kafka.connect.storage.StringConverter"
      • Replace with these:
        • TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the Cloud Storage Sink connector.
        • GCS_BUCKET_NAME: The name of the Cloud Storage bucket that acts as a sink for the pipeline.
        • GCS_SINK_CONNECTOR_ID: The ID or name of the Cloud Storage Sink connector. The name of a connector is immutable.
    • Pub/Sub Source connector
      • Example Configuration:
        • connector.class (required): "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
        • cps.project (required): "PROJECT_ID"
        • cps.subscription (required): "PUBSUB_SUBSCRIPTION_ID"
        • kafka.topic (required): "KAFKA_TOPIC_ID"
        • value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
        • key.converter: "org.apache.kafka.connect.storage.StringConverter"
        • tasks.max: "3"
      • Replace with these:
        • PROJECT_ID: The ID of the Google Cloud project where the Pub/Sub subscription resides.
        • PUBSUB_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription to pull data from.
        • KAFKA_TOPIC_ID: The ID of the Kafka topic where data is written.
    • Pub/Sub Sink connector
      • Example Configuration:
        • connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        • name: "CPS_SINK_CONNECTOR_ID"
        • tasks.max: "1"
        • topics: "TOPIC_ID"
        • value.converter: "org.apache.kafka.connect.storage.StringConverter"
        • key.converter: "org.apache.kafka.connect.storage.StringConverter"
        • cps.topic: "CPS_TOPIC_ID"
        • cps.project: "GCP_PROJECT_ID"
      • Replace with these:
        • CPS_SINK_CONNECTOR_ID: The ID or name of the Pub/Sub Sink connector. The name of a connector is immutable.
        • TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which data is read by the Pub/Sub Sink connector.
        • CPS_TOPIC_ID: The ID of the Pub/Sub topic to which data is published.
        • GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub topic resides.
  • connector.task_restart_policy: A policy that specifies how to restart failed connector tasks.
    • Minimum Backoff: The minimum amount of time to wait before retrying a failed task (e.g., "60s").
    • Maximum Backoff: The maximum amount of time to wait before retrying a failed task (e.g., "43200s" for 12 hours).
    • Task Retry Disabled: If true, task retry is disabled.

Important Notes:

  • The UpdateConnectorRequest requires the following parameters:
    • update_mask: A field mask used to specify the fields to be overwritten. For example, to update configs and task_restart_policy.minimum_backoff, the mask would be "configs,task_restart_policy.minimum_backoff". A value of * will overwrite all fields.
    • connector: The connector configuration.
    • connector.name: The name of the connector to be updated in the format projects/{project}/locations/{location}/connectClusters/{connect_cluster_id}/connectors/{connector_id}.

The following sample demonstrate how to use curl to invoke the update_connector MCP tool.

Curl Request
                  
curl --location 'https://managedkafka.googleapis.com/mcp' \
--header 'content-type: application/json' \
--header 'accept: application/json, text/event-stream' \
--data '{
  "method": "tools/call",
  "params": {
    "name": "update_connector",
    "arguments": {
      // provide these details according to the tool's MCP specification
    }
  },
  "jsonrpc": "2.0",
  "id": 1
}'
                

Input Schema

Request for UpdateConnector.

UpdateConnectorRequest

JSON representation
{
  "updateMask": string,
  "connector": {
    object (Connector)
  }
}
Fields
updateMask

string (FieldMask format)

Required. Field mask is used to specify the fields to be overwritten in the cluster resource by the update. The fields specified in the update_mask are relative to the resource, not the full request. A field will be overwritten if it is in the mask. The mask is required and a value of * will update all fields.

This is a comma-separated list of fully qualified names of fields. Example: "user.displayName,photo".

connector

object (Connector)

Required. The connector to update. Its name field must be populated.

FieldMask

JSON representation
{
  "paths": [
    string
  ]
}
Fields
paths[]

string

The set of field mask paths.

Connector

JSON representation
{
  "name": string,
  "configs": {
    string: string,
    ...
  },
  "state": enum (State),

  // Union field restart_policy can be only one of the following:
  "taskRestartPolicy": {
    object (TaskRetryPolicy)
  }
  // End of list of possible types for union field restart_policy.
}
Fields
name

string

Identifier. The name of the connector. Structured like: projects/{project}/locations/{location}/connectClusters/{connect_cluster}/connectors/{connector}

configs

map (key: string, value: string)

Optional. Connector config as keys/values. The keys of the map are connector property names, for example: connector.class, tasks.max, key.converter.

An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.

state

enum (State)

Output only. The current state of the connector.

Union field restart_policy. A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted. restart_policy can be only one of the following:
taskRestartPolicy

object (TaskRetryPolicy)

Optional. Restarts the individual tasks of a Connector.

TaskRetryPolicy

JSON representation
{
  "minimumBackoff": string,
  "maximumBackoff": string,
  "taskRetryDisabled": boolean
}
Fields
minimumBackoff

string (Duration format)

Optional. The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay.

A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".

maximumBackoff

string (Duration format)

Optional. The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay.

A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".

taskRetryDisabled

boolean

Optional. If true, task retry is disabled.

Duration

JSON representation
{
  "seconds": string,
  "nanos": integer
}
Fields
seconds

string (int64 format)

Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years

nanos

integer

Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero value for the nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive.

ConfigsEntry

JSON representation
{
  "key": string,
  "value": string
}
Fields
key

string

value

string

Output Schema

A Kafka Connect connector in a given ConnectCluster.

Connector

JSON representation
{
  "name": string,
  "configs": {
    string: string,
    ...
  },
  "state": enum (State),

  // Union field restart_policy can be only one of the following:
  "taskRestartPolicy": {
    object (TaskRetryPolicy)
  }
  // End of list of possible types for union field restart_policy.
}
Fields
name

string

Identifier. The name of the connector. Structured like: projects/{project}/locations/{location}/connectClusters/{connect_cluster}/connectors/{connector}

configs

map (key: string, value: string)

Optional. Connector config as keys/values. The keys of the map are connector property names, for example: connector.class, tasks.max, key.converter.

An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.

state

enum (State)

Output only. The current state of the connector.

Union field restart_policy. A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted. restart_policy can be only one of the following:
taskRestartPolicy

object (TaskRetryPolicy)

Optional. Restarts the individual tasks of a Connector.

TaskRetryPolicy

JSON representation
{
  "minimumBackoff": string,
  "maximumBackoff": string,
  "taskRetryDisabled": boolean
}
Fields
minimumBackoff

string (Duration format)

Optional. The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay.

A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".

maximumBackoff

string (Duration format)

Optional. The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay.

A duration in seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".

taskRetryDisabled

boolean

Optional. If true, task retry is disabled.

Duration

JSON representation
{
  "seconds": string,
  "nanos": integer
}
Fields
seconds

string (int64 format)

Signed seconds of the span of time. Must be from -315,576,000,000 to +315,576,000,000 inclusive. Note: these bounds are computed from: 60 sec/min * 60 min/hr * 24 hr/day * 365.25 days/year * 10000 years

nanos

integer

Signed fractions of a second at nanosecond resolution of the span of time. Durations less than one second are represented with a 0 seconds field and a positive or negative nanos field. For durations of one second or more, a non-zero value for the nanos field must be of the same sign as the seconds field. Must be from -999,999,999 to +999,999,999 inclusive.

ConfigsEntry

JSON representation
{
  "key": string,
  "value": string
}
Fields
key

string

value

string

Tool Annotations

Destructive Hint: ✅ | Idempotent Hint: ✅ | Read Only Hint: ❌ | Open World Hint: ❌