暫停、繼續、停止及重新啟動連接器

你可以暫停、繼續、停止或重新啟動連接器,藉此控制連接器的運作。您可透過這些動作管理資料流並解決問題,不必刪除及重新建立連接器。

如要暫停、恢復、停止或重新啟動 Connect 叢集中的連接器,可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 變更連接器狀態。

暫停、繼續、停止或重新啟動連接器所需的角色和權限

如要取得暫停、繼續、停止或重新啟動連接器所需的權限,請要求管理員在包含 Connect 叢集的專案中,授予您「Managed Kafka Connector 編輯者 」(roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備暫停、繼續、停止或重新啟動連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要暫停、繼續、停止或重新啟動連接器,您必須具備下列權限:

  • 在要求的連接器上授予暫停連接器權限: managedkafka.connectors.pause
  • 在要求的連接器上授予繼續執行連接器權限: managedkafka.connectors.resume
  • 在要求的連接器上授予重新啟動連接器權限: managedkafka.connectors.restart
  • 在要求的連接器上授予停止連接器權限: managedkafka.connectors.stop

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Connector 編輯者角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

暫停連接器

暫停連接器時,系統會保留其狀態。也就是說,連接器會記住上次處理訊息或資料的進度。訊息處理作業會停止,直到連接器重新啟用。您可以繼續先前暫停的連接器,並從暫停處繼續執行。這項功能有助於進行疑難排解或維護,同時保留連接器的設定。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要暫停連接器的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 在「資源」分頁中,找出清單中的連結器,然後按一下連結器名稱。

    系統會將您重新導向至「連接器詳細資料」頁面。

  4. 按一下「暫停」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors pause 指令暫停連接器:

    gcloud managed-kafka connectors pause CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    更改下列內容:

    • CONNECTOR_ID:必填,要暫停的連接器 ID。
    • LOCATION:必填,包含連接器的 Connect 叢集位置。
    • CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
  3. Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func pauseConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.PauseConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.PauseConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.PauseConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Paused connector: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.PauseConnectorRequest;
    import java.io.IOException;
    
    public class PauseConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        pauseConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void pauseConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          PauseConnectorRequest request = PauseConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.pauseConnector(request);
          System.out.printf("Connector %s paused successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.PauseConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.pause_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Paused connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to pause connector {connector_id} with error: {e}")
    

繼續執行連接器

重新啟用已暫停的連接器後,系統會從上次中斷處繼續執行作業。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要恢復的連接器所在的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 在「資源」分頁中,找出清單中已暫停的連結器,然後按一下其名稱。

    系統會將您重新導向至「連接器詳細資料」頁面。

  4. 按一下「繼續」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors resume 指令繼續執行連接器:

    gcloud managed-kafka connectors resume CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    更改下列內容:

    • CONNECTOR_ID:必填,要繼續的連接器 ID。
    • LOCATION:必填,包含連接器的 Connect 叢集位置。
    • CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
  3. Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func resumeConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.ResumeConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.ResumeConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.ResumeConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Resumed connector: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ResumeConnectorRequest;
    import java.io.IOException;
    
    public class ResumeConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        resumeConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void resumeConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          ResumeConnectorRequest request = ResumeConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.resumeConnector(request);
          System.out.printf("Connector %s resumed successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ResumeConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.resume_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Resumed connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to resume connector {connector_id} with error: {e}")
    

停止連接器

停止連接器會停止連接器的所有工作。停止連接器會保留其狀態。如要讓連接器再次運作,請重新啟動連接器。記錄和指標也會長期儲存。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要停止連接器的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 在「資源」分頁中,找出清單中的連結器,然後按一下連結器名稱。

    系統會將您重新導向至「連接器詳細資料」頁面。

  4. 按一下「停止」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors stop 指令停止連接器:

    gcloud managed-kafka connectors stop CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    更改下列內容:

    • CONNECTOR_ID:必填,要停止的連接器 ID。
    • LOCATION:必填,包含連接器的 Connect 叢集位置。
    • CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
  3. Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func stopConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.StopConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.StopConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.StopConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Stopped connector: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.StopConnectorRequest;
    import java.io.IOException;
    
    public class StopConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        stopConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void stopConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          StopConnectorRequest request = StopConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.stopConnector(request);
          System.out.printf("Connector %s stopped successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.stopConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.StopConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.stop_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Stopped connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to stop connector {connector_id} with error: {e}")
    

重新啟動連接器

重新啟動連接器會完全停止,然後重新啟動其工作。這項功能可用於重新整理連接器的狀態或套用設定變更。

注意:重新啟動連接器可能會導致資料流暫時中斷。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要重新啟動連接器的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 在「資源」分頁中,找出清單中的連結器,然後按一下連結器名稱。

    系統會將您重新導向至「連接器詳細資料」頁面。

  4. 按一下 [Restart]

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors restart 指令重新啟動連接器:

    gcloud managed-kafka connectors restart CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    更改下列內容:

    • CONNECTOR_ID:必填,要重新啟動的連接器 ID。
    • LOCATION:必填,包含連接器的 Connect 叢集位置。
    • CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
  3. Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func restartConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.RestartConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.RestartConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.RestartConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Restarted connector: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.RestartConnectorRequest;
    import java.io.IOException;
    
    public class RestartConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        restartConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void restartConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          RestartConnectorRequest request = RestartConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.restartConnector(request);
          System.out.printf("Connector %s restarted successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.restartConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.RestartConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.restart_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Restarted connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to restart connector {connector_id} with error: {e}")
    

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。