Buat konektor Sumber Pub/Sub

Konektor Sumber Pub/Sub mengalirkan pesan dari Pub/Sub ke Kafka. Dengan begitu, Anda dapat mengintegrasikan Pub/Sub dengan aplikasi dan pipeline data berbasis Kafka.

Konektor membaca pesan dari langganan Pub/Sub, mengonversi setiap pesan menjadi rekaman Kafka, dan menulis rekaman ke topik Kafka. Secara default, konektor membuat rekaman Kafka sebagai berikut:

  • Kunci data Kafka adalah null.
  • Nilai rekaman Kafka adalah data pesan Pub/Sub sebagai byte.
  • Header rekaman Kafka kosong.

Namun, Anda dapat mengonfigurasi perilaku ini. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.

Sebelum memulai

Sebelum membuat konektor Sumber Pub/Sub, pastikan Anda memiliki hal berikut:

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk membuat konektor Sumber Pub/Sub, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project yang berisi cluster Connect. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Sumber Pub/Sub. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat konektor Sumber Pub/Sub:

  • Berikan izin pembuatan konektor di cluster Connect induk: managedkafka.connectors.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Editor Konektor Kafka Terkelola, lihat Peran standar Managed Service untuk Apache Kafka.

Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.

Memberikan izin untuk membaca dari Pub/Sub

Akun layanan Managed Kafka harus memiliki izin untuk membaca pesan dari langganan Pub/Sub. Berikan peran IAM berikut kepada akun layanan di project yang berisi langganan Pub/Sub:

  • Pub/Sub Subscriber (roles/pubsub.subscriber)
  • Pub/Sub Viewer (roles/pubsub.viewer)

Akun layanan Managed Kafka memiliki format berikut: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Ganti PROJECT_NUMBER dengan nomor project.

Buat konektor Sumber Pub/Sub

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect tempat Anda ingin membuat konektor.

  3. Klik Create connector.

  4. Untuk nama konektor, masukkan string.

    Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

  5. Untuk Connector plugin, pilih Pub/Sub Source.

  6. Dalam daftar Langganan Cloud Pub/Sub, pilih langganan Pub/Sub. Konektor menarik pesan dari langganan ini. Langganan ditampilkan sebagai nama resource lengkap: projects/{project}/subscriptions/{subscription}.

  7. Di daftar Kafka topic, pilih topik Kafka tempat pesan ditulis.

  8. Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.

  9. Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

  10. Klik Create.

gcloud

  1. Jalankan perintah gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

    • LOCATION: Lokasi cluster Connect.

    • CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.

    • CONFIG_FILE: Jalur ke file konfigurasi YAML atau JSON.

Berikut adalah contoh file konfigurasi:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat langganan Pub/Sub berada.

  • PUBSUB_SUBSCRIPTION_ID: ID langganan Pub/Sub untuk menarik data.

  • KAFKA_TOPIC_ID: ID topik Kafka tempat data ditulis.

Properti konfigurasi cps.project, cps.subscription, dan kafka.topic diperlukan. Untuk opsi konfigurasi tambahan, lihat Mengonfigurasi konektor.

Terraform

Anda dapat menggunakan resource Terraform untuk membuat konektor.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.

Mengonfigurasi konektor

Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan di konektor.

Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sumber Pub/Sub.

Mode tarik

Mode pull menentukan cara konektor mengambil pesan Pub/Sub. Mode berikut didukung:

  • Mode tarik (default). Pesan diambil dalam batch. Untuk mengaktifkan mode ini, tetapkan cps.streamingPull.enabled=false. Untuk mengonfigurasi ukuran batch, tetapkan properti cps.maxBatchSize.

    Untuk mengetahui informasi selengkapnya tentang mode pull, lihat Pull API.

  • Mode Tarik Streaming. Memungkinkan throughput maksimum dan latensi terendah saat mengambil pesan dari Pub/Sub. Untuk mengaktifkan mode ini, tetapkan cps.streamingPull.enabled=true.

    Untuk mengetahui informasi selengkapnya tentang mode pull streaming, lihat StreamingPull API.

    Jika penarikan streaming diaktifkan, Anda dapat menyesuaikan performa dengan menyetel properti konfigurasi berikut:

    • cps.streamingPull.flowControlBytes: Jumlah maksimum byte pesan yang belum diproses per tugas.
    • cps.streamingPull.flowControlMessages: Jumlah maksimum pesan yang belum diproses per tugas.
    • cps.streamingPull.maxAckExtensionMs: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan, dalam milidetik.
    • cps.streamingPull.maxMsPerAckExtension: Jumlah waktu maksimum yang digunakan konektor untuk memperpanjang batas waktu langganan per perpanjangan, dalam milidetik.
    • cps.streamingPull.parallelStreams: Jumlah aliran untuk menarik pesan dari langganan.

Endpoint Pub/Sub

Secara default, konektor menggunakan endpoint Pub/Sub global. Untuk menentukan endpoint, tetapkan properti cps.endpoint ke alamat endpoint. Untuk mengetahui informasi selengkapnya tentang endpoint, lihat Endpoint Pub/Sub.

Kumpulan data Kafka

Konektor Sumber Pub/Sub mengonversi pesan Pub/Sub menjadi rekaman Kafka. Bagian berikut menjelaskan proses konversi.

Kunci catatan

Pengonversi kunci harus berupa org.apache.kafka.connect.storage.StringConverter.

  • Secara default, kunci rekaman adalah null.

  • Untuk menggunakan atribut pesan Pub/Sub sebagai kunci, tetapkan kafka.key.attribute ke nama atribut. Contohnya, kafka.key.attribute=username.

  • Untuk menggunakan kunci pengurutan Pub/Sub sebagai kunci, tetapkan kafka.key.attribute=orderingKey.

Header rekaman

Secara default, header rekaman kosong.

Jika kafka.record.headers adalah true, atribut pesan Pub/Sub ditulis sebagai header rekaman. Untuk menyertakan kunci pengurutan, tetapkan cps.makeOrderingKeyAttribute=true.

Nilai catatan

Jika kafka.record.headers adalah true, atau pesan Pub/Sub tidak memiliki atribut kustom, nilai rekaman adalah data pesan, sebagai array byte. Tetapkan konverter nilai ke org.apache.kafka.connect.converters.ByteArrayConverter.

Jika tidak, jika kafka.record.headers adalah false dan pesan memiliki minimal satu atribut kustom, konektor akan menulis nilai rekaman sebagai struct. Tetapkan konverter nilai ke org.apache.kafka.connect.json.JsonConverter.

struct berisi kolom berikut:

  • message: Data pesan Pub/Sub, sebagai byte.

  • Kolom untuk setiap atribut pesan Pub/Sub. Untuk menyertakan kunci pengurutan, tetapkan cps.makeOrderingKeyAttribute=true.

Misalnya, dengan asumsi pesan memiliki atribut username:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Jika value.converter.schemas.enable adalah true, struct mencakup payload dan skema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Partisi Kafka

Secara default, konektor menulis ke satu partisi dalam topik. Untuk menentukan jumlah partisi yang ditulis konektor, tetapkan properti kafka.partition.count. Nilai tidak boleh melebihi jumlah partisi topik.

Untuk menentukan cara konektor menetapkan pesan ke partisi, tetapkan properti kafka.partition.scheme. Untuk mengetahui informasi selengkapnya, lihat Konfigurasi konektor Sumber Pub/Sub.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.