Menjeda, melanjutkan, menghentikan, dan memulai ulang konektor

Anda dapat mengontrol operasi konektor dengan menjeda, melanjutkan, menghentikan, atau memulainya ulang. Tindakan ini memungkinkan Anda mengelola alur data dan mengatasi masalah tanpa menghapus dan membuat ulang konektor.

Untuk menjeda, melanjutkan, menghentikan, atau memulai ulang konektor di cluster Connect, Anda dapat menggunakan konsol Google Cloud , gcloud CLI, library klien Managed Service for Apache Kafka, atau Managed Kafka API. Anda tidak dapat menggunakan API Apache Kafka open source untuk mengubah status konektor.

Peran dan izin yang diperlukan untuk menjeda, melanjutkan, menghentikan, atau memulai ulang konektor

Untuk mendapatkan izin yang diperlukan untuk menjeda, melanjutkan, menghentikan, atau memulai ulang konektor, minta administrator untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project yang berisi cluster Connect. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk menjeda, melanjutkan, menghentikan, atau memulai ulang konektor. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk menjeda, melanjutkan, menghentikan, atau memulai ulang konektor:

  • Berikan izin konektor jeda pada konektor yang diminta: managedkafka.connectors.pause
  • Berikan izin konektor melanjutkan pada konektor yang diminta: managedkafka.connectors.resume
  • Berikan izin konektor mulai ulang pada konektor yang diminta: managedkafka.connectors.restart
  • Berikan izin penghentian konektor pada konektor yang diminta: managedkafka.connectors.stop

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Editor Konektor Kafka Terkelola, lihat Peran standar Managed Service untuk Apache Kafka.

Menjeda konektor

Saat Anda menjeda konektor, statusnya akan dipertahankan. Artinya, konektor mengingat tempat terakhir kali konektor memproses pesan atau data. Pemrosesan pesan dihentikan hingga konektor dilanjutkan. Anda dapat melanjutkan konektor yang dijeda, dan konektor akan dilanjutkan dari tempat konektor dijeda. Hal ini berguna untuk memecahkan masalah atau melakukan pemeliharaan tanpa kehilangan penyiapan konektor.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang menghosting konektor yang ingin Anda jeda.

    Halaman Connect cluster details akan ditampilkan.

  3. Di tab Resources, temukan konektor dalam daftar, lalu klik namanya.

    Anda akan dialihkan ke halaman Detail konektor.

  4. Klik Jeda.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Gunakan perintah gcloud managed-kafka connectors pause untuk menjeda konektor:

    gcloud managed-kafka connectors pause CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    Ganti kode berikut:

    • CONNECTOR_ID: Wajib diisi. ID konektor yang ingin Anda jeda.
    • LOCATION: Wajib diisi. Lokasi Connect cluster yang berisi konektor.
    • CONNECT_CLUSTER_ID: Wajib diisi. ID cluster Connect yang berisi konektor.
  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func pauseConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.PauseConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.PauseConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.PauseConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Paused connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.PauseConnectorRequest;
    import java.io.IOException;
    
    public class PauseConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        pauseConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void pauseConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          PauseConnectorRequest request = PauseConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.pauseConnector(request);
          System.out.printf("Connector %s paused successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.PauseConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.pause_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Paused connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to pause connector {connector_id} with error: {e}")
    

Melanjutkan konektor

Melanjutkan konektor yang dijeda akan memulai ulang operasinya dari tempat terakhir kali dihentikan.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang menghosting konektor yang ingin Anda lanjutkan.

    Halaman Connect cluster details akan ditampilkan.

  3. Di tab Resources, temukan konektor yang dijeda dalam daftar, lalu klik namanya.

    Anda akan dialihkan ke halaman Detail konektor.

  4. Klik Lanjutkan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Gunakan perintah gcloud managed-kafka connectors resume untuk melanjutkan konektor:

    gcloud managed-kafka connectors resume CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    Ganti kode berikut:

    • CONNECTOR_ID: Wajib diisi. ID konektor yang ingin Anda lanjutkan.
    • LOCATION: Wajib diisi. Lokasi Connect cluster yang berisi konektor.
    • CONNECT_CLUSTER_ID: Wajib diisi. ID cluster Connect yang berisi konektor.
  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func resumeConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.ResumeConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.ResumeConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.ResumeConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Resumed connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ResumeConnectorRequest;
    import java.io.IOException;
    
    public class ResumeConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        resumeConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void resumeConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          ResumeConnectorRequest request = ResumeConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.resumeConnector(request);
          System.out.printf("Connector %s resumed successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ResumeConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.resume_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Resumed connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to resume connector {connector_id} with error: {e}")
    

