您可以編輯連接器來更新設定,例如變更讀取或寫入的主題、修改資料轉換,或調整錯誤處理設定。
如要更新 Connect 叢集中的連結器,可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 更新連接器。
事前準備
更新連接器前,請先檢查現有設定,並瞭解所做變更可能造成的影響。
更新連結器所需的角色和權限
如要取得編輯連接器所需的權限,請要求管理員在包含 Connect 叢集的專案中,授予您「Managed Kafka Connector Editor 」(roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這個預先定義的角色具備編輯連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要編輯連接器,必須具備下列權限:
-
在上層 Connect 叢集上授予更新連接器權限:
managedkafka.connectors.update -
在父項 Connect 叢集上授予 list connectors 權限:
This permission is only required for updating a connector using the Google Cloud console
如要進一步瞭解 Managed Kafka Connector Editor 角色,請參閱 Google Cloud Managed Service for Apache Kafka 預先定義的角色。
連接器的可編輯屬性
連接器的可編輯屬性取決於其類型。以下摘要列出支援的連接器類型可編輯的屬性:
MirrorMaker 2.0 來源連接器
- 主題的名稱或規則運算式 (以半形逗號分隔):要複製的主題。
如要進一步瞭解這項屬性,請參閱「主題名稱」。
- 設定:連接器的其他設定。
如要進一步瞭解這項屬性,請參閱「 設定」。
- 工作重新啟動政策:重新啟動失敗連接器工作的政策。
如要進一步瞭解這項屬性,請參閱「 工作重新啟動政策」。
BigQuery Sink 連接器
- 主題:要從中串流資料的 Kafka 主題。
如要進一步瞭解這項屬性,請參閱「主題」。
- 資料集:用於儲存資料的 BigQuery 資料集。
如要進一步瞭解這項屬性,請參閱「資料集」。
- 設定:連接器的其他設定。
如要進一步瞭解屬性,請參閱「設定」。
- 任務重新啟動政策:重新啟動失敗連接器任務的政策。
如要進一步瞭解這項屬性,請參閱「工作重新啟動政策」。
Cloud Storage 接收器連接器
- 主題:要從中串流資料的 Kafka 主題。
如要進一步瞭解這項屬性,請參閱「主題」。
- Cloud Storage bucket:
用於儲存資料的 Cloud Storage bucket。
如要進一步瞭解這項屬性,請參閱「Bucket」。
- 設定:連接器的其他設定。
如要進一步瞭解這項屬性,請參閱「設定」。
- 工作重新啟動政策:重新啟動失敗連接器工作的政策。
如要進一步瞭解這項屬性,請參閱「工作重新啟動政策」。
Pub/Sub 來源連接器
- Pub/Sub 訂閱項目:接收訊息的 Pub/Sub 訂閱項目。
- Kafka 主題:要將訊息串流至的 Kafka 主題。
- 設定:連接器的其他設定。 詳情請參閱「 設定連接器」。
- 任務重新啟動政策:重新啟動失敗的連接器任務政策。詳情請參閱「工作重新啟動政策」。
Pub/Sub 接收器連接器
- 主題:要從中串流訊息的 Kafka 主題。
如要進一步瞭解該屬性,請參閱「主題」。
- Pub/Sub 主題:要傳送訊息的 Pub/Sub 主題。
如要進一步瞭解這項屬性,請參閱「Pub/Sub 主題」。
- 設定:連接器的其他設定。
如要進一步瞭解這項屬性,請參閱「設定」。
- 任務重新啟動政策:重新啟動失敗連接器任務的政策。
如要進一步瞭解這項屬性,請參閱「工作重新啟動政策」。
更新連接器
更新連接器時,系統會套用變更,因此資料流程可能會暫時中斷。
控制台
前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。
按一下要更新連接器的 Connect 叢集。
系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。
在「資源」分頁中,找出清單中的連結器,然後按一下連結器名稱。
系統會將您重新導向至「連接器詳細資料」頁面。
按一下 [編輯]。
更新連接器的必要屬性。可用屬性會因連結器類型而異。
按一下 [儲存]。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
使用
gcloud managed-kafka connectors update指令更新連接器:您可以使用
--configs標記搭配以半形逗號分隔的鍵/值組合,或使用--config-file標記搭配 JSON 或 YAML 檔案的路徑,更新連線器的設定。以下是使用
--configs旗標和以半形逗號分隔的鍵/值組合的語法。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --configs=KEY1=VALUE1,KEY2=VALUE2...以下語法會搭配
--config-file旗標,以及 JSON 或 YAML 檔案的路徑。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=PATH_TO_CONFIG_FILE更改下列內容:
- CONNECTOR_ID:必填,要更新的連接器 ID。
- LOCATION:必填,含有連接器的 Connect 叢集位置。
- CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
- KEY1=VALUE1,KEY2=VALUE2...:以半形逗號分隔的設定屬性,用於更新。例如:
tasks.max=2,value.converter.schemas.enable=true。 - PATH_TO_CONFIG_FILE:包含要更新設定屬性的 JSON 或 YAML 檔案路徑。例如:
config.json。
使用
--configs的範例指令:gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --configs=tasks.max=2,value.converter.schemas.enable=true使用
--config-file的範例指令。以下是名為update_config.yaml的範例檔案:tasks.max: 3 topic: updated-test-topic以下是使用該檔案的指令範例:
gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --config-file=update_config.yaml
Go
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。
Java
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。
Python
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。
You can also update the connector's task restart policy without
including the configuration, by using the `--task-restart-min-backoff`
and `--task-restart-max-backoff` flags. For example:
```sh
gcloud managed-kafka connectors update test-connector \
--location=us-central1 \
--connect-cluster=test-connect-cluster \
--task-restart-min-backoff="60s" \
--task-restart-max-backoff="90s"