Membuat konektor Cloud Storage Sink

Konektor Cloud Storage Sink memungkinkan Anda melakukan streaming data dari topik Kafka ke bucket Cloud Storage. Hal ini berguna untuk menyimpan dan memproses data dalam jumlah besar secara hemat biaya dan skalabel.

Sebelum memulai

Sebelum membuat konektor Sink Cloud Storage, pastikan Anda memiliki hal berikut:

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan guna membuat konektor Sink Cloud Storage, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Cloud Storage Sink. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat konektor Cloud Storage Sink:

  • Berikan izin pembuatan konektor di cluster Connect induk: managedkafka.connectors.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.

Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.

Memberi izin untuk menulis ke bucket Cloud Storage

Akun layanan cluster Connect, yang mengikuti format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, memerlukan izin Cloud Storage berikut:

  • storage.objects.create
  • storage.objects.delete

Untuk melakukannya, berikan peran Storage Object User (roles/storage.objectUser) ke akun layanan cluster Connect di project yang berisi bucket Cloud Storage.

Cara kerja konektor Sink Cloud Storage

Konektor Cloud Storage Sink menarik data dari satu atau beberapa topik Kafka, dan menulis data tersebut ke objek dalam satu bucket Cloud Storage.

Berikut uraian mendetail tentang cara konektor Cloud Storage Sink menyalin data:

  • Konektor menggunakan pesan dari satu atau beberapa topik Kafka dalam cluster sumber.

  • Konektor menulis data ke bucket Cloud Storage target yang Anda tentukan dalam konfigurasi konektor.

  • Konektor memformat data saat menulisnya ke bucket Cloud Storage dengan merujuk pada properti tertentu dalam konfigurasi konektor. Secara default, file output dalam format CSV. Anda dapat mengonfigurasi properti format.output.type untuk menentukan format output yang berbeda, seperti JSON.

  • Konektor juga memberi nama file yang ditulis ke bucket Cloud Storage. Anda dapat menyesuaikan nama file menggunakan properti file.name.prefix dan file.name.template. Misalnya, Anda dapat menyertakan nama topik Kafka atau kunci pesan dalam nama file.

  • Kafka record memiliki tiga komponen: header, kunci, nilai.

    • Anda dapat menyertakan header dalam file output dengan menyetel format.output.fields untuk menyertakan header. Contoh, format.output.fields=value,headers.

    • Anda dapat menyertakan kunci dalam file output dengan menyetel format.output.fields untuk menyertakan key. Contohnya, format.output.fields=key,value,headers.

      Kunci juga dapat digunakan untuk mengelompokkan data dengan menyertakan key dalam properti file.name.template.

  • Anda dapat menyertakan nilai dalam file output secara default karena format.output.fields secara default adalah value.

  • Konektor menulis data yang telah dikonversi dan diformat ke bucket Cloud Storage yang ditentukan.

  • Konektor mengompresi file yang disimpan di bucket Cloud Storage jika Anda mengonfigurasi kompresi file menggunakan properti file.compression.type.

  • Konfigurasi konverter dibatasi oleh properti format.output.type.

    • Misalnya, jika format.output.type ditetapkan ke csv, pengonversi utama harus berupa org.apache.kafka.connect.converters.ByteArrayConverter atau org.apache.kafka.connect.storage.StringConverter, dan pengonversi nilai harus berupa org.apache.kafka.connect.converters.ByteArrayConverter.

    • Jika format.output.type ditetapkan ke json, skema nilai dan kunci tidak ditulis bersama data dalam file output, meskipun properti value.converter.schemas.enable bernilai benar (true).

  • Properti tasks.max mengontrol tingkat paralelisme untuk konektor. Meningkatkan tasks.max dapat meningkatkan throughput, tetapi paralelisme aktual dibatasi oleh jumlah partisi dalam topik Kafka.

Properti konektor Sink Cloud Storage

Saat membuat konektor Sink Cloud Storage, tentukan properti berikut.

Nama konektor

Nama atau ID konektor. Untuk mengetahui panduan tentang cara memberi nama resource, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama tidak dapat diubah.

