Mengupdate konektor

Anda dapat mengedit konektor untuk memperbarui konfigurasinya, seperti mengubah topik yang dibaca atau ditulisi, mengubah transformasi data, atau menyesuaikan setelan penanganan error.

Untuk memperbarui konektor di cluster Connect, Anda dapat menggunakan konsol Google Cloud , gcloud CLI, library klien Managed Service for Apache Kafka, atau Managed Kafka API. Anda tidak dapat menggunakan Apache Kafka API open source untuk memperbarui konektor.

Sebelum memulai

Sebelum memperbarui konektor, tinjau konfigurasi yang ada dan pahami kemungkinan dampak dari perubahan yang Anda lakukan.

Peran dan izin yang diperlukan untuk memperbarui konektor

Untuk mendapatkan izin yang diperlukan untuk mengedit konektor, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project yang berisi cluster Connect. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk mengedit konektor. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mengedit konektor:

  • Berikan izin konektor update di cluster Connect induk: managedkafka.connectors.update
  • Berikan izin konektor daftar di cluster Connect induk: This permission is only required for updating a connector using the Google Cloud console

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Editor Konektor Kafka Terkelola, lihat Peran standar Google Cloud Managed Service for Apache Kafka.

Properti konektor yang dapat diedit

Properti konektor yang dapat diedit bergantung pada jenisnya. Berikut ringkasan properti yang dapat diedit untuk jenis konektor yang didukung:

Konektor Sumber MirrorMaker 2.0

  • Nama topik atau regex topik yang dipisahkan koma: Topik yang akan direplikasi.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Nama topik.

  • Konfigurasi: Setelan konfigurasi tambahan untuk konektor.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Konfigurasi.

  • Kebijakan mulai ulang tugas: Kebijakan untuk memulai ulang tugas konektor yang gagal.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Kebijakan memulai ulang tugas.

Konektor Sink BigQuery

  • Topik: Topik Kafka yang menjadi sumber streaming data.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Topik.

  • Set data: Set data BigQuery untuk menyimpan data.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Set data.

  • Konfigurasi: Setelan konfigurasi tambahan untuk konektor.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Konfigurasi.

  • Kebijakan mulai ulang tugas: Kebijakan untuk memulai ulang tugas konektor yang gagal.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Kebijakan memulai ulang tugas.

Konektor Sink Cloud Storage

  • Topik: Topik Kafka yang menjadi sumber streaming data.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Topik.

  • Bucket Cloud Storage: Bucket Cloud Storage untuk menyimpan data.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Bucket.

  • Konfigurasi: Setelan konfigurasi tambahan untuk konektor.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Konfigurasi.

  • Kebijakan mulai ulang tugas: Kebijakan untuk memulai ulang tugas konektor yang gagal.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Kebijakan memulai ulang tugas.

Konektor Sumber Pub/Sub

  • Langganan Pub/Sub: Langganan Pub/Sub yang akan digunakan untuk menerima pesan.
  • Topik Kafka: Topik Kafka yang menjadi tujuan streaming pesan.
  • Konfigurasi: Setelan konfigurasi tambahan untuk konektor. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
  • Kebijakan mulai ulang tugas: Kebijakan untuk memulai ulang tugas konektor yang gagal. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

Konektor Pub/Sub Sink

  • Topics: Topik Kafka yang menjadi sumber streaming pesan.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Topik.

  • Topik Pub/Sub: Topik Pub/Sub yang menjadi tujuan pengiriman pesan.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Topik Pub/Sub.

  • Konfigurasi: Setelan konfigurasi tambahan untuk konektor.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Konfigurasi.

  • Kebijakan mulai ulang tugas: Kebijakan untuk memulai ulang tugas konektor yang gagal.

    Untuk mengetahui informasi selengkapnya tentang properti, lihat Kebijakan memulai ulang tugas.

Mengupdate konektor

