更新连接器

您可以修改连接器以更新其配置,例如更改其读取或写入的主题、修改数据转换或调整错误处理设置。

如需更新 Connect 集群中的连接器,您可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 来更新连接器。

准备工作

在更新连接器之前,请先查看其现有配置,并了解您所做的任何更改可能会产生的影响。

更新连接器所需的角色和权限

如需获得修改连接器所需的权限,请让您的管理员为您授予包含 Connect 集群的项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含修改连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需修改连接器,您需要具备以下权限:

  • 在父级 Connect 集群上授予更新连接器权限: managedkafka.connectors.update
  • 在父级 Connect 集群上授予 list connectors 权限: This permission is only required for updating a connector using the Google Cloud console

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Google Cloud Managed Service for Apache Kafka 预定义角色

连接器的可修改属性

连接器的可修改属性取决于其类型。下表总结了支持的连接器类型可编辑的属性:

MirrorMaker 2.0 源连接器

  • 以英文逗号分隔的主题名称或主题正则表达式:要复制的主题。

    如需详细了解该属性,请参阅主题名称

  • 配置:连接器的其他配置设置。

    如需详细了解该属性,请参阅 配置

  • 任务重启政策:用于重启失败的连接器任务的政策。

    如需详细了解该属性,请参阅 任务重启政策

BigQuery Sink 连接器

  • 主题:要从中流式传输数据的 Kafka 主题。

    如需详细了解该属性,请参阅主题

  • 数据集:用于存储数据的 BigQuery 数据集。

    如需详细了解该属性,请参阅数据集

  • 配置:连接器的其他配置设置。

    如需详细了解该属性,请参阅配置

  • 任务重启政策:用于重启失败的连接器任务的政策。

    如需详细了解该属性,请参阅任务重启政策

Cloud Storage 接收器连接器

  • 主题:要从中流式传输数据的 Kafka 主题。

    如需详细了解该属性,请参阅主题

  • Cloud Storage 存储桶:用于存储数据的 Cloud Storage 存储桶。

    如需详细了解该属性,请参阅存储分区

  • 配置:连接器的其他配置设置。

    如需详细了解该属性,请参阅配置

  • 任务重启政策:用于重启失败的连接器任务的政策。

    如需详细了解该属性,请参阅任务重启政策

Pub/Sub 来源连接器

  • Pub/Sub 订阅:用于接收消息的 Pub/Sub 订阅。
  • Kafka 主题:要将消息流式传输到的 Kafka 主题。
  • 配置:连接器的其他配置设置。 如需了解详情,请参阅 配置连接器
  • 任务重启政策:用于重启失败的连接器任务的政策。如需了解详情,请参阅任务重启政策

Pub/Sub 接收器连接器

  • 主题:要从中流式传输消息的 Kafka 主题。

    如需详细了解该属性,请参阅主题

  • Pub/Sub 主题:要向其发送消息的 Pub/Sub 主题。

    如需详细了解该属性,请参阅 Pub/Sub 主题

  • 配置:连接器的其他配置设置。

    如需详细了解该属性,请参阅配置

  • 任务重启政策:用于重启失败的连接器任务的政策。

    如需详细了解该属性,请参阅任务重启政策

更新连接器

更新连接器可能会导致在应用更改期间数据流暂时中断。

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击托管要更新的连接器的 Connect 集群。

    系统会显示连接集群详情页面。

  3. 资源标签页中,找到列表中的连接器,然后点击其名称。

    系统会将您重定向到连接器详情页面。

  4. 点击修改

  5. 更新连接器的必需属性。可用属性因连接器类型而异。

  6. 点击保存

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors update 命令更新连接器:

    您可以使用 --configs 标志(以英文逗号分隔的键值对)或 --config-file 标志(JSON 或 YAML 文件的路径)更新连接器的配置。

    以下语法使用 --configs 标志以及以英文逗号分隔的键值对。

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    以下语法使用 --config-file 标志以及 JSON 或 YAML 文件的路径。

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    替换以下内容:

    • CONNECTOR_ID:必填。要更新的连接器的 ID。
    • LOCATION:必填。包含连接器的 Connect 集群的位置。
    • CONNECT_CLUSTER_ID:必填。包含连接器的 Connect 集群的 ID。
    • KEY1=VALUE1,KEY2=VALUE2...:要更新的配置属性(以英文逗号分隔)。例如 tasks.max=2,value.converter.schemas.enable=true
    • PATH_TO_CONFIG_FILE:包含要更新的配置属性的 JSON 或 YAML 文件的路径。例如 config.json

    使用 --configs 的示例命令:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    使用 --config-file 的命令示例。以下是一个名为 update_config.yaml 的示例文件:

    tasks.max: 3
    topic: updated-test-topic
    

    以下是使用该文件的示例命令:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。