Buat konektor Sumber Pub/Sub

Konektor Sumber Pub/Sub melakukan streaming pesan dari Pub/Sub ke Kafka, yang memungkinkan Anda mengintegrasikan Pub/Sub dengan aplikasi dan pipeline data berbasis Kafka.

Kasus penggunaan untuk konektor Sumber Pub/Sub mencakup hal berikut:

  • Penyerapan data real-time. Memublikasikan data dari layanan cloud atau aplikasi lain ke Pub/Sub, lalu mereplikasi data ke Kafka untuk pemrosesan streaming.

  • Arsitektur berbasis peristiwa. Memicu pemrosesan berbasis Kafka dari pesan yang dipublikasikan ke Pub/Sub.

Konektor membaca pesan dari langganan Pub/Sub, mengonversi setiap pesan menjadi data Kafka, dan menulis data ke topik Kafka. Secara default, konektor membuat data Kafka sebagai berikut:

  • Kunci data Kafka adalah null.
  • Nilai data Kafka adalah data pesan Pub/Sub sebagai byte.
  • Header data Kafka kosong.

Namun, Anda dapat mengonfigurasi perilaku ini. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.

Sebelum memulai

Sebelum membuat konektor Sumber Pub/Sub, pastikan Anda memiliki hal berikut:

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membuat konektor, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat konektor:

  • Membuat konektor: managedkafka.connectors.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Memberikan izin untuk membaca dari Pub/Sub

Akun layanan Managed Kafka harus memiliki izin untuk membaca pesan dari langganan Pub/Sub. Berikan peran IAM berikut ke akun layanan di project yang berisi langganan Pub/Sub:

  • Pub/Sub Subscriber (roles/pubsub.subscriber)
  • Pub/Sub Viewer (roles/pubsub.viewer)

Akun layanan Managed Kafka memiliki format berikut: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, dengan PROJECT_NUMBER adalah nomor project cluster Connect.

Jika cluster Connect Anda berada di project yang berbeda dengan cluster Managed Service untuk Apache Kafka cluster, lihat Membuat cluster Connect di project yang berbeda.

Membuat konektor Sumber Pub/Sub

Konsol

  1. Di Google Cloud konsol, buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect tempat Anda ingin membuat konektor.

  3. Klik Create connector.

  4. Untuk nama konektor, masukkan string.

    Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

  5. Untuk Connector plugin, pilih Pub/Sub Source.

  6. Dalam daftar Cloud Pub/Sub subscription, pilih langganan Pub/Sub. Konektor mengambil pesan dari langganan ini. Langganan ditampilkan sebagai nama resource lengkap: projects/{project}/subscriptions/{subscription}.

  7. Dalam daftar Kafka topic, pilih topik Kafka tempat pesan ditulis.

  8. Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.

  9. Pilih Task restart policy. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

  10. Klik Create.

gcloud

  1. Jalankan perintah gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

    • LOCATION: Lokasi cluster Connect.

    • CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.

    • CONFIG_FILE: Jalur ke file konfigurasi YAML atau JSON.

Berikut adalah contoh file konfigurasi:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Ganti kode berikut:

  • PROJECT_ID: ID project tempat langganan Pub/Sub berada. Google Cloud

  • PUBSUB_SUBSCRIPTION_ID: ID langganan Pub/Sub untuk mengambil data.

  • KAFKA_TOPIC_ID: ID topik Kafka tempat data ditulis.

Properti konfigurasi cps.project, cps.subscription, dan kafka.topic diperlukan. Untuk opsi konfigurasi tambahan, lihat Mengonfigurasi konektor.

Terraform

Anda dapat menggunakan resource Terraform untuk membuat a konektor.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Go API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Java API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Python API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.

Mengonfigurasi konektor

Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan pada konektor.

Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sumber Pub/Sub.

Mode pull

