コネクタを編集して、構成を更新できます。たとえば、読み取りまたは書き込みを行うトピックの変更、データ変換の変更、エラー処理設定の調整などです。
Connect クラスタ内のコネクタを更新するには、 Google Cloud コンソール、gcloud CLI、Managed Service for Apache Kafka クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してコネクタを更新することはできません。
始める前に
コネクタを更新する前に、既存の構成を確認し、変更による影響を把握してください。
コネクタの更新に必要なロールと権限
コネクタの編集に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する Managed Kafka Connector 編集者 (roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、コネクタの編集に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
コネクタを編集するには、次の権限が必要です。
-
親 Connect クラスタに対する更新コネクタ権限を付与します。
managedkafka.connectors.update -
親 Connect クラスタでリスト コネクタの権限を付与します。
This permission is only required for updating a connector using the Google Cloud console
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Managed Kafka Connector 編集者ロールの詳細については、Google Cloud Managed Service for Apache Kafka の事前定義ロールをご覧ください。
コネクタの編集可能なプロパティ
コネクタの編集可能なプロパティは、そのタイプによって異なります。サポートされているコネクタ タイプで編集可能なプロパティの概要は次のとおりです。
MirrorMaker 2.0 ソースコネクタ
- カンマ区切りのトピック名またはトピックの正規表現: 複製するトピック。
プロパティの詳細については、トピック名をご覧ください。
- 構成: コネクタの追加の構成設定。
プロパティの詳細については、 構成をご覧ください。
- タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。
このプロパティの詳細については、 タスクの再起動ポリシーをご覧ください。
BigQuery シンクコネクタ
- トピック: データをストリーミングする Kafka トピック。
プロパティの詳細については、トピックをご覧ください。
- データセット: データを保存する BigQuery データセット。
プロパティの詳細については、データセットをご覧ください。
- 構成: コネクタの追加の構成設定。
プロパティの詳細については、構成をご覧ください。
- タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。
このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。
Cloud Storage シンク コネクタ
- トピック: データをストリーミングする Kafka トピック。
プロパティの詳細については、トピックをご覧ください。
- Cloud Storage バケット: データを保存する Cloud Storage バケット。
プロパティの詳細については、バケットをご覧ください。
- 構成: コネクタの追加の構成設定。
プロパティの詳細については、構成をご覧ください。
- タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。
このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。
Pub/Sub ソースコネクタ
- Pub/Sub サブスクリプション: メッセージの受信元となる Pub/Sub サブスクリプション。
- Kafka トピック: メッセージをストリーミングする Kafka トピック。
- 構成: コネクタの追加の構成設定。詳細については、 コネクタを構成するをご覧ください。
- タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。詳細については、タスクの再起動ポリシーをご覧ください。
Pub/Sub シンクコネクタ
- トピック: メッセージをストリーミングする Kafka トピック。
プロパティの詳細については、トピックをご覧ください。
- Pub/Sub トピック: メッセージの送信先となる Pub/Sub トピック。
プロパティの詳細については、Pub/Sub トピックをご覧ください。
- 構成: コネクタの追加の構成設定。
プロパティの詳細については、構成をご覧ください。
- タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。
このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。
コネクタを更新する
コネクタを更新すると、変更が適用される間、データフローが一時的に中断されることがあります。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
更新するコネクタをホストする Connect クラスタをクリックします。
[クラスタの詳細を接続] ページが表示されます。
[リソース] タブで、リストからコネクタを見つけて名前をクリックします。
[コネクタの詳細] ページにリダイレクトされます。
[編集] をクリックします。
コネクタの必須プロパティを更新します。使用できるプロパティは、コネクタのタイプによって異なります。
[保存] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
コネクタを更新するには、
gcloud managed-kafka connectors updateコマンドを使用します。コネクタの構成を更新するには、カンマ区切りの Key-Value ペアを含む
--configsフラグを使用するか、JSON ファイルまたは YAML ファイルのパスを含む--config-fileフラグを使用します。次に、カンマ区切りの Key-Value ペアで
--configsフラグを使用する構文を示します。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --configs=KEY1=VALUE1,KEY2=VALUE2...JSON ファイルまたは YAML ファイルのパスで
--config-fileフラグを使用する構文は次のとおりです。gcloud managed-kafka connectors update CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=PATH_TO_CONFIG_FILE次のように置き換えます。
- CONNECTOR_ID: 必須。更新するコネクタの ID。
- LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
- CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
- KEY1=VALUE1,KEY2=VALUE2...: 更新する構成プロパティのカンマ区切りリスト。例:
tasks.max=2,value.converter.schemas.enable=true - PATH_TO_CONFIG_FILE: 更新する構成プロパティを含む JSON または YAML ファイルへのパス。例:
config.json
--configsを使用したコマンドの例:gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --configs=tasks.max=2,value.converter.schemas.enable=true--config-fileを使用したコマンドの例。以下は、update_config.yamlという名前のサンプル ファイルです。tasks.max: 3 topic: updated-test-topic次のサンプル コマンドでは、ファイルを使用しています。
gcloud managed-kafka connectors update test-connector \ --location=us-central1 \ --connect-cluster=test-connect-cluster \ --config-file=update_config.yaml
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
You can also update the connector's task restart policy without
including the configuration, by using the `--task-restart-min-backoff`
and `--task-restart-max-backoff` flags. For example:
```sh
gcloud managed-kafka connectors update test-connector \
--location=us-central1 \
--connect-cluster=test-connect-cluster \
--task-restart-min-backoff="60s" \
--task-restart-max-backoff="90s"