Template Apache Kafka ke BigQuery adalah pipeline streaming yang menyerap data teks dari cluster Google Cloud Managed Service for Apache Kafka, lalu menampilkan data yang dihasilkan ke tabel BigQuery. Setiap error yang terjadi saat menyisipkan data ke dalam tabel output akan disisipkan ke dalam tabel error terpisah di BigQuery.
Anda juga dapat menggunakan template Apache Kafka ke BigQuery dengan Kafka yang dikelola sendiri atau eksternal.
Persyaratan pipeline
- Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
- Topik Apache Kafka harus ada.
- Anda harus mengaktifkan Dataflow, BigQuery, dan Cloud Storage API. Jika autentikasi diperlukan, Anda juga harus mengaktifkan Secret Manager API.
- Buat set data dan tabel BigQuery dengan skema yang sesuai untuk topik input Kafka Anda. Jika Anda menggunakan beberapa skema dalam topik yang sama dan ingin menulis ke beberapa tabel, Anda tidak perlu membuat tabel sebelum mengonfigurasi pipeline.
- Jika antrean pesan yang tidak diproses (pesan yang tidak diproses) untuk template diaktifkan, buat tabel kosong yang tidak memiliki skema untuk antrean pesan yang tidak diproses.
- Jika Anda terhubung ke cluster Managed Service for Apache Kafka, pipeline juga harus memenuhi persyaratan yang tercantum di Menggunakan Dataflow dengan Managed Service for Apache Kafka.
Format pesan Kafka
Template ini mendukung pembacaan pesan dari Kafka dalam format berikut:
Format JSON
Untuk membaca pesan JSON, setel parameter template messageFormat ke
"JSON".
Encoding biner Avro
Untuk membaca pesan Avro biner, tetapkan parameter template berikut:
messageFormat:"AVRO_BINARY_ENCODING".binaryAvroSchemaPath: Lokasi file skema Avro di Cloud Storage. Contoh:gs://BUCKET_NAME/message-schema.avsc.
Untuk mengetahui informasi selengkapnya tentang format biner Avro, lihat Encoding biner di dokumentasi Apache Avro.
Avro yang dienkode Confluent Schema Registry
Untuk membaca pesan dalam Avro yang dienkode Confluent Schema Registry, tetapkan parameter template berikut:
messageFormat:"AVRO_CONFLUENT_WIRE_FORMAT".schemaFormat: Salah satu nilai berikut:"SINGLE_SCHEMA_FILE": Skema pesan ditentukan dalam file skema Avro. Tentukan lokasi Cloud Storage file skema dalam parameterconfluentAvroSchemaPath.-
"SCHEMA_REGISTRY": Pesan dienkode menggunakan Confluent Schema Registry. Tentukan URL instance Confluent Schema Registry di parameterschemaRegistryConnectionUrl, dan tentukan mode autentikasi di parameterschemaRegistryAuthenticationMode.
Untuk mengetahui informasi selengkapnya tentang format ini, lihat Format kabel dalam dokumentasi Confluent.
Autentikasi
Template Apache Kafka ke BigQuery mendukung autentikasi SASL/PLAIN ke broker Kafka.
Parameter template
Parameter yang diperlukan
- readBootstrapServerAndTopic: Topik Kafka untuk membaca input.
- writeMode: Menulis kumpulan data ke satu tabel atau beberapa tabel (berdasarkan skema). Mode
DYNAMIC_TABLE_NAMEShanya didukung untukAVRO_CONFLUENT_WIRE_FORMATFormat Pesan Sumber danSCHEMA_REGISTRYSumber Skema. Nama tabel target dibuat secara otomatis berdasarkan nama skema Avro setiap pesan, yang bisa berupa satu skema (membuat satu tabel) atau beberapa skema (membuat beberapa tabel). ModeSINGLE_TABLE_NAMEmenulis ke satu tabel (skema tunggal) yang ditentukan oleh pengguna. Nilai defaultnya adalahSINGLE_TABLE_NAME. - kafkaReadAuthenticationMode: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan
KafkaAuthenticationMethod.NONEuntuk tanpa autentikasi,KafkaAuthenticationMethod.SASL_PLAINuntuk nama pengguna dan sandi SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512untuk autentikasi SASL_SCRAM_512, danKafkaAuthenticationMethod.TLSuntuk autentikasi berbasis sertifikat.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALShanya boleh digunakan untuk cluster Google Cloud Apache Kafka untuk BigQuery, yang memungkinkan autentikasi menggunakan kredensial default aplikasi. - messageFormat: Format pesan Kafka yang akan dibaca. Nilai yang didukung adalah
AVRO_CONFLUENT_WIRE_FORMAT(Avro yang dienkode Confluent Schema Registry),AVRO_BINARY_ENCODING(Avro biner biasa), danJSON. Nilai defaultnya: AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: Jika benar (true), pesan yang gagal akan ditulis ke BigQuery dengan informasi error tambahan. Nilai defaultnya: salah.
Parameter opsional
- outputTableSpec: Lokasi tabel BigQuery yang menjadi tujuan penulisan output. Nama harus dalam format
<project>:<dataset>.<table_name>. Skema tabel harus cocok dengan objek input. - persistKafkaKey: Jika benar (true), pipeline akan menyimpan kunci pesan Kafka dalam tabel BigQuery, di kolom
_keyberjenisBYTES. Default-nya adalahfalse(Kunci diabaikan). - outputProject: Project output BigQuery tempat set data berada. Tabel akan dibuat secara dinamis dalam set data. Nilai defaultnya adalah kosong.
- outputDataset: Set data output BigQuery yang menjadi tujuan penulisan output. Tabel akan dibuat secara dinamis dalam set data. Jika tabel dibuat sebelumnya, nama tabel harus mengikuti konvensi penamaan yang ditentukan. Nama harus
bqTableNamePrefix + Avro Schema FullName, setiap kata akan dipisahkan dengan tanda hubung-. Nilai defaultnya adalah kosong. - bqTableNamePrefix: Awalan penamaan yang akan digunakan saat membuat tabel output BigQuery. Hanya berlaku saat menggunakan registry skema. Nilai defaultnya adalah kosong.
- createDisposition: BigQuery CreateDisposition. Misalnya:
CREATE_IF_NEEDED,CREATE_NEVER. Nilai defaultnya: CREATE_IF_NEEDED. - writeDisposition: BigQuery WriteDisposition. Misalnya:
WRITE_APPEND,WRITE_EMPTY, atauWRITE_TRUNCATE. Nilai defaultnya: WRITE_APPEND. - useAutoSharding: Jika benar (true), pipeline akan menggunakan pembuatan shard otomatis saat menulis ke BigQuery. Nilai defaultnya adalah
true. - numStorageWriteApiStreams: Menentukan jumlah streaming penulisan, parameter ini harus ditetapkan. Default-nya adalah
0. - storageWriteApiTriggeringFrequencySec: Menentukan frekuensi pemicuan dalam detik, parameter ini harus ditetapkan. Defaultnya adalah 5 detik.
- useStorageWriteApiAtLeastOnce: Parameter ini hanya berlaku jika "Gunakan BigQuery Storage Write API" diaktifkan. Jika diaktifkan, semantik setidaknya satu kali akan digunakan untuk Storage Write API, jika tidak, semantik tepat satu kali akan digunakan. Nilai defaultnya: salah.
- enableCommitOffsets: Commit offset pesan yang diproses ke Kafka. Jika diaktifkan, fitur ini akan meminimalkan kehilangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Nilai defaultnya: salah.
- consumerGroupId: ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offset ke Kafka diaktifkan. Nilai defaultnya adalah kosong.
- kafkaReadOffset: Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Yang paling awal dimulai dari awal, yang terbaru dimulai dari pesan terbaru. Nilai defaultnya: terbaru.
- kafkaReadUsernameSecretId: ID secret dari Secret Manager Google Cloud yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_PLAIN. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaReadPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_PLAIN. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaReadKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. Contoh,
gs://your-bucket/keystore.jks. - kafkaReadTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya untuk digunakan dalam memverifikasi identitas broker Kafka.
- kafkaReadTruststorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadKeystorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadKeyPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramUsernameSecretId: ID secret dari Secret Manager Google Cloud yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_SCRAM. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_SCRAM. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi SASL_SCRAM Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - schemaFormat: Format skema Kafka. Dapat diberikan sebagai
SINGLE_SCHEMA_FILEatauSCHEMA_REGISTRY. JikaSINGLE_SCHEMA_FILEditentukan, gunakan skema yang disebutkan dalam file skema avro untuk semua pesan. JikaSCHEMA_REGISTRYditentukan, pesan dapat memiliki satu skema atau beberapa skema. Nilai defaultnya: SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: Jalur Google Cloud Storage ke satu file skema Avro yang digunakan untuk mendekode semua pesan dalam topik. Nilai defaultnya adalah kosong.
- schemaRegistryConnectionUrl: URL untuk instance Confluent Schema Registry yang digunakan untuk mengelola skema Avro untuk dekode pesan. Nilai defaultnya adalah kosong.
- binaryAvroSchemaPath: Jalur Google Cloud Storage ke file skema Avro yang digunakan untuk mendekode pesan Avro yang dienkode biner. Nilai defaultnya adalah kosong.
- schemaRegistryAuthenticationMode: Mode autentikasi Schema Registry. Dapat berupa NONE, TLS, atau OAUTH. Nilai defaultnya: NONE.
- schemaRegistryTruststoreLocation: Lokasi sertifikat SSL tempat trust store untuk autentikasi ke Schema Registry disimpan. Contoh,
/your-bucket/truststore.jks. - schemaRegistryTruststorePasswordSecretId: SecretId di Secret Manager tempat sandi untuk mengakses secret di truststore disimpan. Contoh,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version. - schemaRegistryKeystoreLocation: Lokasi keystore yang berisi sertifikat SSL dan kunci pribadi. Contoh,
/your-bucket/keystore.jks. - schemaRegistryKeystorePasswordSecretId: SecretId di Secret Manager tempat sandi untuk mengakses file keystore. Misalnya,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version. - schemaRegistryKeyPasswordSecretId: SecretId sandi yang diperlukan untuk mengakses kunci pribadi klien yang disimpan dalam keystore. Misalnya,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version. - schemaRegistryOauthClientId: Client ID yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: ID secret dari Google Cloud Secret Manager yang berisi Rahasia Klien yang akan digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - schemaRegistryOauthScope: Cakupan token akses yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Kolom ini bersifat opsional, karena permintaan dapat dilakukan tanpa parameter cakupan yang diteruskan. Contoh,
openid. - schemaRegistryOauthTokenEndpointUrl: URL berbasis HTTP(S) untuk penyedia identitas OAuth/OIDC yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: Nama tabel BigQuery yang sepenuhnya memenuhi syarat untuk pesan yang gagal. Pesan yang gagal mencapai tabel output karena berbagai alasan (misalnya, skema tidak cocok, JSON salah format) akan ditulis ke tabel ini. Tabel akan dibuat oleh template. Contoh,
your-project-id:your-dataset.your-table-name. - javascriptTextTransformGcsPath: Cloud Storage URI dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh,
gs://my-bucket/my-udfs/my_file.js. - javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah
myTransform(inJson) { /*...do stuff...*/ }, maka nama fungsinya adalahmyTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file dimodifikasi. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline sedang berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah
0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah0.
Fungsi yang ditentukan pengguna (UDF)
Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template ini memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna (UDF) untuk template Dataflow.
Template ini hanya mendukung UDF untuk pesan Kafka berformat JSON. Jika pesan Kafka menggunakan format Avro, UDF tidak akan dipanggil.Spesifikasi fungsi
UDF memiliki spesifikasi berikut:
- Input: nilai rekaman Kafka, diserialisasi sebagai string JSON
- Output: string JSON yang cocok dengan skema tabel tujuan BigQuery
Menjalankan template
Konsol
- Buka halaman Dataflow Membuat tugas dari template. Buka Membuat tugas dari template
- Di kolom Nama tugas, masukkan nama tugas yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah
us-central1.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Template Dataflow, pilih the Kafka to BigQuery template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming setidaknya satu kali, pilih Setidaknya Satu Kali.
- Klik Jalankan tugas.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ writeMode=SINGLE_TABLE_NAME,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Ganti kode berikut:
PROJECT_ID: Google Cloud Project ID tempat Anda ingin menjalankan tugas DataflowJOB_NAME: nama tugas unik pilihan AndaREGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1VERSION: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latestuntuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00, untuk menggunakan template versi tertentu, yang dapat ditemukan bertingkat di masing-masing folder induk yang diberi tanggal dalam bucket—gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC: alamat dan topik server bootstrap Apache KafkaFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME - Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
DATASET_NAME: nama set data BigQuery AndaTABLE_NAME: nama tabel output BigQueryERROR_TABLE_NAME: nama tabel BigQuery untuk menulis catatan error
API
Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "writeMode": "SINGLE_TABLE_NAME", "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME", "useBigQueryDLQ": "true", "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
Ganti kode berikut:
PROJECT_ID: Google Cloud Project ID tempat Anda ingin menjalankan tugas DataflowJOB_NAME: nama tugas unik pilihan AndaLOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1VERSION: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latestuntuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00, untuk menggunakan template versi tertentu, yang dapat ditemukan bertingkat di masing-masing folder induk yang diberi tanggal dalam bucket—gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC: alamat dan topik server bootstrap Apache KafkaFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME - Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
DATASET_NAME: nama set data BigQuery AndaTABLE_NAME: nama tabel output BigQueryERROR_TABLE_NAME: nama tabel BigQuery untuk menulis catatan error
Untuk mengetahui informasi selengkapnya, lihat Menulis data dari Kafka ke BigQuery dengan Dataflow.
Langkah berikutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.