Mode pull menentukan cara konektor mengambil pesan Pub/Sub. Mode berikut didukung:

  • Mode pull (default). Pesan diambil dalam batch. Untuk mengaktifkan mode ini, tetapkan cps.streamingPull.enabled=false. Untuk mengonfigurasi ukuran batch, tetapkan properti cps.maxBatchSize.

    Untuk mengetahui informasi selengkapnya tentang mode pull, lihat Pull API.

  • Mode Streaming Pull. Mengaktifkan throughput maksimum dan latensi terendah saat mengambil pesan dari Pub/Sub. Untuk mengaktifkan mode ini, tetapkan cps.streamingPull.enabled=true.

    Untuk mengetahui informasi selengkapnya tentang mode streaming pull, lihat StreamingPull API.

    Jika streaming pull diaktifkan, Anda dapat menyesuaikan performa dengan menetapkan properti konfigurasi berikut:

    • cps.streamingPull.flowControlBytes: Jumlah maksimum byte pesan yang belum selesai per tugas.
    • cps.streamingPull.flowControlMessages: Jumlah maksimum pesan yang belum selesai per tugas.
    • cps.streamingPull.maxAckExtensionMs: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan, dalam milidetik.
    • cps.streamingPull.maxMsPerAckExtension: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan per perpanjangan, dalam milidetik.
    • cps.streamingPull.parallelStreams: Jumlah streaming untuk mengambil pesan dari langganan.

Endpoint Pub/Sub

Secara default, konektor menggunakan endpoint Pub/Sub global. Untuk menentukan endpoint, tetapkan properti cps.endpoint ke alamat endpoint. Untuk mengetahui informasi selengkapnya tentang endpoint, lihat Endpoint Pub/Sub.

Partisi Kafka

Secara default, konektor menulis ke satu partisi dalam topik. Untuk menentukan jumlah partisi yang ditulis konektor, tetapkan properti kafka.partition.count. Nilai tidak boleh melebihi jumlah partisi topik .

Untuk menentukan cara konektor menetapkan pesan ke partisi, tetapkan properti kafka.partition.scheme. Untuk mengetahui informasi selengkapnya, lihat Konfigurasi konektor Sumber Pub/Sub.

Pelaku konversi

Tetapkan pengonversi kunci ke org.apache.kafka.connect.storage.StringConverter.

Bergantung pada konfigurasi konektor, tetapkan pengonversi nilai ke salah satu hal berikut:

  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.json.JsonConverter

Untuk mengetahui informasi selengkapnya, lihat Nilai data.

Konversi pesan

Konektor Sumber Pub/Sub mengonversi pesan Pub/Sub ke data Kafka. Bagian berikut menjelaskan proses konversi.

Kunci data

Pengonversi kunci harus berupa org.apache.kafka.connect.storage.StringConverter.

  • Secara default, kunci data adalah null.

  • Untuk menggunakan atribut pesan Pub/Sub sebagai kunci, tetapkan kafka.key.attribute ke nama atribut. Misalnya, kafka.key.attribute=username.

  • Untuk menggunakan kunci pengurutan Pub/Sub sebagai kunci, tetapkan kafka.key.attribute=orderingKey.

Header data

Secara default, header data kosong.

Jika kafka.record.headers adalah true, atribut pesan Pub/Sub akan ditulis sebagai header data. Untuk menyertakan kunci pengurutan, tetapkan cps.makeOrderingKeyAttribute=true.

Nilai data

Nilai data ditulis sebagai array byte atau sebagai jenis struct.

Nilai data array byte

Jika kafka.record.headers adalah true, atau pesan Pub/Sub tidak memiliki atribut kustom, konektor akan menulis data pesan sebagai array byte. Tetapkan pengonversi nilai ke org.apache.kafka.connect.converters.ByteArrayConverter.

Nilai data struct

Jika kafka.record.headers adalah false dan pesan memiliki setidaknya satu atribut kustom, konektor akan menulis nilai data sebagai struct. Tetapkan pengonversi nilai ke org.apache.kafka.connect.json.JsonConverter.

struct berisi kolom berikut:

  • message: Data pesan Pub/Sub, sebagai byte.

  • Kolom untuk setiap atribut pesan Pub/Sub. Untuk menyertakan kunci pengurutan, tetapkan cps.makeOrderingKeyAttribute=true.

Misalnya, jika pesan memiliki atribut username, nilai data akan terlihat seperti berikut:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Jika value.converter.schemas.enable adalah true, struct akan menyertakan payload dan skema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar dari The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.