Memperbarui konektor dapat menyebabkan gangguan sementara pada alur data saat perubahan diterapkan.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang menghosting konektor yang ingin Anda perbarui.

    Halaman Connect cluster details akan ditampilkan.

  3. Di tab Resources, temukan konektor dalam daftar, lalu klik namanya.

    Anda akan dialihkan ke halaman Detail konektor.

  4. Klik Edit.

  5. Perbarui properti yang diperlukan untuk konektor. Properti yang tersedia berbeda-beda bergantung pada jenis konektor.

  6. Klik Simpan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Gunakan perintah gcloud managed-kafka connectors update untuk memperbarui konektor:

    Anda dapat memperbarui konfigurasi konektor menggunakan flag --configs dengan key-value pair yang dipisahkan koma atau flag --config-file dengan jalur ke file JSON atau YAML.

    Berikut adalah sintaksis yang menggunakan tanda --configs dengan pasangan nilai kunci yang dipisahkan koma.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --configs=KEY1=VALUE1,KEY2=VALUE2...
    

    Berikut sintaksis yang menggunakan tanda --config-file dengan jalur ke file JSON atau YAML.

    gcloud managed-kafka connectors update CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=PATH_TO_CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: Wajib diisi. ID konektor yang ingin Anda perbarui.
    • LOCATION: Wajib diisi. Lokasi cluster Connect yang berisi konektor.
    • CONNECT_CLUSTER_ID: Wajib diisi. ID cluster Connect yang berisi konektor.
    • KEY1=VALUE1,KEY2=VALUE2...: Properti konfigurasi yang dipisahkan koma untuk diupdate. Contoh, tasks.max=2,value.converter.schemas.enable=true.
    • PATH_TO_CONFIG_FILE: Jalur ke file JSON atau YAML yang berisi properti konfigurasi yang akan diupdate. Contoh, config.json.

    Contoh perintah menggunakan --configs:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --configs=tasks.max=2,value.converter.schemas.enable=true
    

    Contoh perintah menggunakan --config-file. Berikut adalah contoh file yang bernama update_config.yaml:

    tasks.max: 3
    topic: updated-test-topic
    

    Berikut adalah contoh perintah yang menggunakan file:

    gcloud managed-kafka connectors update test-connector \
        --location=us-central1 \
        --connect-cluster=test-connect-cluster \
        --config-file=update_config.yaml
    
  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, config map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	// config := map[string]string{"tasks.max": "6"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	connector := &managedkafkapb.Connector{
    		Name:    connectorPath,
    		Configs: config,
    	}
    	paths := []string{"configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectorRequest{
    		UpdateMask: updateMask,
    		Connector:  connector,
    	}
    	resp, err := client.UpdateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connector: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        // The new value for the 'tasks.max' configuration.
        String maxTasks = "5";
        updateConnector(projectId, region, clusterId, connectorId, maxTasks);
      }
    
      public static void updateConnector(
          String projectId, String region, String clusterId, String connectorId, String maxTasks)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          Map<String, String> configMap = new HashMap<>();
          configMap.put("tasks.max", maxTasks);
    
          Connector connector =
              Connector.newBuilder()
                  .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
                  .putAllConfigs(configMap)
                  .build();
    
          // The field mask specifies which fields to update. Here, we update the 'config' field.
          FieldMask updateMask = FieldMask.newBuilder().addPaths("config").build();
    
          // This operation is handled synchronously.
          Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
          System.out.printf("Updated connector: %s\n", updatedConnector.getName());
          System.out.println(updatedConnector.getAllFields());
    
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    # configs = {
    #     "tasks.max": "6",
    #     "value.converter.schemas.enable": "true"
    # }
    
    connect_client = ManagedKafkaConnectClient()
    
    connector = Connector()
    connector.name = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    connector.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("config")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties.
    request = managedkafka_v1.UpdateConnectorRequest(
        update_mask=update_mask,
        connector=connector,
    )
    
    try:
        operation = connect_client.update_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

    You can also update the connector's task restart policy without
    including the configuration, by using the `--task-restart-min-backoff`
    and `--task-restart-max-backoff` flags. For example:
    
    ```sh
    gcloud managed-kafka connectors update test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster \
      --task-restart-min-backoff="60s" \
      --task-restart-max-backoff="90s"
    
Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.