Menghentikan konektor

Menghentikan konektor akan menghentikan semua tugas konektor. Menghentikan konektor akan mempertahankan statusnya. Untuk menjalankan konektor lagi, Anda harus memulai ulang konektor. Log dan metrik juga disimpan secara tahan lama.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang menghosting konektor yang ingin Anda hentikan.

    Halaman Connect cluster details akan ditampilkan.

  3. Di tab Resources, temukan konektor dalam daftar, lalu klik namanya.

    Anda akan dialihkan ke halaman Detail konektor.

  4. Klik Stop.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Gunakan perintah gcloud managed-kafka connectors stop untuk menghentikan konektor:

    gcloud managed-kafka connectors stop CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    Ganti kode berikut:

    • CONNECTOR_ID: Wajib diisi. ID konektor yang ingin Anda hentikan.
    • LOCATION: Wajib diisi. Lokasi Connect cluster yang berisi konektor.
    • CONNECT_CLUSTER_ID: Wajib diisi. ID cluster Connect yang berisi konektor.
  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func stopConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.StopConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.StopConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.StopConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Stopped connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.StopConnectorRequest;
    import java.io.IOException;
    
    public class StopConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        stopConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void stopConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          StopConnectorRequest request = StopConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.stopConnector(request);
          System.out.printf("Connector %s stopped successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.stopConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.StopConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.stop_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Stopped connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to stop connector {connector_id} with error: {e}")
    

Mulai ulang konektor

Memulai ulang konektor akan menghentikan sepenuhnya, lalu memulai ulang tugasnya. Hal ini dapat berguna untuk memperbarui status konektor atau menerapkan perubahan konfigurasi.

Catatan: Memulai ulang konektor dapat menyebabkan gangguan singkat dalam alur data.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang menghosting konektor yang ingin Anda mulai ulang.

    Halaman Connect cluster details akan ditampilkan.

  3. Di tab Resources, temukan konektor dalam daftar, lalu klik namanya.

    Anda akan dialihkan ke halaman Detail konektor.

  4. Klik Restart.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Gunakan perintah gcloud managed-kafka connectors restart untuk memulai ulang konektor:

    gcloud managed-kafka connectors restart CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    Ganti kode berikut:

    • CONNECTOR_ID: Wajib diisi. ID konektor yang ingin Anda mulai ulang.
    • LOCATION: Wajib diisi. Lokasi Connect cluster yang berisi konektor.
    • CONNECT_CLUSTER_ID: Wajib diisi. ID cluster Connect yang berisi konektor.
  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func restartConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.RestartConnectorRequest{
    		Name: connectorPath,
    	}
    	resp, err := client.RestartConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.RestartConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Restarted connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.RestartConnectorRequest;
    import java.io.IOException;
    
    public class RestartConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        restartConnector(projectId, region, connectClusterId, connectorId);
      }
    
      public static void restartConnector(
          String projectId, String region, String connectClusterId, String connectorId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = 
            ManagedKafkaConnectClient.create()) {
          ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, 
              connectorId);
          RestartConnectorRequest request = RestartConnectorRequest.newBuilder()
              .setName(connectorName.toString()).build();
    
          // This operation is being handled synchronously.
          managedKafkaConnectClient.restartConnector(request);
          System.out.printf("Connector %s restarted successfully.\n", connectorId);
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.restartConnector got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.RestartConnectorRequest(
        name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
    )
    
    try:
        operation = connect_client.restart_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        print(f"Restarted connector {connector_id}")
    except GoogleAPICallError as e:
        print(f"Failed to restart connector {connector_id} with error: {e}")
    

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.