コネクタを更新する

コネクタを編集して、構成を更新できます。たとえば、読み取りまたは書き込みを行うトピックの変更、データ変換の変更、エラー処理設定の調整などです。

Connect クラスタ内のコネクタを更新するには、 Google Cloud コンソール、gcloud CLI、Managed Service for Apache Kafka クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してコネクタを更新することはできません。

始める前に

コネクタを更新する前に、既存の構成を確認し、変更による影響を把握してください。

コネクタの更新に必要なロールと権限

コネクタの編集に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する Managed Kafka Connector 編集者 roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、コネクタの編集に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

コネクタを編集するには、次の権限が必要です。

  • 親 Connect クラスタに対する更新コネクタ権限を付与します。 managedkafka.connectors.update
  • 親 Connect クラスタでリスト コネクタの権限を付与します。 This permission is only required for updating a connector using the Google Cloud console

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka Connector 編集者ロールの詳細については、Google Cloud Managed Service for Apache Kafka の事前定義ロールをご覧ください。

コネクタの編集可能なプロパティ

コネクタの編集可能なプロパティは、そのタイプによって異なります。サポートされているコネクタ タイプで編集可能なプロパティの概要は次のとおりです。

MirrorMaker 2.0 ソースコネクタ

  • カンマ区切りのトピック名またはトピックの正規表現: 複製するトピック。

    プロパティの詳細については、トピック名をご覧ください。

  • 構成: コネクタの追加の構成設定。

    プロパティの詳細については、 構成をご覧ください。

  • タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。

    このプロパティの詳細については、 タスクの再起動ポリシーをご覧ください。

BigQuery シンクコネクタ

  • トピック: データをストリーミングする Kafka トピック。

    プロパティの詳細については、トピックをご覧ください。

  • データセット: データを保存する BigQuery データセット。

    プロパティの詳細については、データセットをご覧ください。

  • 構成: コネクタの追加の構成設定。

    プロパティの詳細については、構成をご覧ください。

  • タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。

    このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。

Cloud Storage シンク コネクタ

  • トピック: データをストリーミングする Kafka トピック。

    プロパティの詳細については、トピックをご覧ください。

  • Cloud Storage バケット: データを保存する Cloud Storage バケット。

    プロパティの詳細については、バケットをご覧ください。

  • 構成: コネクタの追加の構成設定。

    プロパティの詳細については、構成をご覧ください。

  • タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。

    このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。

Pub/Sub ソースコネクタ

  • Pub/Sub サブスクリプション: メッセージの受信元となる Pub/Sub サブスクリプション。
  • Kafka トピック: メッセージをストリーミングする Kafka トピック。
  • 構成: コネクタの追加の構成設定。詳細については、 コネクタを構成するをご覧ください。
  • タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。詳細については、タスクの再起動ポリシーをご覧ください。

Pub/Sub シンクコネクタ

  • トピック: メッセージをストリーミングする Kafka トピック。

    プロパティの詳細については、トピックをご覧ください。

  • Pub/Sub トピック: メッセージの送信先となる Pub/Sub トピック。

    プロパティの詳細については、Pub/Sub トピックをご覧ください。

  • 構成: コネクタの追加の構成設定。

    プロパティの詳細については、構成をご覧ください。

  • タスクの再起動ポリシー: 失敗したコネクタ タスクを再起動するためのポリシー。

    このプロパティの詳細については、タスクの再起動ポリシーをご覧ください。

コネクタを更新する

コネクタを更新すると、変更が適用される間、データフローが一時的に中断されることがあります。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 更新するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストからコネクタを見つけて名前をクリックします。

    [コネクタの詳細] ページにリダイレクトされます。

  4. [編集] をクリックします。

  5. コネクタの必須プロパティを更新します。使用できるプロパティは、コネクタのタイプによって異なります。

  6. [保存] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタを更新するには、gcloud managed-kafka connectors update コマンドを使用します。

    コネクタの構成を更新するには、カンマ区切りの Key-Value ペアを含む --configs フラグを使用するか、JSON ファイルまたは YAML ファイルのパスを含む --config-file フラグを使用します。

    次に、カンマ区切りの Key-Value ペアで --configs フラグを使用する構文を示します。

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    JSON ファイルまたは YAML ファイルのパスで --config-file フラグを使用する構文は次のとおりです。

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。更新するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
    • KEY1=VALUE1,KEY2=VALUE2...: 更新する構成プロパティのカンマ区切りリスト。例: tasks.max=2,value.converter.schemas.enable=true
    • PATH_TO_CONFIG_FILE: 更新する構成プロパティを含む JSON または YAML ファイルへのパス。例: config.json

    --configs を使用したコマンドの例:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    --config-file を使用したコマンドの例。以下は、update_config.yaml という名前のサンプル ファイルです。

    tasks.max: 3
    topic: updated-test-topic
    

    次のサンプル コマンドでは、ファイルを使用しています。

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。