Membuat konektor Sink Pub/Sub

Konektor Pub/Sub Sink melakukan streaming pesan dari topik Kafka ke topik Pub/Sub. Hal ini memungkinkan Anda mengintegrasikan aplikasi berbasis Kafka dengan Pub/Sub, sehingga memfasilitasi arsitektur berbasis peristiwa dan pemrosesan data real-time.

Sebelum memulai

Sebelum membuat konektor Pub/Sub Sink, pastikan Anda memiliki hal berikut:

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membuat konektor Sink Pub/Sub, minta administrator untuk memberi Anda peran IAM berikut di project yang berisi cluster Connect:

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Pub/Sub Sink. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat konektor Pub/Sub Sink:

  • Berikan izin pembuatan konektor di cluster Connect induk: managedkafka.connectors.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.

Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.

Memberikan izin untuk memublikasikan ke topik Pub/Sub

Akun layanan cluster Connect, yang mengikuti format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, memerlukan izin untuk memublikasikan pesan ke topik Pub/Sub. Untuk melakukannya, berikan peran Pub/Sub Publisher (roles/pubsub.publisher) ke akun layanan cluster Connect di project yang berisi topik Pub/Sub.

Cara kerja konektor Sink Pub/Sub

Konektor Sink Pub/Sub menarik pesan dari satu atau beberapa topik Kafka dan memublikasikannya ke topik Pub/Sub.

Berikut uraian mendetail tentang cara konektor Sink Pub/Sub menyalin data:

  • Konektor menggunakan pesan dari satu atau beberapa topik Kafka dalam cluster sumber.

  • Konektor menulis pesan ke ID topik Pub/Sub target yang ditentukan menggunakan properti konfigurasi cps.topic. Ini adalah properti wajib diisi.

  • Konektor juga memerlukan project Google Cloud yang berisi topik Pub/Sub untuk ditentukan menggunakan properti konfigurasi cps.project. Ini adalah properti wajib diisi.

  • Konektor juga dapat secara opsional menggunakan endpoint Pub/Sub kustom yang ditentukan dengan menggunakan properti cps.endpoint. Endpoint default adalah "pubsub.googleapis.com:443".

  • Untuk mengoptimalkan performa, konektor akan mem-buffer pesan sebelum memublikasikannya ke Pub/Sub. Anda dapat mengonfigurasi maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes, dan maxOutstandingMessages untuk mengontrol buffering.

  • Kafka record memiliki tiga komponen: header, kunci, nilai. Konektor menggunakan konverter kunci dan nilai untuk mengubah data pesan Kafka menjadi format yang diharapkan oleh Pub/Sub. Saat menggunakan skema nilai peta atau struct, properti messageBodyName menentukan kolom atau kunci yang akan digunakan sebagai isi pesan Pub/Sub.

  • Konektor dapat menyertakan topik, partisi, offset, dan stempel waktu Kafka sebagai atribut pesan dengan menggunakan properti metadata.publish yang ditetapkan ke true.

  • Konektor dapat menyertakan header pesan Kafka sebagai atribut pesan Pub/Sub dengan menggunakan properti headers.publish yang ditetapkan ke true.

  • Konektor dapat menyertakan kunci pengurutan untuk pesan Pub/Sub dengan menggunakan properti orderingKeySource. Opsi untuk nilainya mencakup "none" (default), "key", dan "partition".

  • Properti tasks.max mengontrol tingkat paralelisme untuk konektor. Meningkatkan tasks.max dapat meningkatkan throughput, tetapi paralelisme sebenarnya dibatasi oleh jumlah partisi dalam topik Kafka.

Properti konektor Sink Pub/Sub

Saat membuat konektor Pub/Sub Sink, Anda perlu menentukan properti berikut.

Nama konektor

Nama unik untuk konektor dalam cluster Connect. Untuk mengetahui panduan tentang penamaan resource, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

Jenis plugin konektor

