Halaman ini memberikan panduan dan rekomendasi untuk mengupgrade pipeline streaming. Misalnya, Anda mungkin perlu mengupgrade ke versi Apache Beam SDK yang lebih baru, atau Anda mungkin ingin memperbarui kode pipeline. Berbagai opsi disediakan untuk menyesuaikan berbagai skenario.
Sementara pipeline batch berhenti saat tugas selesai, pipeline streaming sering kali berjalan terus-menerus untuk memberikan pemrosesan tanpa gangguan. Oleh karena itu, saat mengupgrade pipeline streaming, Anda harus mempertimbangkan hal berikut:
- Anda mungkin perlu meminimalkan atau menghindari gangguan pada pipeline. Dalam beberapa kasus, Anda mungkin dapat mentoleransi gangguan sementara pada pemrosesan saat versi baru pipeline di-deploy. Dalam kasus lain, aplikasi Anda mungkin tidak dapat mentoleransi gangguan apa pun.
- Proses update pipeline harus menangani perubahan skema dengan cara yang meminimalkan gangguan pada pemrosesan pesan dan sistem terlampir lainnya. Misalnya, jika skema untuk pesan dalam pipeline pemrosesan peristiwa berubah, perubahan skema mungkin juga diperlukan di sink data downstream.
Anda dapat menggunakan salah satu metode berikut untuk memperbarui pipeline streaming, bergantung pada pipeline dan persyaratan pembaruan Anda:
Untuk mengetahui informasi selengkapnya tentang masalah yang mungkin Anda alami selama update dan cara mencegahnya, lihat Memvalidasi tugas pengganti dan Pemeriksaan kompatibilitas tugas.
Praktik terbaik
- Upgrade versi Apache Beam SDK secara terpisah dari perubahan kode pipeline.
- Uji pipeline Anda setelah setiap perubahan sebelum melakukan pembaruan tambahan.
- Upgrade secara rutin versi Apache Beam SDK yang digunakan pipeline Anda.
- Gunakan metode otomatis jika memungkinkan, seperti update saat dalam proses atau update pipeline paralel otomatis.
- Gunakan Managed I/O jika memungkinkan, untuk mendapatkan manfaat dari upgrade otomatis versi konektor.
Melakukan update saat dalam penerbangan
Anda dapat mengupdate beberapa pipeline streaming yang sedang berjalan tanpa menghentikan tugas. Skenario ini disebut pembaruan tugas saat beroperasi. Pembaruan tugas saat beroperasi hanya tersedia dalam keadaan terbatas:
- Tugas harus menggunakan Streaming Engine.
- Tugas harus dalam status berjalan.
- Anda hanya mengubah jumlah pekerja yang digunakan tugas.
Untuk mengetahui informasi selengkapnya, lihat Menetapkan rentang penskalaan otomatis di halaman Penskalaan Otomatis Horizontal.
Untuk mengetahui petunjuk yang menjelaskan cara melakukan update tugas saat sedang berjalan, lihat Memperbarui pipeline yang ada.
Pembuatan atau pembaruan otomatis (upsert) untuk template
Saat meluncurkan pipeline menggunakan template (Template Klasik, Template Flex, Terraform, atau Config Connector), Anda dapat menggunakan create_or_update_jobeksperimen untuk menggunakan fungsi buat atau perbarui (upsert).
Saat Anda menentukan create_or_update_job dalam parameter additional_experiments
atau flag additional-experiments:
- Jika tugas yang sedang berjalan atau menguras dengan nama tugas yang ditentukan sudah ada, layanan template akan otomatis meluncurkan tugas baru sebagai update untuk tugas yang ada.
- Jika tidak ada tugas aktif dengan nama tersebut, layanan template akan meluncurkan tugas baru sebagai pembuatan tugas baru.
Eksperimen ini menghilangkan kebutuhan untuk menentukan secara terprogram apakah akan menggunakan tindakan API buat atau update saat meluncurkan template.
Untuk contoh kode Terraform dan Config Connector yang menggunakan eksperimen ini, lihat bagian berikut:
- Mengirim permintaan pembaruan otomatis hentikan dan ganti
- Mengirim permintaan update pipeline paralel otomatis
Meluncurkan tugas penggantian
Jika tugas yang diperbarui kompatibel dengan tugas yang ada, Anda dapat memperbarui
pipeline menggunakan opsi update. Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang diperbarui.
Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas penggantian dengan ID pekerjaan yang diperbarui. Proses ini dapat menyebabkan periode nonaktif
saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru
dimulai. Untuk mengetahui detail selengkapnya, lihat
Efek mengganti tugas.
Dataflow melakukan pemeriksaan kompatibilitas untuk memastikan kode pipeline yang diupdate dapat di-deploy dengan aman ke pipeline yang sedang berjalan. Perubahan kode tertentu menyebabkan pemeriksaan kompatibilitas gagal, seperti saat input samping ditambahkan ke atau dihapus dari langkah yang ada. Jika pemeriksaan kompatibilitas gagal, Anda tidak dapat melakukan update tugas di tempat.
Untuk mengetahui petunjuk yang menjelaskan cara meluncurkan tugas penggantian, lihat Meluncurkan tugas penggantian.
Jika update pipeline tidak kompatibel dengan tugas saat ini, Anda harus menghentikan dan mengganti pipeline. Jika pipeline Anda tidak dapat mentoleransi periode nonaktif, jalankan pipeline paralel.
Penghentian dan penggantian manual
Untuk melakukan penghentian dan penggantian secara manual, batalkan atau hentikan pipeline, lalu ganti dengan pipeline yang telah diupdate. Membatalkan pipeline akan menyebabkan Dataflow segera menghentikan pemrosesan dan mematikan resource secepat mungkin, yang dapat menyebabkan hilangnya beberapa data yang sedang diproses, yang dikenal sebagai data dalam proses. Untuk menghindari kehilangan data, dalam sebagian besar kasus, penghentian adalah tindakan yang lebih disukai. Anda juga dapat menggunakan snapshot Dataflow untuk menyimpan status pipeline streaming, yang memungkinkan Anda memulai versi baru tugas Dataflow tanpa kehilangan status. Untuk mengetahui informasi selengkapnya, lihat Menggunakan snapshot Dataflow.
Menguras pipeline akan segera menutup jendela yang sedang diproses dan memicu semua pemicu. Meskipun data yang sedang diproses tidak hilang, pengurasan dapat menyebabkan jendela memiliki data yang tidak lengkap. Jika hal ini terjadi, jendela yang sedang diproses akan mengeluarkan hasil yang sebagian atau tidak lengkap. Untuk mengetahui informasi selengkapnya, lihat Efek pengurasan tugas. Setelah tugas yang ada selesai, luncurkan tugas streaming baru yang berisi kode pipeline yang telah diupdate, yang memungkinkan Anda melanjutkan pemrosesan.
Dengan metode ini, Anda akan mengalami waktu non-operasional antara saat tugas streaming yang ada berhenti dan saat pipeline pengganti siap melanjutkan pemrosesan data. Namun, membatalkan atau menguras pipeline yang ada, lalu meluncurkan tugas baru dengan pipeline yang diperbarui lebih mudah daripada menjalankan pipeline paralel.
Untuk mengetahui informasi selengkapnya, lihat Menguras tugas Dataflow. Setelah menguras tugas saat ini, mulai tugas baru dengan nama tugas yang sama.
Penghentian dan penggantian otomatis
Dataflow menyediakan dukungan API untuk meluncurkan update otomatis berhenti dan ganti. Alur kerja gaya deklaratif ini menghilangkan langkah-langkah prosedural manual. Anda mendeklarasikan tugas yang akan diganti, dan tugas baru akan diluncurkan serta mengoordinasikan transisi secara otomatis.
Saat Anda menggunakan alur kerja ini, resource tugas baru akan disediakan saat tugas lama masih berjalan. Kemudian, tugas lama akan otomatis menerima sinyal pengurasan. Setelah tugas lama selesai menguras atau mencapai waktu tunggu yang ditentukan pengguna, tugas baru akan segera mulai memproses data. Gunakan alur kerja ini untuk pipeline yang tidak dapat mentoleransi data duplikat atau agregasi parsial, tetapi dapat menerima jeda pemrosesan singkat saat tugas lama dihentikan.
Mengirim permintaan update otomatis hentikan dan ganti
Untuk menggunakan alur kerja ini:
- Anda harus menetapkan opsi
parallel_replace_job_max_stop_duration. - Anda tidak boleh menyetel opsi
parallel_replace_job_min_parallel_pipelines_duration. Menetapkan durasi paralel memicu alur kerja pembaruan pipeline paralel otomatis sebagai gantinya.
Luncurkan permintaan update otomatis hentikan dan ganti menggunakan opsi layanan berikut:
Java
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan
flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update langsung, gunakan
update_strategy_in_place_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Menonaktifkan pembatalan otomatis
Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Python
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan
flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update langsung, gunakan
update_strategy_in_place_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Menonaktifkan pembatalan otomatis
Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Go
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan
flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update langsung, gunakan
update_strategy_in_place_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Menonaktifkan pembatalan otomatis
Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
gcloud
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan
flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update langsung, gunakan
update_strategy_in_place_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Menonaktifkan pembatalan otomatis
Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Opsional: Upsert (membuat atau memperbarui tugas)
Untuk mengaktifkan perilaku upsert (membuat atau memperbarui tugas):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_max_stop_duration=DURATION",
"parallel_replace_job_cancel_on_drain_timeout=true",
"update_strategy_parallel_job_update",
"parallel_replace_job_preallocate_compute_resources=true",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_max_stop_duration=DURATION"
- "parallel_replace_job_cancel_on_drain_timeout=true"
- "update_strategy_parallel_job_update"
- "parallel_replace_job_preallocate_compute_resources=true"
- "create_or_update_job"
Ganti variabel berikut:
- Anda harus memberikan
parallel_replace_job_nameatauparallel_replace_job_iduntuk mengidentifikasi tugas yang akan diganti:OLD_JOB_NAME: Nama tugas yang akan diganti.OLD_JOB_ID: ID tugas yang akan diganti.
- Anda harus memberikan nilai
parallel_replace_job_max_stop_durationuntuk mengaktifkan penghentian dan penggantian otomatis:DURATION: Jumlah waktu maksimum tugas baru menunggu hingga tugas lama selesai dikuras. Durasi harus diformat sebagai string yang diakhiri dengans,m, atauh(misalnya,30m,1h).
- Jangan tetapkan opsi
parallel_replace_job_min_parallel_pipelines_durationsaat menggunakan alur kerja ini. Menetapkan opsi ini akan memicu alur kerja pembaruan pipeline paralel otomatis, bukan pembaruan pipeline manual. - Opsional: Konfigurasi opsi
parallel_replace_job_cancel_on_drain_timeout. Karena pembatalan otomatis diaktifkan (defaultnyatrue) secara default saat opsiparallel_replace_job_max_stop_durationditetapkan, Anda tidak perlu mengonfigurasi opsi ini secara eksplisit untuk mengaktifkannya.- Untuk mempertahankan perilaku default, hilangkan opsi ini atau tetapkan ke
true. - Untuk menonaktifkan pembatalan otomatis, setel opsi ini ke
false. Jika Anda menyetel opsi ini kefalsedan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
- Untuk mempertahankan perilaku default, hilangkan opsi ini atau tetapkan ke
- Opsional: Tentukan konfigurasi
parallel_replace_job_preallocate_compute_resources:- Menentukan apakah pekerja disediakan terlebih dahulu untuk tugas baru
saat tugas lama dihentikan. Nilai:
true(default) ataufalse. Untuk Terraform dan Config Connector, sebaiknya setel opsi ini ketrueuntuk mencegah waktu tunggu penyediaan resource habis. Jikaparallel_replace_job_preallocate_compute_resourcesdisetel kefalse, tugas baru akan tetap dalam status tertunda hingga tugas lama selesai.
- Menentukan apakah pekerja disediakan terlebih dahulu untuk tugas baru
saat tugas lama dihentikan. Nilai:
Pemrosesan ulang pesan dengan Snapshot dan Pencarian Pub/Sub
Dalam beberapa situasi, setelah mengganti atau membatalkan pipeline yang habis, Anda mungkin perlu memproses ulang pesan Pub/Sub yang sebelumnya dikirim. Misalnya, Anda mungkin perlu menggunakan logika bisnis yang diperbarui untuk memproses ulang data. Pub/Sub Seek adalah fitur yang memungkinkan Anda memutar ulang pesan dari snapshot Pub/Sub. Anda dapat menggunakan Pub/Sub Seek dengan Dataflow untuk memproses ulang pesan dari saat snapshot langganan dibuat.
Selama pengembangan dan pengujian, Anda juga dapat menggunakan Pub/Sub Seek untuk memutar ulang pesan yang diketahui berulang kali guna memverifikasi output dari pipeline Anda. Saat menggunakan Pub/Sub Seek, jangan mencari snapshot langganan saat langganan sedang digunakan oleh pipeline. Jika Anda melakukannya, pencarian dapat membatalkan logika watermark Dataflow dan dapat memengaruhi pemrosesan pesan Pub/Sub tepat satu kali.
Alur kerja gcloud CLI yang direkomendasikan untuk menggunakan Pub/Sub Seek dengan pipeline Dataflow di jendela terminal adalah sebagai berikut:
Untuk membuat snapshot langganan, gunakan perintah
gcloud pubsub snapshots create:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Untuk menghentikan atau membatalkan pipeline, gunakan perintah
gcloud dataflow jobs drainatau perintahgcloud dataflow jobs cancel:gcloud dataflow jobs drain JOB_ID
atau
gcloud dataflow jobs cancel JOB_ID
Untuk mencari snapshot, gunakan perintah
gcloud pubsub subscriptions seek:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Deploy pipeline baru yang menggunakan langganan.
Menjalankan pipeline paralel
Jika perlu menghindari gangguan pada pipeline streaming selama update, Anda dapat menjalankan pipeline paralel. Dengan pendekatan ini, Anda dapat meluncurkan tugas streaming baru dengan kode pipeline yang telah diupdate dan menjalankannya secara paralel dengan tugas yang ada. Anda dapat menggunakan alur kerja deployment update pipeline paralel otomatis Dataflow, atau melakukan langkah-langkahnya secara manual.
Ringkasan pipeline paralel
Saat membuat pipeline baru, gunakan strategi windowing yang sama dengan yang Anda gunakan untuk pipeline yang ada. Untuk alur kerja manual, biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diperbarui. Kemudian, hentikan atau batalkan pipeline yang ada. Jika menggunakan alur kerja otomatis, pekerjaan ini akan dilakukan untuk Anda. Pipeline yang diperbarui akan terus berjalan di tempatnya dan secara efektif mengambil alih pemrosesan dengan sendirinya.
Diagram berikut menggambarkan proses ini.
Dalam diagram, Pipeline B adalah tugas yang diperbarui yang mengambil alih dari Pipeline A. Nilai t adalah stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B. Nilai w adalah tanda air untuk Pipeline A. Untuk mempermudah, tanda air yang sempurna diasumsikan tanpa data terlambat. Waktu pemrosesan dan waktu nyata ditampilkan pada sumbu horizontal. Kedua pipeline menggunakan jendela tetap (berjatuhan) lima menit. Hasil dipicu setelah tanda air melewati akhir setiap jendela.
Karena output serentak terjadi selama periode waktu saat kedua pipeline tumpang-tindih, konfigurasi kedua pipeline untuk menulis hasil ke tujuan yang berbeda. Sistem hilir kemudian dapat menggunakan abstraksi pada dua sink tujuan, seperti tampilan database, untuk membuat kueri hasil gabungan. Sistem ini juga dapat menggunakan abstraksi untuk menghapus duplikat hasil dari periode yang tumpang-tindih. Untuk mengetahui informasi selengkapnya, lihat Menangani output duplikat.
Batasan
Penggunaan update pipeline paralel otomatis atau manual memiliki batasan berikut:
- Update otomatis saja: Tugas paralel baru harus berupa tugas Streaming Engine.
- Tugas serentak dengan nama yang sama tidak diizinkan. Namun, saat melakukan update pipeline paralel atau penggantian otomatis stop-and-replace menggunakan nama tugas yang sama, Anda dapat menggunakan kembali nama tugas tersebut. Dalam hal ini, tugas baru harus dimulai setidaknya dua menit setelah dimulainya tugas sebelumnya. Pembatasan ini mencegah beberapa pembaruan paralel dari percobaan ulang library klien yang berulang atau panggilan prosedur jarak jauh yang sudah tidak berlaku.
- Menjalankan dua pipeline secara paralel pada input yang sama dapat menyebabkan duplikasi data, agregasi parsial, dan potensi masalah pengurutan saat data dimasukkan ke dalam sink. Sistem hilir harus dirancang untuk mengantisipasi dan mengelola hasil ini.
- Saat membaca dari sumber Pub/Sub, menggunakan langganan yang sama untuk beberapa pipeline tidak direkomendasikan dan dapat menyebabkan masalah kebenaran. Namun, dalam beberapa kasus penggunaan, seperti pipeline ekstrak, transformasi, muat (ETL), penggunaan langganan yang sama di dua pipeline dapat mengurangi duplikasi. Masalah penskalaan otomatis kemungkinan terjadi setiap kali Anda memberikan nilai bukan nol untuk durasi yang tumpang-tindih. Hal ini dapat diatasi dengan menggunakan fitur pembaruan tugas saat beroperasi. Untuk mengetahui informasi selengkapnya, lihat Menyesuaikan penskalaan otomatis untuk pipeline streaming Pub/Sub.
- Untuk Apache Kafka, Anda dapat meminimalkan duplikat dengan mengaktifkan penerapan offset di Kafka. Untuk mengaktifkan penerapan offset di Kafka, lihat Menerapkan kembali ke Kafka.
Update pipeline paralel otomatis
Dataflow menyediakan dukungan API untuk meluncurkan tugas penggantian paralel. API gaya deklaratif ini mengabstraksi pekerjaan manual dalam menjalankan langkah-langkah prosedural. Anda mendeklarasikan tugas yang ingin diperbarui, lalu tugas baru berjalan secara paralel dengan tugas lama. Setelah tugas baru berjalan selama durasi yang Anda tentukan, tugas lama akan dihentikan. Fitur ini menghilangkan jeda pemrosesan selama update. Hal ini juga mengurangi upaya operasional yang diperlukan untuk memperbarui pipeline yang tidak kompatibel.
Metode pembaruan ini paling cocok untuk pipeline yang dapat mentoleransi beberapa duplikat atau agregasi parsial dan tidak memerlukan pengurutan yang ketat saat memasukkan data. Fitur ini sangat cocok untuk pipeline ETL, serta pipeline yang menggunakan mode streaming minimal sekali dan transformasi
Redistribute
dengan setelan izinkan duplikat ditetapkan ke true.
Opsi layanan pipeline paralel otomatis
Gunakan opsi layanan berikut untuk update pipeline paralel otomatis:
| Opsi layanan | Opsional atau wajib | Deskripsi | Dependensi atau pengecualian |
|---|---|---|---|
update_strategy_parallel_job_update |
Wajib (Opsi 1: Perbarui menggunakan nama tugas yang sama) | Perintah untuk melakukan update paralel, yang menjalankan kedua pipeline secara bersamaan untuk meminimalkan waktu nonaktif, saat melakukan update dengan nama tugas yang sama. | Harus ditetapkan bersama dengan tanda --update dan
parallel_replace_job_min_parallel_pipelines_duration.
|
update_strategy_in_place_update |
Opsional | Alternatif untuk update paralel. Melakukan update tugas di tempat standar. | Harus ditetapkan bersama dengan tanda --update.
Tidak dapat muncul bersamaan dengan
Jika opsi ini disetel, opsi lain yang terkait dengan tugas paralel akan diabaikan. |
parallel_replace_job_min_parallel_pipelines_duration |
Wajib | Menentukan durasi minimum kedua pipeline berjalan secara bersamaan.
Setelah durasi ini berlalu, sinyal pengurasan akan dikirim ke tugas lama.
Nilai yang dapat diterima berkisar dari 0s (direkomendasikan untuk nol
tumpang-tindih) hingga 744h (31 hari).
|
Harus dipasangkan dengan cara menargetkan tugas lama. Salah satu dari berikut ini:
|
parallel_replace_job_name atau
parallel_replace_job_id (pilih salah satu) |
Wajib (Opsi 2: Perbarui menggunakan nama tugas yang berbeda) | Mengidentifikasi tugas lama berdasarkan nama atau ID yang akan diganti selama pembaruan nama yang berbeda. | Memerlukan parallel_replace_job_min_parallel_pipelines_duration
untuk ditetapkan.
Jangan gunakan flag |
parallel_replace_job_max_stop_duration |
Opsional | Durasi maksimum tugas lama diizinkan untuk dikosongkan sebelum
pembatalan otomatis dipicu. Misalnya, 30m atau
1h. |
Memerlukan penetapan alur kerja update paralel (Opsi 1 atau Opsi 2). |
parallel_replace_job_cancel_on_drain_timeout |
Opsional Nilai defaultnya adalah |
Opsi boolean yang menentukan apakah tugas lama harus dibatalkan jika durasi pengurasannya melebihi parallel_replace_job_max_stop_duration. |
Digunakan bersama dengan
parallel_replace_job_max_stop_duration.
Setel ke |
Mengirim permintaan pembaruan pipeline paralel otomatis
Untuk menggunakan alur kerja otomatis, luncurkan tugas streaming baru. Anda dapat memperbarui tugas menggunakan nama tugas yang sama atau nama tugas yang berbeda.
Java
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan
tugas paralel, gunakan
update_strategy_in_place_update, bukanupdate_strategy_parallel_job_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis
Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Python
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan
tugas paralel, gunakan
update_strategy_in_place_update, bukanupdate_strategy_parallel_job_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis
Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Go
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan
tugas paralel, gunakan
update_strategy_in_place_update, bukanupdate_strategy_parallel_job_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis
Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.
--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
gcloud
Opsi 1: Perbarui menggunakan nama tugas yang sama
--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag
--updatedan opsiupdate_strategy_parallel_job_update. - Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan
tugas paralel, gunakan
update_strategy_in_place_update, bukanupdate_strategy_parallel_job_update.
Opsi 2: Perbarui menggunakan nama tugas yang berbeda
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID". - Jika Anda menentukan nama tugas baru dan menggunakan tanda
--update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.
Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis
Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.
--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"
Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Opsional: Upsert (membuat atau memperbarui tugas)
Untuk mengaktifkan perilaku upsert (membuat atau memperbarui tugas):
--additional-experiments="create_or_update_job"
Terraform
additional_experiments = [
"parallel_replace_job_min_parallel_pipelines_duration=DURATION",
"parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
"update_strategy_parallel_job_update",
"create_or_update_job"
]
Config Connector
metadata:
annotations:
# Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
# https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
alpha.cnrm.cloud.google.com/reconciler: direct
# Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
cnrm.cloud.google.com/on-delete: drain
# Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
cnrm.cloud.google.com/deletion-policy: abandon
spec:
...
additionalExperiments:
- "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
- "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
- "update_strategy_parallel_job_update"
- "create_or_update_job"
Ganti variabel berikut:
- Jika Anda melakukan update menggunakan nama tugas yang berbeda (Opsi 2), Anda harus
memberikan
parallel_replace_job_nameatauparallel_replace_job_iduntuk mengidentifikasi tugas yang akan diganti. Update menggunakan nama tugas yang berbeda tidak didukung untuk Terraform atau Config Connector.OLD_JOB_NAME: Nama tugas yang akan diganti.OLD_JOB_ID: ID tugas yang akan diganti.
DURATION: Jumlah waktu minimum kedua pipeline berjalan secara paralel sebagai bilangan bulat atau bilangan floating point. Durasi0sdirekomendasikan untuk tanpa tumpang-tindih. Setelah durasi ini berlalu, tugas lama akan dikirim sinyal penghentian.Durasi harus antara 0 detik (
0s) dan 31 hari (744h). Gunakans,m, danhuntuk menentukan detik, menit, dan jam. Misalnya,10madalah 10 menit.DRAIN_TIMEOUT_DURATION: Opsional. Durasi maksimum tugas lama harus dikosongkan sebelum pembatalan otomatis dipicu. Durasi harus diformat sebagai string yang diakhiri dengans,m, atauh(misalnya,30m,1h).parallel_replace_job_cancel_on_drain_timeout: Opsional. Apakah akan membatalkan tugas sebelumnya jika tidak selesai menguras daya sebelum durasi penghentian maksimum. Nilai defaultnya adalahtruejika durasi waktu tunggu pengurasan disediakan. Untuk menonaktifkan pembatalan otomatis, setel opsi ini kefalse. Jika Anda menyetel opsi ini kefalsedan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
Saat Anda meluncurkan tugas baru, Dataflow akan menunggu semua pekerja disediakan sebelum mulai memproses data. Untuk memantau status deployment, periksa log tugas Dataflow.
Menjalankan pipeline paralel secara manual
Untuk skenario yang lebih kompleks, atau saat Anda memerlukan kontrol yang lebih besar atas proses update, Anda dapat menjalankan pipeline paralel secara manual. Biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diupdate. Kemudian, kosongkan atau batalkan pipeline yang ada.
Menangani output duplikat
Contoh berikut menjelaskan satu pendekatan untuk menangani output duplikat. Kedua pipeline menulis output ke tujuan yang berbeda, menggunakan sistem hilir untuk membuat kueri hasil, dan menghapus duplikat hasil dari periode yang tumpang-tindih. Contoh ini menggunakan pipeline yang membaca data input dari Pub/Sub, melakukan beberapa pemrosesan, dan menulis hasilnya ke BigQuery.
Pada status awal, pipeline streaming yang ada (Pipeline A) sedang berjalan dan membaca pesan dari topik Pub/Sub (Topic) menggunakan langganan (Subscription A). Hasilnya ditulis ke tabel BigQuery (Tabel A). Hasil digunakan melalui tampilan BigQuery, yang bertindak sebagai fasad untuk menyamarkan perubahan tabel pokok. Proses ini adalah penerapan metode desain yang disebut pola fasad. Diagram berikut menunjukkan status awal.
Buat langganan baru (Subscription B) untuk pipeline yang diperbarui. Deploy pipeline yang telah diupdate (Pipeline B), yang membaca dari topik Pub/Sub (Topic) menggunakan Subscription B dan menulis ke tabel BigQuery terpisah (Table B). Diagram berikut menggambarkan alur ini.
Pada tahap ini, Pipeline A dan Pipeline B berjalan secara paralel dan menulis hasil ke tabel terpisah. Anda mencatat waktu t sebagai stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B.
Saat watermark Pipeline A melebihi waktu t, kosongkan Pipeline A. Saat Anda mengosongkan pipeline, semua jendela yang terbuka akan ditutup, dan pemrosesan untuk data dalam proses akan selesai. Jika pipeline berisi jendela dan jendela lengkap penting (dengan asumsi tidak ada data terlambat), sebelum mengosongkan Pipeline A, biarkan kedua pipeline berjalan hingga Anda memiliki jendela yang tumpang-tindih dan lengkap. Hentikan tugas streaming untuk Pipeline A setelah semua data dalam proses diproses dan ditulis ke Table A. Diagram berikut menunjukkan tahap ini.
Pada tahap ini, hanya Pipeline B yang berjalan. Anda dapat membuat kueri dari tampilan BigQuery (Façade View), yang bertindak sebagai fasad untuk Table A dan Table B. Untuk baris yang memiliki stempel waktu yang sama di kedua tabel, konfigurasikan tampilan untuk menampilkan baris dari Tabel B, atau, jika baris tidak ada di Tabel B, kembali ke Tabel A. Diagram berikut menunjukkan tampilan (Façade View) yang membaca dari Table A dan Table B.
Pada tahap ini, Anda dapat menghapus Subscription A.
Jika masalah terdeteksi pada deployment pipeline baru, memiliki pipeline paralel dapat menyederhanakan rollback. Dalam contoh ini, Anda mungkin ingin menjalankan Pipeline A sambil memantau Pipeline B untuk mengetahui apakah operasi berjalan dengan benar. Jika ada masalah pada Pipeline B, Anda dapat melakukan rollback ke Pipeline A.
Menangani mutasi skema
Sistem penanganan data sering kali perlu mengakomodasi mutasi skema dari waktu ke waktu, terkadang karena perubahan persyaratan bisnis dan terkadang karena alasan teknis. Menerapkan update skema biasanya memerlukan perencanaan dan eksekusi yang cermat untuk menghindari gangguan pada sistem informasi bisnis.
Pertimbangkan pipeline yang membaca pesan yang berisi payload JSON dari topik Pub/Sub. Pipeline mengonversi setiap pesan menjadi instance
TableRow
lalu menulis baris ke tabel BigQuery. Skema tabel output mirip dengan pesan yang diproses oleh pipeline.
Dalam diagram berikut, skema disebut sebagai Schema A.
Seiring waktu, skema pesan dapat berubah dengan cara yang tidak sepele. Misalnya, kolom ditambahkan, dihapus, atau diganti. Schema A berkembang menjadi skema baru. Dalam diskusi berikutnya, skema baru disebut sebagai Schema B. Dalam kasus ini, Pipeline A perlu diperbarui, dan skema tabel output harus mendukung Skema B.
Untuk tabel output, Anda dapat melakukan beberapa mutasi skema tanpa periode nonaktif.
Misalnya, Anda dapat menambahkan kolom baru atau melonggarkan
mode kolom,
seperti mengubah REQUIRED menjadi NULLABLE, tanpa periode nonaktif.
Mutasi ini biasanya tidak memengaruhi kueri yang ada. Namun, mutasi skema yang mengubah atau menghapus kolom skema yang ada akan merusak kueri atau menyebabkan gangguan lainnya. Pendekatan berikut mengakomodasi perubahan tanpa memerlukan periode nonaktif.
Pisahkan data yang ditulis oleh pipeline ke dalam tabel utama dan ke dalam satu atau beberapa tabel penyiapan. Tabel utama menyimpan data historis yang ditulis oleh pipeline. Tabel penyiapan menyimpan output pipeline terbaru. Anda dapat menentukan tampilan fasad BigQuery di atas tabel utama dan tabel penyiapan, yang memungkinkan konsumen membuat kueri data historis dan terbaru.
Diagram berikut merevisi alur pipeline sebelumnya untuk menyertakan tabel penyiapan (Staging Table A), tabel utama, dan tampilan fasad.
Dalam alur yang direvisi, Pipeline A memproses pesan yang menggunakan Schema A dan menulis output ke Staging Table A, yang memiliki skema yang kompatibel. Tabel utama berisi data historis yang ditulis oleh versi pipeline sebelumnya, serta hasil yang digabungkan secara berkala dari tabel penyiapan. Konsumen dapat membuat kueri data terbaru, termasuk data historis dan real-time, dengan menggunakan tampilan fasad.
Saat skema pesan berubah dari Skema A menjadi Skema B, Anda dapat memperbarui kode pipeline agar kompatibel dengan pesan yang menggunakan Skema B. Pipeline yang ada perlu diupdate dengan implementasi baru. Dengan menjalankan pipeline paralel, Anda dapat memastikan pemrosesan data streaming terus berjalan tanpa gangguan. Menghentikan dan mengganti pipeline akan menyebabkan gangguan dalam pemrosesan, karena tidak ada pipeline yang berjalan selama jangka waktu tertentu.
Pipeline yang diperbarui akan menulis ke tabel penyiapan tambahan (Tabel Penyiapan B) yang menggunakan Skema B. Anda dapat menggunakan alur kerja yang diatur untuk membuat tabel penyiapan baru sebelum memperbarui pipeline. Perbarui tampilan fasad untuk menyertakan hasil dari tabel penyiapan baru, yang berpotensi menggunakan langkah alur kerja terkait.
Diagram berikut menunjukkan alur yang diperbarui yang menampilkan Staging Table B dengan Schema B dan cara tampilan fasad diperbarui untuk menyertakan konten dari tabel utama dan dari kedua tabel penyiapan.
Sebagai proses terpisah dari update pipeline, Anda dapat menggabungkan tabel penyiapan ke tabel utama, baik secara berkala maupun sesuai kebutuhan. Diagram berikut menunjukkan cara Staging Table A digabungkan ke dalam tabel utama.
Langkah berikutnya
- Temukan langkah-langkah mendetail untuk memperbarui pipeline yang ada.