コネクタを一時停止、再開、停止、再起動する

コネクタの動作は、一時停止、再開、停止、再起動によって制御できます。これらのアクションを使用すると、コネクタを削除して再作成することなく、データフローを管理して問題を解決できます。

Connect クラスタ内のコネクタを一時停止、再開、停止、再起動するには、 Google Cloud コンソール、gcloud CLI、Managed Service for Apache Kafka クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してコネクタの状態を変更することはできません。

コネクタを一時停止、再開、停止、再起動するために必要なロールと権限

コネクタの一時停止、再開、停止、再起動に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する Managed Kafka Connector 編集者 roles/managedkafka.connectorEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、コネクタの一時停止、再開、停止、再起動に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

コネクタを一時停止、再開、停止、再起動するには、次の権限が必要です。

  • リクエストされたコネクタにコネクタの一時停止権限を付与します。 managedkafka.connectors.pause
  • リクエストされたコネクタにコネクタの再開権限を付与します。 managedkafka.connectors.resume
  • リクエストされたコネクタにコネクタの再起動権限を付与します。 managedkafka.connectors.restart
  • リクエストされたコネクタにコネクタの停止権限を付与します。 managedkafka.connectors.stop

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka コネクタ編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

コネクタを一時停止する

コネクタを一時停止すると、その状態が保持されます。つまり、コネクタはメッセージやデータの処理を中断した場所を記憶しています。コネクタが再開されるまで、メッセージ処理が停止します。一時停止したコネクタを再開すると、一時停止した場所から続行されます。これは、コネクタの設定を失うことなく、トラブルシューティングやメンテナンスを行う場合に便利です。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 一時停止するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストからコネクタを見つけて名前をクリックします。

    [コネクタの詳細] ページにリダイレクトされます。

  4. [一時停止] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタを一時停止するには、gcloud managed-kafka connectors pause コマンドを使用します。

    gcloud managed-kafka connectors pause CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。一時停止するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
  3. Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func pauseConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.PauseConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.PauseConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.PauseConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Paused connector: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.PauseConnectorRequest;
    import java.io.IOException;
    
    public class PauseConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        pauseConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void pauseConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          PauseConnectorRequest request = PauseConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.pauseConnector(request);
          System.out.printf("Connector %s paused successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.PauseConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.pause_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Paused connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to pause connector {connector_id} with error: {e}")
    

コネクタを再開する

一時停止したコネクタを再開すると、中断したところからオペレーションが再開されます。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 再開するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストから一時停止したコネクタを見つけて、その名前をクリックします。

    [コネクタの詳細] ページにリダイレクトされます。

  4. [再開] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタを再開するには、gcloud managed-kafka connectors resume コマンドを使用します。

    gcloud managed-kafka connectors resume CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。再開するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
  3. Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func resumeConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.ResumeConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.ResumeConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.ResumeConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Resumed connector: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ResumeConnectorRequest;
    import java.io.IOException;
    
    public class ResumeConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        resumeConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void resumeConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          ResumeConnectorRequest request = ResumeConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.resumeConnector(request);
          System.out.printf("Connector %s resumed successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ResumeConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.resume_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Resumed connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to resume connector {connector_id} with error: {e}")
    

コネクタを停止する

コネクタを停止すると、コネクタのすべてのタスクが停止します。コネクタを停止すると、その状態が保持されます。コネクタを再び実行するには、コネクタを再起動します。ログと指標も永続的に保存されます。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 停止するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストからコネクタを見つけて名前をクリックします。

    [コネクタの詳細] ページにリダイレクトされます。

  4. [停止] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタを停止するには、gcloud managed-kafka connectors stop コマンドを使用します。

    gcloud managed-kafka connectors stop CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。停止するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
  3. Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func stopConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.StopConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.StopConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.StopConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Stopped connector: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.StopConnectorRequest;
    import java.io.IOException;
    
    public class StopConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        stopConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void stopConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          StopConnectorRequest request = StopConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.stopConnector(request);
          System.out.printf("Connector %s stopped successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.stopConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.StopConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.stop_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Stopped connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to stop connector {connector_id} with error: {e}")
    

コネクタを再起動する

コネクタを再起動すると、タスクが完全に停止してから再起動します。これは、コネクタの状態を更新したり、構成の変更を適用したりする場合に便利です。

注: コネクタを再起動すると、データフローが一時的に中断されることがあります。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 再起動するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストからコネクタを見つけて名前をクリックします。

    [コネクタの詳細] ページにリダイレクトされます。

  4. [再起動] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタを再起動するには、gcloud managed-kafka connectors restart コマンドを使用します。

    gcloud managed-kafka connectors restart CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。再起動するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
  3. Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func restartConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.RestartConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.RestartConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.RestartConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Restarted connector: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.RestartConnectorRequest;
    import java.io.IOException;
    
    public class RestartConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        restartConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void restartConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          RestartConnectorRequest request = RestartConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.restartConnector(request);
          System.out.printf("Connector %s restarted successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.restartConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.RestartConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.restart_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Restarted connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to restart connector {connector_id} with error: {e}")
    

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。