Pilih Pub/Sub Sink sebagai jenis plugin konektor. Hal ini menentukan arah aliran data, yaitu dari Kafka ke Pub/Sub dan implementasi konektor spesifik yang digunakan. Jika Anda tidak menggunakan antarmuka pengguna untuk mengonfigurasi konektor, Anda juga harus menentukan class konektor.

Topik Kafka

Topik Kafka tempat konektor menggunakan pesan. Anda dapat menentukan satu atau beberapa topik, atau menggunakan ekspresi reguler untuk mencocokkan beberapa topik. Misalnya, topic.* untuk mencocokkan semua topik yang dimulai dengan "topic". Topik ini harus ada dalam cluster Managed Service for Apache Kafka yang terkait dengan cluster Connect Anda.

Topik Pub/Sub

Topik Pub/Sub yang ada tempat konektor memublikasikan pesan. Pastikan akun layanan cluster Connect memiliki peran roles/pubsub.publisher di project topik, seperti yang dijelaskan dalam Sebelum memulai.

Konfigurasi

Di bagian ini, Anda dapat menentukan properti konfigurasi tambahan khusus konektor.

Karena data dalam topik Kafka dapat memiliki berbagai format seperti Avro, JSON, atau byte mentah, bagian penting dari konfigurasi melibatkan penentuan konverter. Konverter menerjemahkan data dari format yang digunakan di topik Kafka Anda ke format internal Kafka Connect yang standar. Konektor Sink Pub/Sub kemudian mengambil data internal ini dan mengubahnya menjadi format yang diperlukan oleh Pub/Sub sebelum menuliskannya.

Untuk mengetahui informasi umum selengkapnya tentang peran konverter di Kafka Connect, jenis konverter yang didukung, dan opsi konfigurasi umum, lihat konverter.

Berikut beberapa konfigurasi khusus untuk konektor Pub/Sub Sink:

  • cps.project: Menentukan ID project Google Cloud yang berisi topik Pub/Sub.

  • cps.topic: Menentukan topik Pub/Sub tempat data dipublikasikan.

  • cps.endpoint: Menentukan endpoint Pub/Sub yang akan digunakan.

Untuk mengetahui daftar properti konfigurasi yang tersedia khusus untuk konektor ini, lihat Konfigurasi konektor Sink Pub/Sub.

Membuat konektor Sink Pub/Sub

Sebelum membuat konektor, tinjau dokumentasi untuk Properti konektor Sink Pub/Sub.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang konektornya ingin Anda buat.

    Halaman Connect cluster details akan ditampilkan.

  3. Klik Create connector.

    Halaman Create Kafka connector akan ditampilkan.

  4. Untuk nama konektor, masukkan string.

    Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

  5. Untuk Connector plugin, pilih Pub/Sub Sink.

  6. Di bagian Topics, pilih Select a list of Kafka topics atau Use a topic regex. Kemudian, pilih atau masukkan topik Kafka yang akan digunakan konektor ini untuk menerima pesan. Topik ini ada di cluster Kafka terkait Anda.

  7. Untuk Select a Cloud Pub/Sub topic, pilih topik Pub/Sub tempat konektor ini memublikasikan pesan. Topik ditampilkan dalam format nama lengkap resource: projects/{project}/topics/{topic}.

  8. (Opsional) Konfigurasi setelan tambahan di bagian Configurations. Di sinilah Anda akan menentukan properti seperti tasks.max, key.converter, dan value.converter, seperti yang dibahas di bagian sebelumnya.

  9. Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

  10. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

    • LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.

    • CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.

    • CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.

    Berikut adalah contoh file konfigurasi untuk konektor Pub/Sub Sink:

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    Ganti kode berikut:

    • CPS_SINK_CONNECTOR_ID: ID atau nama konektor Pub/Sub Sink. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama konektor tidak dapat diubah.

    • GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang datanya dibaca oleh konektor Pub/Sub Sink.

    • CPS_TOPIC_ID: ID topik Pub/Sub tempat data dipublikasikan.

    • GCP_PROJECT_ID: ID project Google Cloud tempat topik Pub/Sub Anda berada.

  3. Terraform

    Anda dapat menggunakan resource Terraform untuk membuat konektor.

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.