您可以修改连接器以更新其配置,例如更改其读取或写入的主题、修改数据转换或调整错误处理设置。
如需更新 Connect 集群中的连接器,您可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 来更新连接器。
准备工作
在更新连接器之前,请先查看其现有配置,并了解您所做的任何更改可能会产生的影响。
更新连接器所需的角色和权限
如需获得修改连接器所需的权限,请让您的管理员为您授予包含 Connect 集群的项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含修改连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需修改连接器,您需要具备以下权限:
-
在父级 Connect 集群上授予更新连接器权限:
managedkafka.connectors.update -
在父级 Connect 集群上授予 list connectors 权限:
This permission is only required for updating a connector using the Google Cloud console
如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Google Cloud Managed Service for Apache Kafka 预定义角色。
连接器的可修改属性
连接器的可修改属性取决于其类型。下表总结了支持的连接器类型可编辑的属性:
MirrorMaker 2.0 源连接器
- 以英文逗号分隔的主题名称或主题正则表达式:要复制的主题。
如需详细了解该属性,请参阅主题名称。
- 配置:连接器的其他配置设置。
如需详细了解该属性,请参阅 配置。
- 任务重启政策:用于重启失败的连接器任务的政策。
如需详细了解该属性,请参阅 任务重启政策。
BigQuery Sink 连接器
- 主题:要从中流式传输数据的 Kafka 主题。
如需详细了解该属性,请参阅主题。
- 数据集:用于存储数据的 BigQuery 数据集。
如需详细了解该属性,请参阅数据集。
- 配置:连接器的其他配置设置。
如需详细了解该属性,请参阅配置。
- 任务重启政策:用于重启失败的连接器任务的政策。
如需详细了解该属性,请参阅任务重启政策。
Cloud Storage 接收器连接器
- 主题:要从中流式传输数据的 Kafka 主题。
如需详细了解该属性,请参阅主题。
- Cloud Storage 存储桶:用于存储数据的 Cloud Storage 存储桶。
如需详细了解该属性,请参阅存储分区。
- 配置:连接器的其他配置设置。
如需详细了解该属性,请参阅配置。
- 任务重启政策:用于重启失败的连接器任务的政策。
如需详细了解该属性,请参阅任务重启政策。
Pub/Sub 来源连接器
- Pub/Sub 订阅:用于接收消息的 Pub/Sub 订阅。
- Kafka 主题:要将消息流式传输到的 Kafka 主题。
- 配置:连接器的其他配置设置。 如需了解详情,请参阅 配置连接器。
- 任务重启政策:用于重启失败的连接器任务的政策。如需了解详情,请参阅任务重启政策。
Pub/Sub 接收器连接器
- 主题:要从中流式传输消息的 Kafka 主题。
如需详细了解该属性,请参阅主题。
- Pub/Sub 主题:要向其发送消息的 Pub/Sub 主题。
如需详细了解该属性,请参阅 Pub/Sub 主题。
- 配置:连接器的其他配置设置。
如需详细了解该属性,请参阅配置。
- 任务重启政策:用于重启失败的连接器任务的政策。
如需详细了解该属性,请参阅任务重启政策。
更新连接器
更新连接器可能会导致在应用更改期间数据流暂时中断。
控制台
在 Google Cloud 控制台中,前往连接集群页面。
点击托管要更新的连接器的 Connect 集群。
系统会显示连接集群详情页面。
在资源标签页中,找到列表中的连接器,然后点击其名称。
系统会将您重定向到连接器详情页面。
点击修改。
更新连接器的必需属性。可用属性因连接器类型而异。
点击保存。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
使用
gcloud managed-kafka connectors update命令更新连接器:您可以使用
--configs标志(以英文逗号分隔的键值对)或--config-file标志(JSON 或 YAML 文件的路径)更新连接器的配置。以下语法使用
--configs标志以及以英文逗号分隔的键值对。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --configs=KEY1=VALUE1,KEY2=VALUE2...以下语法使用
--config-file标志以及 JSON 或 YAML 文件的路径。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=PATH_TO_CONFIG_FILE替换以下内容:
- CONNECTOR_ID:必填。要更新的连接器的 ID。
- LOCATION:必填。包含连接器的 Connect 集群的位置。
- CONNECT_CLUSTER_ID:必填。包含连接器的 Connect 集群的 ID。
- KEY1=VALUE1,KEY2=VALUE2...:要更新的配置属性(以英文逗号分隔)。例如
tasks.max=2,value.converter.schemas.enable=true。 - PATH_TO_CONFIG_FILE:包含要更新的配置属性的 JSON 或 YAML 文件的路径。例如
config.json。
使用
--configs的示例命令:gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --configs=tasks.max=2,value.converter.schemas.enable=true使用
--config-file的命令示例。以下是一个名为update_config.yaml的示例文件:tasks.max: 3 topic: updated-test-topic以下是使用该文件的示例命令:
gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --config-file=update_config.yaml
Go
在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC。
Java
在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC。
Python
在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC。
You can also update the connector's task restart policy without
including the configuration, by using the `--task-restart-min-backoff`
and `--task-restart-max-backoff` flags. For example:
```sh
gcloud managed-kafka connectors update test-connector \
--location=us-central1 \
--connect-cluster=test-connect-cluster \
--task-restart-min-backoff="60s" \
--task-restart-max-backoff="90s"