Konektor Sink BigQuery memungkinkan Anda melakukan streaming data dari Kafka ke BigQuery, sehingga memungkinkan penyerapan dan analisis data secara real-time dalam BigQuery. Konektor BigQuery Sink menggunakan kumpulan data dari satu atau beberapa topik Kafka, dan menulis data ke satu atau beberapa tabel dalam satu set data BigQuery.
Sebelum memulai
Sebelum membuat konektor BigQuery Sink, pastikan Anda memiliki hal berikut:
Buat cluster Managed Service for Apache Kafka untuk cluster Connect Anda. Cluster ini adalah cluster Kafka utama yang terkait dengan cluster Connect. Cluster ini juga merupakan cluster sumber yang membentuk salah satu ujung pipeline konektor BigQuery Sink.
Buat cluster Connect untuk menghosting konektor BigQuery Sink Anda.
Buat set data BigQuery untuk menyimpan data yang di-streaming dari Kafka.
Buat dan konfigurasi topik Kafka dalam cluster sumber. Data dipindahkan dari topik Kafka ini ke set data BigQuery tujuan.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang diperlukan untuk membuat konektor BigQuery Sink, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project Anda.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Sink BigQuery. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor Sink BigQuery:
-
Berikan izin pembuatan konektor di cluster Connect induk:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.
Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster berada di project lain, lihat Membuat Cluster Connect di project lain.
Memberikan izin untuk menulis ke tabel BigQuery
Akun layanan cluster Connect, yang mengikuti format
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
memerlukan izin untuk menulis ke tabel BigQuery. Untuk melakukannya,
berikan peran BigQuery Data Editor (roles/bigquery.dataEditor)
kepada akun layanan cluster Connect di project yang berisi
tabel BigQuery.
Skema untuk konektor Sink BigQuery
Konektor BigQuery Sink menggunakan konverter nilai yang dikonfigurasi (value.converter) untuk mengurai nilai rekaman Kafka menjadi kolom. Kemudian, kolom tersebut akan menulis kolom dengan nama yang sama ke tabel BigQuery.
Konektor memerlukan skema untuk beroperasi. Skema dapat diberikan dengan cara berikut:
- Skema berbasis pesan: Skema disertakan sebagai bagian dari setiap pesan.
- Skema berbasis tabel: Konektor menyimpulkan skema pesan dari skema tabel BigQuery.
- Schema registry: Konektor membaca skema dari schema registry, seperti Managed Service untuk Apache Kafka schema registry (Pratinjau).
Bagian berikutnya menjelaskan opsi ini.
Skema berbasis pesan
Dalam mode ini, setiap rekaman Kafka menyertakan skema JSON. Konektor menggunakan skema untuk menulis data rekaman sebagai baris tabel BigQuery.
Untuk menggunakan skema berbasis pesan, tetapkan properti berikut pada konektor:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Contoh nilai rekaman Kafka:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
Jika tabel tujuan sudah ada, skema tabel BigQuery harus kompatibel dengan skema pesan sematan. Jika
autoCreateTables=true, konektor akan otomatis membuat tabel tujuan
jika diperlukan. Untuk mengetahui informasi selengkapnya, lihat Pembuatan tabel.
Jika Anda ingin konektor memperbarui skema tabel BigQuery saat skema pesan berubah, tetapkan allowNewBigQueryFields, allowSchemaUnionization, atau allowBigQueryRequiredFieldRelaxation ke true.
Skema berbasis tabel
Dalam mode ini, rekaman Kafka berisi data JSON biasa tanpa skema eksplisit. Konektor menyimpulkan skema dari tabel tujuan.
Persyaratan:
- Tabel BigQuery harus sudah ada.
- Data rekaman Kafka harus kompatibel dengan skema tabel.
- Mode ini tidak mendukung pembaruan skema dinamis berdasarkan pesan masuk.
Untuk menggunakan skema berbasis tabel, tetapkan properti berikut pada konektor:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
Jika tabel BigQuery menggunakan
partisi berbasis waktu dengan
partisi harian, bigQueryPartitionDecorator dapat berupa true. Jika tidak, tetapkan
properti ini ke false.
Contoh nilai rekaman Kafka:
{
"user": "userId",
"age": 30
}
Registry skema
Dalam mode ini, setiap rekaman Kafka berisi data Apache Avro, dan skema pesan disimpan dalam registry skema.
Untuk menggunakan konektor BigQuery Sink dengan registry skema, tetapkan properti berikut pada konektor:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Ganti SCHEMA_REGISTRY_URL dengan URL schema registry.
Untuk menggunakan konektor dengan Managed Service for Apache Kafka schema registry, tetapkan properti berikut:
value.converter.bearer.auth.credentials.source=GCP
Untuk mengetahui informasi selengkapnya, lihat Menggunakan Kafka Connect dengan registry skema.
Tabel BigLake untuk Apache Iceberg di BigQuery
Konektor BigQuery Sink mendukung tabel BigLake untuk Apache Iceberg di BigQuery (selanjutnya, tabel BigLake Iceberg di BigQuery) sebagai target sink.
Tabel BigLake Iceberg di BigQuery memberikan fondasi untuk membangun lakehouse format terbuka di Google Cloud. Tabel Iceberg BigLake di BigQuery menawarkan pengalaman terkelola sepenuhnya yang sama seperti tabel BigQuery, tetapi menyimpan data di bucket penyimpanan milik pelanggan menggunakan Parquet agar dapat beroperasi dengan format tabel terbuka Apache Iceberg.
Untuk mengetahui informasi tentang cara membuat tabel Apache Iceberg, lihat Membuat tabel Apache Iceberg.
Membuat konektor Sink BigQuery
Konsol
Di konsol Google Cloud , buka halaman Connect Clusters.
Klik cluster Connect tempat Anda ingin membuat konektor.
Klik Create connector.
Untuk nama konektor, masukkan string.
Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Untuk Connector plugin, pilih BigQuery Sink.
Di bagian Topics, tentukan topik Kafka yang akan dibaca. Anda dapat menentukan daftar topik atau ekspresi reguler yang akan dicocokkan dengan nama topik.
Opsi 1: Pilih Pilih daftar topik Kafka. Dalam daftar Kafka topics, pilih satu atau beberapa topik. Klik OK.
Opsi 2: Pilih Gunakan regex topik. Di kolom Topic regex, masukkan regular expression.
Klik Set data dan tentukan set data BigQuery. Anda dapat memilih set data yang ada atau membuat set data baru.
Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.
Berikut adalah contoh file konfigurasi untuk konektor Sink BigQuery:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"Ganti kode berikut:
BQ_SINK_CONNECTOR_ID: ID atau nama konektor BigQuery Sink. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama konektor tidak dapat diubah.
GCP_PROJECT_ID: ID project Google Cloud tempat set data BigQuery Anda berada.
GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang menjadi sumber aliran data ke konektor BigQuery Sink.
BQ_DATASET_ID: ID set data BigQuery yang berfungsi sebagai sink untuk pipeline.
Terraform
Anda dapat menggunakan resource Terraform untuk membuat konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.
Mengonfigurasi konektor
Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan di konektor. Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sink BigQuery.
Nama tabel
Secara default, konektor menggunakan nama topik sebagai nama tabel
BigQuery. Untuk menggunakan nama tabel yang berbeda, tetapkan properti topic2TableMap dengan format berikut:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
Pembuatan tabel
Konektor BigQuery Sink dapat membuat tabel tujuan jika tabel tersebut tidak ada.
Jika
autoCreateTables=true, konektor akan mencoba membuat tabel BigQuery yang belum ada. Setelan ini adalah perilaku default.Jika
autoCreateTables=false, konektor tidak membuat tabel apa pun. Jika tabel tujuan tidak ada, akan terjadi error.
Jika autoCreateTables adalah true, Anda dapat menggunakan properti konfigurasi berikut untuk kontrol yang lebih terperinci tentang cara konektor membuat dan mengonfigurasi tabel baru:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
Untuk mengetahui informasi tentang properti ini, lihat Konfigurasi konektor Sink BigQuery.
Metadata Kafka
Anda dapat memetakan data tambahan dari Kafka seperti informasi metadata dan informasi utama ke dalam tabel BigQuery dengan mengonfigurasi kolom kafkaDataFieldName dan kafkaKeyFieldName masing-masing. Contoh informasi metadata mencakup topik, partisi, offset, dan waktu penyisipan Kafka.