Jenis plugin konektor

Pilih Cloud Storage Sink sebagai jenis plugin konektor di Google Cloud konsol. Jika tidak menggunakan antarmuka pengguna untuk mengonfigurasi konektor, Anda juga harus menentukan class konektor.

Topik

Topik Kafka tempat konektor menggunakan pesan. Anda dapat menentukan satu atau beberapa topik, atau menggunakan ekspresi reguler untuk mencocokkan beberapa topik. Misalnya, topic.* untuk mencocokkan semua topik yang dimulai dengan "topic". Topik ini harus ada dalam cluster Managed Service for Apache Kafka yang terkait dengan cluster Connect Anda.

Bucket Cloud Storage

Pilih atau buat bucket Cloud Storage tempat data disimpan.

Konfigurasi

Bagian ini memungkinkan Anda menentukan properti konfigurasi tambahan khusus konektor untuk konektor Sink Cloud Storage.

Karena data dalam topik Kafka dapat memiliki berbagai format seperti Avro, JSON, atau byte mentah, bagian penting dari konfigurasi melibatkan penentuan konverter. Konverter menerjemahkan data dari format yang digunakan di topik Kafka Anda ke format internal Kafka Connect yang standar. Konektor Cloud Storage Sink kemudian mengambil data internal ini dan mengubahnya menjadi format yang diperlukan oleh bucket Cloud Storage Anda sebelum menulisnya.

Untuk mengetahui informasi umum selengkapnya tentang peran konverter di Kafka Connect, jenis konverter yang didukung, dan opsi konfigurasi umum, lihat konverter.

Berikut beberapa konfigurasi khusus untuk konektor Sink Cloud Storage:

  • gcs.credentials.default: Apakah akan otomatis menemukan Google Cloud kredensial dari lingkungan eksekusi atau tidak. Harus ditetapkan ke true.

  • gcs.bucket.name: Menentukan nama bucket Cloud Storage tempat data ditulis. Harus ditetapkan.

  • file.compression.type: Menetapkan jenis kompresi untuk file yang disimpan di bucket Cloud Storage. Contohnya adalah gzip, snappy, zstd, dan none. Nilai defaultnya adalah none.

  • file.name.prefix: Awalan yang akan ditambahkan ke nama setiap file yang disimpan di bucket Cloud Storage. Nilai defaultnya kosong.

  • format.output.type: Jenis format data yang digunakan untuk menulis data ke file output Cloud Storage. Nilai yang didukung adalah: csv, json, jsonl, dan parquet. Nilai defaultnya adalah csv.

Untuk mengetahui daftar properti konfigurasi yang tersedia khusus untuk konektor ini, lihat Konfigurasi konektor Sink Cloud Storage.

Membuat konektor Sink Cloud Storage

Sebelum membuat konektor, tinjau dokumentasi untuk Properti konektor Sink Cloud Storage.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang konektornya ingin Anda buat.

    Halaman Connect cluster details akan ditampilkan.

  3. Klik Create connector.

    Halaman Create Kafka connector akan ditampilkan.

  4. Untuk nama konektor, masukkan string.

    Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.

  5. Untuk Connector plugin, pilih Cloud Storage Sink.

  6. Tentukan Topik yang datanya dapat Anda streaming.

  7. Pilih Bucket Penyimpanan untuk menyimpan data.

  8. (Opsional) Konfigurasi setelan tambahan di bagian Konfigurasi.

  9. Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.

  10. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ganti kode berikut:

    • CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

    • LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.

    • CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.

    • CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.

    Berikut adalah contoh file konfigurasi untuk konektor Sink Cloud Storage:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Ganti kode berikut:

    • GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang menjadi sumber aliran data ke konektor Cloud Storage Sink.

    • GCS_BUCKET_NAME: Nama bucket Cloud Storage yang berfungsi sebagai sink untuk pipeline.

    • GCS_SINK_CONNECTOR_ID: ID atau nama konektor Sink Cloud Storage. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.

  3. Terraform

    Anda dapat menggunakan resource Terraform untuk membuat konektor.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.