Konektor Cloud Storage Sink memungkinkan Anda melakukan streaming data dari topik Kafka ke bucket Cloud Storage. Hal ini berguna untuk menyimpan dan memproses data dalam jumlah besar secara hemat biaya dan skalabel.
Sebelum memulai
Sebelum membuat konektor Sink Cloud Storage, pastikan Anda memiliki hal berikut:
Buat cluster Managed Service for Apache Kafka untuk cluster Connect Anda. Ini adalah cluster Kafka utama yang terkait dengan cluster Connect. Cluster ini juga merupakan cluster sumber yang membentuk salah satu ujung pipeline konektor.
Buat cluster Connect untuk menghosting konektor Cloud Storage Sink.
Buat bucket Cloud Storage untuk menyimpan data yang di-streaming dari Kafka.
Buat dan konfigurasi topik Kafka dalam cluster sumber. Data dipindahkan dari topik Kafka ini ke bucket Cloud Storage tujuan.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang
diperlukan guna membuat konektor Sink Cloud Storage,
minta administrator Anda untuk memberi Anda peran IAM
Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
di project Anda.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Cloud Storage Sink. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor Cloud Storage Sink:
-
Berikan izin pembuatan konektor di cluster Connect induk:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.
Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.
Memberi izin untuk menulis ke bucket Cloud Storage
Akun layanan cluster Connect, yang mengikuti format
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
memerlukan izin Cloud Storage berikut:
storage.objects.createstorage.objects.delete
Untuk melakukannya, berikan peran Storage Object User (roles/storage.objectUser) ke akun layanan cluster Connect di project yang berisi bucket Cloud Storage.
Cara kerja konektor Sink Cloud Storage
Konektor Cloud Storage Sink menarik data dari satu atau beberapa topik Kafka, dan menulis data tersebut ke objek dalam satu bucket Cloud Storage.
Berikut uraian mendetail tentang cara konektor Cloud Storage Sink menyalin data:
Konektor menggunakan pesan dari satu atau beberapa topik Kafka dalam cluster sumber.
Konektor menulis data ke bucket Cloud Storage target yang Anda tentukan dalam konfigurasi konektor.
Konektor memformat data saat menulisnya ke bucket Cloud Storage dengan merujuk pada properti tertentu dalam konfigurasi konektor. Secara default, file output dalam format CSV. Anda dapat mengonfigurasi properti
format.output.typeuntuk menentukan format output yang berbeda, seperti JSON.Konektor juga memberi nama file yang ditulis ke bucket Cloud Storage. Anda dapat menyesuaikan nama file menggunakan properti
file.name.prefixdanfile.name.template. Misalnya, Anda dapat menyertakan nama topik Kafka atau kunci pesan dalam nama file.Kafka record memiliki tiga komponen: header, kunci, nilai.
Anda dapat menyertakan header dalam file output dengan menyetel
format.output.fieldsuntuk menyertakan header. Contoh,format.output.fields=value,headers.Anda dapat menyertakan kunci dalam file output dengan menyetel
format.output.fieldsuntuk menyertakankey. Contohnya,format.output.fields=key,value,headers.Kunci juga dapat digunakan untuk mengelompokkan data dengan menyertakan
keydalam propertifile.name.template.
Anda dapat menyertakan nilai dalam file output secara default karena
format.output.fieldssecara default adalahvalue.Konektor menulis data yang telah dikonversi dan diformat ke bucket Cloud Storage yang ditentukan.
Konektor mengompresi file yang disimpan di bucket Cloud Storage jika Anda mengonfigurasi kompresi file menggunakan properti
file.compression.type.Konfigurasi konverter dibatasi oleh properti
format.output.type.Misalnya, jika
format.output.typeditetapkan kecsv, pengonversi utama harus berupaorg.apache.kafka.connect.converters.ByteArrayConverteratauorg.apache.kafka.connect.storage.StringConverter, dan pengonversi nilai harus berupaorg.apache.kafka.connect.converters.ByteArrayConverter.Jika
format.output.typeditetapkan kejson, skema nilai dan kunci tidak ditulis bersama data dalam file output, meskipun propertivalue.converter.schemas.enablebernilai benar (true).
Properti
tasks.maxmengontrol tingkat paralelisme untuk konektor. Meningkatkantasks.maxdapat meningkatkan throughput, tetapi paralelisme aktual dibatasi oleh jumlah partisi dalam topik Kafka.
Properti konektor Sink Cloud Storage
Saat membuat konektor Sink Cloud Storage, tentukan properti berikut.
Nama konektor
Nama atau ID konektor. Untuk mengetahui panduan tentang cara memberi nama resource, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama tidak dapat diubah.
Jenis plugin konektor
Pilih Cloud Storage Sink sebagai jenis plugin konektor di Google Cloud konsol. Jika tidak menggunakan antarmuka pengguna untuk mengonfigurasi konektor, Anda juga harus menentukan class konektor.
Topik
Topik Kafka tempat konektor menggunakan pesan.
Anda dapat menentukan satu atau beberapa topik, atau menggunakan ekspresi reguler untuk mencocokkan beberapa topik. Misalnya, topic.* untuk mencocokkan semua topik yang dimulai dengan "topic". Topik ini harus ada dalam cluster Managed Service for Apache Kafka yang terkait dengan cluster Connect Anda.
Bucket Cloud Storage
Pilih atau buat bucket Cloud Storage tempat data disimpan.
Konfigurasi
Bagian ini memungkinkan Anda menentukan properti konfigurasi tambahan khusus konektor untuk konektor Sink Cloud Storage.
Karena data dalam topik Kafka dapat memiliki berbagai format seperti Avro, JSON, atau byte mentah, bagian penting dari konfigurasi melibatkan penentuan konverter. Konverter menerjemahkan data dari format yang digunakan di topik Kafka Anda ke format internal Kafka Connect yang standar. Konektor Cloud Storage Sink kemudian mengambil data internal ini dan mengubahnya menjadi format yang diperlukan oleh bucket Cloud Storage Anda sebelum menulisnya.
Untuk mengetahui informasi umum selengkapnya tentang peran konverter di Kafka Connect, jenis konverter yang didukung, dan opsi konfigurasi umum, lihat konverter.
Berikut beberapa konfigurasi khusus untuk konektor Sink Cloud Storage:
gcs.credentials.default: Apakah akan otomatis menemukan Google Cloud kredensial dari lingkungan eksekusi atau tidak. Harus ditetapkan ketrue.gcs.bucket.name: Menentukan nama bucket Cloud Storage tempat data ditulis. Harus ditetapkan.file.compression.type: Menetapkan jenis kompresi untuk file yang disimpan di bucket Cloud Storage. Contohnya adalahgzip,snappy,zstd, dannone. Nilai defaultnya adalahnone.file.name.prefix: Awalan yang akan ditambahkan ke nama setiap file yang disimpan di bucket Cloud Storage. Nilai defaultnya kosong.format.output.type: Jenis format data yang digunakan untuk menulis data ke file output Cloud Storage. Nilai yang didukung adalah:csv,json,jsonl, danparquet. Nilai defaultnya adalahcsv.
Untuk mengetahui daftar properti konfigurasi yang tersedia khusus untuk konektor ini, lihat Konfigurasi konektor Sink Cloud Storage.
Membuat konektor Sink Cloud Storage
Sebelum membuat konektor, tinjau dokumentasi untuk Properti konektor Sink Cloud Storage.
Konsol
Di konsol Google Cloud , buka halaman Connect Clusters.
Klik cluster Connect yang konektornya ingin Anda buat.
Halaman Connect cluster details akan ditampilkan.
Klik Create connector.
Halaman Create Kafka connector akan ditampilkan.
Untuk nama konektor, masukkan string.
Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Untuk Connector plugin, pilih Cloud Storage Sink.
Tentukan Topik yang datanya dapat Anda streaming.
Pilih Bucket Penyimpanan untuk menyimpan data.
(Opsional) Konfigurasi setelan tambahan di bagian Konfigurasi.
Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.
Berikut adalah contoh file konfigurasi untuk konektor Sink Cloud Storage:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"Ganti kode berikut:
GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang menjadi sumber aliran data ke konektor Cloud Storage Sink.
GCS_BUCKET_NAME: Nama bucket Cloud Storage yang berfungsi sebagai sink untuk pipeline.
GCS_SINK_CONNECTOR_ID: ID atau nama konektor Sink Cloud Storage. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
Terraform
Anda dapat menggunakan resource Terraform untuk membuat konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.