Mengupgrade pipeline streaming

Halaman ini memberikan panduan dan rekomendasi untuk mengupgrade pipeline streaming. Misalnya, Anda mungkin perlu mengupgrade ke versi Apache Beam SDK yang lebih baru, atau Anda mungkin ingin memperbarui kode pipeline. Berbagai opsi disediakan untuk menyesuaikan berbagai skenario.

Sementara pipeline batch berhenti saat tugas selesai, pipeline streaming sering kali berjalan terus-menerus untuk memberikan pemrosesan tanpa gangguan. Oleh karena itu, saat mengupgrade pipeline streaming, Anda harus mempertimbangkan hal berikut:

  • Anda mungkin perlu meminimalkan atau menghindari gangguan pada pipeline. Dalam beberapa kasus, Anda mungkin dapat mentoleransi gangguan sementara pada pemrosesan saat versi baru pipeline di-deploy. Dalam kasus lain, aplikasi Anda mungkin tidak dapat mentoleransi gangguan apa pun.
  • Proses update pipeline harus menangani perubahan skema dengan cara yang meminimalkan gangguan pada pemrosesan pesan dan sistem terlampir lainnya. Misalnya, jika skema untuk pesan dalam pipeline pemrosesan peristiwa berubah, perubahan skema mungkin juga diperlukan di sink data downstream.

Anda dapat menggunakan salah satu metode berikut untuk memperbarui pipeline streaming, bergantung pada pipeline dan persyaratan pembaruan Anda:

Untuk mengetahui informasi selengkapnya tentang masalah yang mungkin Anda alami selama update dan cara mencegahnya, lihat Memvalidasi tugas pengganti dan Pemeriksaan kompatibilitas tugas.

Praktik terbaik

  • Upgrade versi Apache Beam SDK secara terpisah dari perubahan kode pipeline.
  • Uji pipeline Anda setelah setiap perubahan sebelum melakukan pembaruan tambahan.
  • Upgrade secara rutin versi Apache Beam SDK yang digunakan pipeline Anda.
  • Gunakan metode otomatis jika memungkinkan, seperti update saat dalam proses atau update pipeline paralel otomatis.
  • Gunakan Managed I/O jika memungkinkan, untuk mendapatkan manfaat dari upgrade otomatis versi konektor.

Melakukan update saat dalam penerbangan

Anda dapat mengupdate beberapa pipeline streaming yang sedang berjalan tanpa menghentikan tugas. Skenario ini disebut pembaruan tugas saat beroperasi. Pembaruan tugas saat beroperasi hanya tersedia dalam keadaan terbatas:

  • Tugas harus menggunakan Streaming Engine.
  • Tugas harus dalam status berjalan.
  • Anda hanya mengubah jumlah pekerja yang digunakan tugas.

Untuk mengetahui informasi selengkapnya, lihat Menetapkan rentang penskalaan otomatis di halaman Penskalaan Otomatis Horizontal.

Untuk mengetahui petunjuk yang menjelaskan cara melakukan update tugas saat sedang berjalan, lihat Memperbarui pipeline yang ada.

Pembuatan atau pembaruan otomatis (upsert) untuk template

Saat meluncurkan pipeline menggunakan template (Template Klasik, Template Flex, Terraform, atau Config Connector), Anda dapat menggunakan create_or_update_jobeksperimen untuk menggunakan fungsi buat atau perbarui (upsert).

Saat Anda menentukan create_or_update_job dalam parameter additional_experiments atau flag additional-experiments:

  • Jika tugas yang sedang berjalan atau menguras dengan nama tugas yang ditentukan sudah ada, layanan template akan otomatis meluncurkan tugas baru sebagai update untuk tugas yang ada.
  • Jika tidak ada tugas aktif dengan nama tersebut, layanan template akan meluncurkan tugas baru sebagai pembuatan tugas baru.

Eksperimen ini menghilangkan kebutuhan untuk menentukan secara terprogram apakah akan menggunakan tindakan API buat atau update saat meluncurkan template.

Untuk contoh kode Terraform dan Config Connector yang menggunakan eksperimen ini, lihat bagian berikut:

Meluncurkan tugas penggantian

Jika tugas yang diperbarui kompatibel dengan tugas yang ada, Anda dapat memperbarui pipeline menggunakan opsi update. Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang diperbarui. Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas penggantian dengan ID pekerjaan yang diperbarui. Proses ini dapat menyebabkan periode nonaktif saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru dimulai. Untuk mengetahui detail selengkapnya, lihat Efek mengganti tugas.

Dataflow melakukan pemeriksaan kompatibilitas untuk memastikan kode pipeline yang diupdate dapat di-deploy dengan aman ke pipeline yang sedang berjalan. Perubahan kode tertentu menyebabkan pemeriksaan kompatibilitas gagal, seperti saat input samping ditambahkan ke atau dihapus dari langkah yang ada. Jika pemeriksaan kompatibilitas gagal, Anda tidak dapat melakukan update tugas di tempat.

Untuk mengetahui petunjuk yang menjelaskan cara meluncurkan tugas penggantian, lihat Meluncurkan tugas penggantian.

Jika update pipeline tidak kompatibel dengan tugas saat ini, Anda harus menghentikan dan mengganti pipeline. Jika pipeline Anda tidak dapat mentoleransi periode nonaktif, jalankan pipeline paralel.

Penghentian dan penggantian manual

Untuk melakukan penghentian dan penggantian secara manual, batalkan atau hentikan pipeline, lalu ganti dengan pipeline yang telah diupdate. Membatalkan pipeline akan menyebabkan Dataflow segera menghentikan pemrosesan dan mematikan resource secepat mungkin, yang dapat menyebabkan hilangnya beberapa data yang sedang diproses, yang dikenal sebagai data dalam proses. Untuk menghindari kehilangan data, dalam sebagian besar kasus, penghentian adalah tindakan yang lebih disukai. Anda juga dapat menggunakan snapshot Dataflow untuk menyimpan status pipeline streaming, yang memungkinkan Anda memulai versi baru tugas Dataflow tanpa kehilangan status. Untuk mengetahui informasi selengkapnya, lihat Menggunakan snapshot Dataflow.

Menguras pipeline akan segera menutup jendela yang sedang diproses dan memicu semua pemicu. Meskipun data yang sedang diproses tidak hilang, pengurasan dapat menyebabkan jendela memiliki data yang tidak lengkap. Jika hal ini terjadi, jendela yang sedang diproses akan mengeluarkan hasil yang sebagian atau tidak lengkap. Untuk mengetahui informasi selengkapnya, lihat Efek pengurasan tugas. Setelah tugas yang ada selesai, luncurkan tugas streaming baru yang berisi kode pipeline yang telah diupdate, yang memungkinkan Anda melanjutkan pemrosesan.

Dengan metode ini, Anda akan mengalami waktu non-operasional antara saat tugas streaming yang ada berhenti dan saat pipeline pengganti siap melanjutkan pemrosesan data. Namun, membatalkan atau menguras pipeline yang ada, lalu meluncurkan tugas baru dengan pipeline yang diperbarui lebih mudah daripada menjalankan pipeline paralel.

Untuk mengetahui informasi selengkapnya, lihat Menguras tugas Dataflow. Setelah menguras tugas saat ini, mulai tugas baru dengan nama tugas yang sama.

Penghentian dan penggantian otomatis

Dataflow menyediakan dukungan API untuk meluncurkan update otomatis berhenti dan ganti. Alur kerja gaya deklaratif ini menghilangkan langkah-langkah prosedural manual. Anda mendeklarasikan tugas yang akan diganti, dan tugas baru akan diluncurkan serta mengoordinasikan transisi secara otomatis.

Saat Anda menggunakan alur kerja ini, resource tugas baru akan disediakan saat tugas lama masih berjalan. Kemudian, tugas lama akan otomatis menerima sinyal pengurasan. Setelah tugas lama selesai menguras atau mencapai waktu tunggu yang ditentukan pengguna, tugas baru akan segera mulai memproses data. Gunakan alur kerja ini untuk pipeline yang tidak dapat mentoleransi data duplikat atau agregasi parsial, tetapi dapat menerima jeda pemrosesan singkat saat tugas lama dihentikan.

Mengirim permintaan update otomatis hentikan dan ganti

Untuk menggunakan alur kerja ini:

  • Anda harus menetapkan opsi parallel_replace_job_max_stop_duration.
  • Anda tidak boleh menyetel opsi parallel_replace_job_min_parallel_pipelines_duration. Menetapkan durasi paralel memicu alur kerja pembaruan pipeline paralel otomatis sebagai gantinya.

Luncurkan permintaan update otomatis hentikan dan ganti menggunakan opsi layanan berikut:

Java

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update langsung, gunakan update_strategy_in_place_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Menonaktifkan pembatalan otomatis

Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.

--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Python

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update langsung, gunakan update_strategy_in_place_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Menonaktifkan pembatalan otomatis

Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Go

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update langsung, gunakan update_strategy_in_place_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Menonaktifkan pembatalan otomatis

Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

gcloud

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk melakukan update otomatis hentikan dan ganti menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update langsung, gunakan update_strategy_in_place_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Menonaktifkan pembatalan otomatis

Pembatalan otomatis diaktifkan secara default saat Anda menentukan opsi parallel_replace_job_max_stop_duration. Untuk menonaktifkan pembatalan otomatis, tetapkan opsi parallel_replace_job_cancel_on_drain_timeout ke false.

--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Opsional: Upsert (membuat atau memperbarui tugas)

Untuk mengaktifkan perilaku upsert (membuat atau memperbarui tugas):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_max_stop_duration=DURATION",
  "parallel_replace_job_cancel_on_drain_timeout=true",
  "update_strategy_parallel_job_update",
  "parallel_replace_job_preallocate_compute_resources=true",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_max_stop_duration=DURATION"
    - "parallel_replace_job_cancel_on_drain_timeout=true"
    - "update_strategy_parallel_job_update"
    - "parallel_replace_job_preallocate_compute_resources=true"
    - "create_or_update_job"

Ganti variabel berikut:

  • Anda harus memberikan parallel_replace_job_name atau parallel_replace_job_id untuk mengidentifikasi tugas yang akan diganti:
    • OLD_JOB_NAME: Nama tugas yang akan diganti.
    • OLD_JOB_ID: ID tugas yang akan diganti.
  • Anda harus memberikan nilai parallel_replace_job_max_stop_duration untuk mengaktifkan penghentian dan penggantian otomatis:
    • DURATION: Jumlah waktu maksimum tugas baru menunggu hingga tugas lama selesai dikuras. Durasi harus diformat sebagai string yang diakhiri dengan s, m, atau h (misalnya, 30m, 1h).
  • Jangan tetapkan opsi parallel_replace_job_min_parallel_pipelines_duration saat menggunakan alur kerja ini. Menetapkan opsi ini akan memicu alur kerja pembaruan pipeline paralel otomatis, bukan pembaruan pipeline manual.
  • Opsional: Konfigurasi opsi parallel_replace_job_cancel_on_drain_timeout. Karena pembatalan otomatis diaktifkan (defaultnya true) secara default saat opsi parallel_replace_job_max_stop_duration ditetapkan, Anda tidak perlu mengonfigurasi opsi ini secara eksplisit untuk mengaktifkannya.
    • Untuk mempertahankan perilaku default, hilangkan opsi ini atau tetapkan ke true.
    • Untuk menonaktifkan pembatalan otomatis, setel opsi ini ke false. Jika Anda menyetel opsi ini ke false dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.
  • Opsional: Tentukan konfigurasi parallel_replace_job_preallocate_compute_resources:
    • Menentukan apakah pekerja disediakan terlebih dahulu untuk tugas baru saat tugas lama dihentikan. Nilai: true (default) atau false. Untuk Terraform dan Config Connector, sebaiknya setel opsi ini ke true untuk mencegah waktu tunggu penyediaan resource habis. Jika parallel_replace_job_preallocate_compute_resources disetel ke false, tugas baru akan tetap dalam status tertunda hingga tugas lama selesai.

Pemrosesan ulang pesan dengan Snapshot dan Pencarian Pub/Sub

Dalam beberapa situasi, setelah mengganti atau membatalkan pipeline yang habis, Anda mungkin perlu memproses ulang pesan Pub/Sub yang sebelumnya dikirim. Misalnya, Anda mungkin perlu menggunakan logika bisnis yang diperbarui untuk memproses ulang data. Pub/Sub Seek adalah fitur yang memungkinkan Anda memutar ulang pesan dari snapshot Pub/Sub. Anda dapat menggunakan Pub/Sub Seek dengan Dataflow untuk memproses ulang pesan dari saat snapshot langganan dibuat.

Selama pengembangan dan pengujian, Anda juga dapat menggunakan Pub/Sub Seek untuk memutar ulang pesan yang diketahui berulang kali guna memverifikasi output dari pipeline Anda. Saat menggunakan Pub/Sub Seek, jangan mencari snapshot langganan saat langganan sedang digunakan oleh pipeline. Jika Anda melakukannya, pencarian dapat membatalkan logika watermark Dataflow dan dapat memengaruhi pemrosesan pesan Pub/Sub tepat satu kali.

Alur kerja gcloud CLI yang direkomendasikan untuk menggunakan Pub/Sub Seek dengan pipeline Dataflow di jendela terminal adalah sebagai berikut:

  1. Untuk membuat snapshot langganan, gunakan perintah gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Untuk menghentikan atau membatalkan pipeline, gunakan perintah gcloud dataflow jobs drain atau perintah gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    atau

    gcloud dataflow jobs cancel JOB_ID
    
  3. Untuk mencari snapshot, gunakan perintah gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Deploy pipeline baru yang menggunakan langganan.

Menjalankan pipeline paralel

Jika perlu menghindari gangguan pada pipeline streaming selama update, Anda dapat menjalankan pipeline paralel. Dengan pendekatan ini, Anda dapat meluncurkan tugas streaming baru dengan kode pipeline yang telah diupdate dan menjalankannya secara paralel dengan tugas yang ada. Anda dapat menggunakan alur kerja deployment update pipeline paralel otomatis Dataflow, atau melakukan langkah-langkahnya secara manual.

Ringkasan pipeline paralel

Saat membuat pipeline baru, gunakan strategi windowing yang sama dengan yang Anda gunakan untuk pipeline yang ada. Untuk alur kerja manual, biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diperbarui. Kemudian, hentikan atau batalkan pipeline yang ada. Jika menggunakan alur kerja otomatis, pekerjaan ini akan dilakukan untuk Anda. Pipeline yang diperbarui akan terus berjalan di tempatnya dan secara efektif mengambil alih pemrosesan dengan sendirinya.

Diagram berikut menggambarkan proses ini.

Pipeline B tumpang-tindih dengan Pipeline B selama periode 5 menit.

Dalam diagram, Pipeline B adalah tugas yang diperbarui yang mengambil alih dari Pipeline A. Nilai t adalah stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B. Nilai w adalah tanda air untuk Pipeline A. Untuk mempermudah, tanda air yang sempurna diasumsikan tanpa data terlambat. Waktu pemrosesan dan waktu nyata ditampilkan pada sumbu horizontal. Kedua pipeline menggunakan jendela tetap (berjatuhan) lima menit. Hasil dipicu setelah tanda air melewati akhir setiap jendela.

Karena output serentak terjadi selama periode waktu saat kedua pipeline tumpang-tindih, konfigurasi kedua pipeline untuk menulis hasil ke tujuan yang berbeda. Sistem hilir kemudian dapat menggunakan abstraksi pada dua sink tujuan, seperti tampilan database, untuk membuat kueri hasil gabungan. Sistem ini juga dapat menggunakan abstraksi untuk menghapus duplikat hasil dari periode yang tumpang-tindih. Untuk mengetahui informasi selengkapnya, lihat Menangani output duplikat.

Batasan

Penggunaan update pipeline paralel otomatis atau manual memiliki batasan berikut:

  • Update otomatis saja: Tugas paralel baru harus berupa tugas Streaming Engine.
  • Tugas serentak dengan nama yang sama tidak diizinkan. Namun, saat melakukan update pipeline paralel atau penggantian otomatis stop-and-replace menggunakan nama tugas yang sama, Anda dapat menggunakan kembali nama tugas tersebut. Dalam hal ini, tugas baru harus dimulai setidaknya dua menit setelah dimulainya tugas sebelumnya. Pembatasan ini mencegah beberapa pembaruan paralel dari percobaan ulang library klien yang berulang atau panggilan prosedur jarak jauh yang sudah tidak berlaku.
  • Menjalankan dua pipeline secara paralel pada input yang sama dapat menyebabkan duplikasi data, agregasi parsial, dan potensi masalah pengurutan saat data dimasukkan ke dalam sink. Sistem hilir harus dirancang untuk mengantisipasi dan mengelola hasil ini.
  • Saat membaca dari sumber Pub/Sub, menggunakan langganan yang sama untuk beberapa pipeline tidak direkomendasikan dan dapat menyebabkan masalah kebenaran. Namun, dalam beberapa kasus penggunaan, seperti pipeline ekstrak, transformasi, muat (ETL), penggunaan langganan yang sama di dua pipeline dapat mengurangi duplikasi. Masalah penskalaan otomatis kemungkinan terjadi setiap kali Anda memberikan nilai bukan nol untuk durasi yang tumpang-tindih. Hal ini dapat diatasi dengan menggunakan fitur pembaruan tugas saat beroperasi. Untuk mengetahui informasi selengkapnya, lihat Menyesuaikan penskalaan otomatis untuk pipeline streaming Pub/Sub.
  • Untuk Apache Kafka, Anda dapat meminimalkan duplikat dengan mengaktifkan penerapan offset di Kafka. Untuk mengaktifkan penerapan offset di Kafka, lihat Menerapkan kembali ke Kafka.

Update pipeline paralel otomatis

Dataflow menyediakan dukungan API untuk meluncurkan tugas penggantian paralel. API gaya deklaratif ini mengabstraksi pekerjaan manual dalam menjalankan langkah-langkah prosedural. Anda mendeklarasikan tugas yang ingin diperbarui, lalu tugas baru berjalan secara paralel dengan tugas lama. Setelah tugas baru berjalan selama durasi yang Anda tentukan, tugas lama akan dihentikan. Fitur ini menghilangkan jeda pemrosesan selama update. Hal ini juga mengurangi upaya operasional yang diperlukan untuk memperbarui pipeline yang tidak kompatibel.

Metode pembaruan ini paling cocok untuk pipeline yang dapat mentoleransi beberapa duplikat atau agregasi parsial dan tidak memerlukan pengurutan yang ketat saat memasukkan data. Fitur ini sangat cocok untuk pipeline ETL, serta pipeline yang menggunakan mode streaming minimal sekali dan transformasi Redistribute dengan setelan izinkan duplikat ditetapkan ke true.

Opsi layanan pipeline paralel otomatis

Gunakan opsi layanan berikut untuk update pipeline paralel otomatis:

Opsi layanan Opsional atau wajib Deskripsi Dependensi atau pengecualian
update_strategy_parallel_job_update Wajib (Opsi 1: Perbarui menggunakan nama tugas yang sama) Perintah untuk melakukan update paralel, yang menjalankan kedua pipeline secara bersamaan untuk meminimalkan waktu nonaktif, saat melakukan update dengan nama tugas yang sama. Harus ditetapkan bersama dengan tanda --update dan parallel_replace_job_min_parallel_pipelines_duration.
update_strategy_in_place_update Opsional Alternatif untuk update paralel. Melakukan update tugas di tempat standar. Harus ditetapkan bersama dengan tanda --update.

Tidak dapat muncul bersamaan dengan update_strategy_parallel_job_update.

Jika opsi ini disetel, opsi lain yang terkait dengan tugas paralel akan diabaikan.

parallel_replace_job_min_parallel_pipelines_duration Wajib Menentukan durasi minimum kedua pipeline berjalan secara bersamaan. Setelah durasi ini berlalu, sinyal pengurasan akan dikirim ke tugas lama. Nilai yang dapat diterima berkisar dari 0s (direkomendasikan untuk nol tumpang-tindih) hingga 744h (31 hari). Harus dipasangkan dengan cara menargetkan tugas lama. Salah satu dari berikut ini:
  • Opsi 1 - menggunakan nama tugas yang sama: update_strategy_parallel_job_update, atau
  • Opsi 2 - menggunakan nama tugas yang berbeda: parallel_replace_job_name (atau alternatifnya parallel_replace_job_id untuk mengidentifikasi tugas)
parallel_replace_job_name atau parallel_replace_job_id (pilih salah satu) Wajib (Opsi 2: Perbarui menggunakan nama tugas yang berbeda) Mengidentifikasi tugas lama berdasarkan nama atau ID yang akan diganti selama pembaruan nama yang berbeda. Memerlukan parallel_replace_job_min_parallel_pipelines_duration untuk ditetapkan.

Jangan gunakan flag --update atau parallel_replace_job_id dengan opsi ini.

parallel_replace_job_max_stop_duration Opsional Durasi maksimum tugas lama diizinkan untuk dikosongkan sebelum pembatalan otomatis dipicu. Misalnya, 30m atau 1h. Memerlukan penetapan alur kerja update paralel (Opsi 1 atau Opsi 2).
parallel_replace_job_cancel_on_drain_timeout Opsional

Nilai defaultnya adalah true jika durasi berhenti maksimum ditetapkan.

Opsi boolean yang menentukan apakah tugas lama harus dibatalkan jika durasi pengurasannya melebihi parallel_replace_job_max_stop_duration. Digunakan bersama dengan parallel_replace_job_max_stop_duration.

Setel ke false untuk menonaktifkan pembatalan otomatis. Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Mengirim permintaan pembaruan pipeline paralel otomatis

Untuk menggunakan alur kerja otomatis, luncurkan tugas streaming baru. Anda dapat memperbarui tugas menggunakan nama tugas yang sama atau nama tugas yang berbeda.

Java

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan tugas paralel, gunakan update_strategy_in_place_update, bukan update_strategy_parallel_job_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis

Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.

--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Python

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan tugas paralel, gunakan update_strategy_in_place_update, bukan update_strategy_parallel_job_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis

Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Go

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan tugas paralel, gunakan update_strategy_in_place_update, bukan update_strategy_parallel_job_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis

Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

gcloud

Opsi 1: Perbarui menggunakan nama tugas yang sama

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk melakukan update paralel menggunakan nama yang sama, gunakan flag --update dan opsi update_strategy_parallel_job_update.
  • Untuk melakukan update di tempat tanpa menghapus opsi yang terkait dengan tugas paralel, gunakan update_strategy_in_place_update, bukan update_strategy_parallel_job_update.

Opsi 2: Perbarui menggunakan nama tugas yang berbeda

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Untuk menentukan tugas lama menurut ID, bukan nama tugas, gunakan --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Jika Anda menentukan nama tugas baru dan menggunakan tanda --update, Dataflow akan menelusuri tugas yang ada dengan nama baru, yang menyebabkan error.

Opsional: Mengonfigurasi waktu tunggu pengurasan dan pembatalan otomatis

Anda dapat menambahkan opsi berikut ke salah satu konfigurasi untuk menetapkan waktu tunggu pengurasan dan membatalkan tugas lama secara otomatis jika macet.

--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"

Jika Anda menonaktifkan pembatalan otomatis dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Opsional: Upsert (membuat atau memperbarui tugas)

Untuk mengaktifkan perilaku upsert (membuat atau memperbarui tugas):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_min_parallel_pipelines_duration=DURATION",
  "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
  "update_strategy_parallel_job_update",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
    - "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
    - "update_strategy_parallel_job_update"
    - "create_or_update_job"

Ganti variabel berikut:

  • Jika Anda melakukan update menggunakan nama tugas yang berbeda (Opsi 2), Anda harus memberikan parallel_replace_job_name atau parallel_replace_job_id untuk mengidentifikasi tugas yang akan diganti. Update menggunakan nama tugas yang berbeda tidak didukung untuk Terraform atau Config Connector.
    • OLD_JOB_NAME: Nama tugas yang akan diganti.
    • OLD_JOB_ID: ID tugas yang akan diganti.
  • DURATION: Jumlah waktu minimum kedua pipeline berjalan secara paralel sebagai bilangan bulat atau bilangan floating point. Durasi 0s direkomendasikan untuk tanpa tumpang-tindih. Setelah durasi ini berlalu, tugas lama akan dikirim sinyal penghentian.

    Durasi harus antara 0 detik (0s) dan 31 hari (744h). Gunakan s, m, dan h untuk menentukan detik, menit, dan jam. Misalnya, 10m adalah 10 menit.

  • DRAIN_TIMEOUT_DURATION: Opsional. Durasi maksimum tugas lama harus dikosongkan sebelum pembatalan otomatis dipicu. Durasi harus diformat sebagai string yang diakhiri dengan s, m, atau h (misalnya, 30m, 1h).

  • parallel_replace_job_cancel_on_drain_timeout: Opsional. Apakah akan membatalkan tugas sebelumnya jika tidak selesai menguras daya sebelum durasi penghentian maksimum. Nilai defaultnya adalah true jika durasi waktu tunggu pengurasan disediakan. Untuk menonaktifkan pembatalan otomatis, setel opsi ini ke false. Jika Anda menyetel opsi ini ke false dan tugas lama macet dalam status pengurasan, tugas lama dan baru akan terus berjalan secara paralel.

Saat Anda meluncurkan tugas baru, Dataflow akan menunggu semua pekerja disediakan sebelum mulai memproses data. Untuk memantau status deployment, periksa log tugas Dataflow.

Menjalankan pipeline paralel secara manual

Untuk skenario yang lebih kompleks, atau saat Anda memerlukan kontrol yang lebih besar atas proses update, Anda dapat menjalankan pipeline paralel secara manual. Biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diupdate. Kemudian, kosongkan atau batalkan pipeline yang ada.

Menangani output duplikat

Contoh berikut menjelaskan satu pendekatan untuk menangani output duplikat. Kedua pipeline menulis output ke tujuan yang berbeda, menggunakan sistem hilir untuk membuat kueri hasil, dan menghapus duplikat hasil dari periode yang tumpang-tindih. Contoh ini menggunakan pipeline yang membaca data input dari Pub/Sub, melakukan beberapa pemrosesan, dan menulis hasilnya ke BigQuery.

  1. Pada status awal, pipeline streaming yang ada (Pipeline A) sedang berjalan dan membaca pesan dari topik Pub/Sub (Topic) menggunakan langganan (Subscription A). Hasilnya ditulis ke tabel BigQuery (Tabel A). Hasil digunakan melalui tampilan BigQuery, yang bertindak sebagai fasad untuk menyamarkan perubahan tabel pokok. Proses ini adalah penerapan metode desain yang disebut pola fasad. Diagram berikut menunjukkan status awal.

    Satu pipeline dengan satu langganan, dan menulis ke satu tabel BigQuery.

  2. Buat langganan baru (Subscription B) untuk pipeline yang diperbarui. Deploy pipeline yang telah diupdate (Pipeline B), yang membaca dari topik Pub/Sub (Topic) menggunakan Subscription B dan menulis ke tabel BigQuery terpisah (Table B). Diagram berikut menggambarkan alur ini.

    Dua pipeline, masing-masing dengan satu langganan. Setiap pipeline menulis ke tabel BigQuery yang terpisah. Tampilan fasad membaca dari kedua tabel.

    Pada tahap ini, Pipeline A dan Pipeline B berjalan secara paralel dan menulis hasil ke tabel terpisah. Anda mencatat waktu t sebagai stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B.

  3. Saat watermark Pipeline A melebihi waktu t, kosongkan Pipeline A. Saat Anda mengosongkan pipeline, semua jendela yang terbuka akan ditutup, dan pemrosesan untuk data dalam proses akan selesai. Jika pipeline berisi jendela dan jendela lengkap penting (dengan asumsi tidak ada data terlambat), sebelum mengosongkan Pipeline A, biarkan kedua pipeline berjalan hingga Anda memiliki jendela yang tumpang-tindih dan lengkap. Hentikan tugas streaming untuk Pipeline A setelah semua data dalam proses diproses dan ditulis ke Table A. Diagram berikut menunjukkan tahap ini.

    Pipeline A menguras dan tidak lagi membaca Langganan A, serta tidak lagi mengirim data ke Tabel A setelah pengurasan selesai. Semua pemrosesan ditangani oleh pipeline kedua.

  4. Pada tahap ini, hanya Pipeline B yang berjalan. Anda dapat membuat kueri dari tampilan BigQuery (Façade View), yang bertindak sebagai fasad untuk Table A dan Table B. Untuk baris yang memiliki stempel waktu yang sama di kedua tabel, konfigurasikan tampilan untuk menampilkan baris dari Tabel B, atau, jika baris tidak ada di Tabel B, kembali ke Tabel A. Diagram berikut menunjukkan tampilan (Façade View) yang membaca dari Table A dan Table B.

    Pipeline A sudah tidak ada, dan hanya Pipeline B yang berjalan.

    Pada tahap ini, Anda dapat menghapus Subscription A.

Jika masalah terdeteksi pada deployment pipeline baru, memiliki pipeline paralel dapat menyederhanakan rollback. Dalam contoh ini, Anda mungkin ingin menjalankan Pipeline A sambil memantau Pipeline B untuk mengetahui apakah operasi berjalan dengan benar. Jika ada masalah pada Pipeline B, Anda dapat melakukan rollback ke Pipeline A.

Menangani mutasi skema

Sistem penanganan data sering kali perlu mengakomodasi mutasi skema dari waktu ke waktu, terkadang karena perubahan persyaratan bisnis dan terkadang karena alasan teknis. Menerapkan update skema biasanya memerlukan perencanaan dan eksekusi yang cermat untuk menghindari gangguan pada sistem informasi bisnis.

Pertimbangkan pipeline yang membaca pesan yang berisi payload JSON dari topik Pub/Sub. Pipeline mengonversi setiap pesan menjadi instance TableRow lalu menulis baris ke tabel BigQuery. Skema tabel output mirip dengan pesan yang diproses oleh pipeline. Dalam diagram berikut, skema disebut sebagai Schema A.

Pipeline yang membaca langganan dan menulis ke tabel output BigQuery menggunakan Skema A.

Seiring waktu, skema pesan dapat berubah dengan cara yang tidak sepele. Misalnya, kolom ditambahkan, dihapus, atau diganti. Schema A berkembang menjadi skema baru. Dalam diskusi berikutnya, skema baru disebut sebagai Schema B. Dalam kasus ini, Pipeline A perlu diperbarui, dan skema tabel output harus mendukung Skema B.

Untuk tabel output, Anda dapat melakukan beberapa mutasi skema tanpa periode nonaktif. Misalnya, Anda dapat menambahkan kolom baru atau melonggarkan mode kolom, seperti mengubah REQUIRED menjadi NULLABLE, tanpa periode nonaktif. Mutasi ini biasanya tidak memengaruhi kueri yang ada. Namun, mutasi skema yang mengubah atau menghapus kolom skema yang ada akan merusak kueri atau menyebabkan gangguan lainnya. Pendekatan berikut mengakomodasi perubahan tanpa memerlukan periode nonaktif.

Pisahkan data yang ditulis oleh pipeline ke dalam tabel utama dan ke dalam satu atau beberapa tabel penyiapan. Tabel utama menyimpan data historis yang ditulis oleh pipeline. Tabel penyiapan menyimpan output pipeline terbaru. Anda dapat menentukan tampilan fasad BigQuery di atas tabel utama dan tabel penyiapan, yang memungkinkan konsumen membuat kueri data historis dan terbaru.

Diagram berikut merevisi alur pipeline sebelumnya untuk menyertakan tabel penyiapan (Staging Table A), tabel utama, dan tampilan fasad.

Pipeline yang membaca langganan dan menulis ke tabel staging BigQuery. Tabel kedua (utama) memiliki output dari versi skema sebelumnya. Tampilan fasad membaca dari tabel penyiapan dan tabel utama.

Dalam alur yang direvisi, Pipeline A memproses pesan yang menggunakan Schema A dan menulis output ke Staging Table A, yang memiliki skema yang kompatibel. Tabel utama berisi data historis yang ditulis oleh versi pipeline sebelumnya, serta hasil yang digabungkan secara berkala dari tabel penyiapan. Konsumen dapat membuat kueri data terbaru, termasuk data historis dan real-time, dengan menggunakan tampilan fasad.

Saat skema pesan berubah dari Skema A menjadi Skema B, Anda dapat memperbarui kode pipeline agar kompatibel dengan pesan yang menggunakan Skema B. Pipeline yang ada perlu diupdate dengan implementasi baru. Dengan menjalankan pipeline paralel, Anda dapat memastikan pemrosesan data streaming terus berjalan tanpa gangguan. Menghentikan dan mengganti pipeline akan menyebabkan gangguan dalam pemrosesan, karena tidak ada pipeline yang berjalan selama jangka waktu tertentu.

Pipeline yang diperbarui akan menulis ke tabel penyiapan tambahan (Tabel Penyiapan B) yang menggunakan Skema B. Anda dapat menggunakan alur kerja yang diatur untuk membuat tabel penyiapan baru sebelum memperbarui pipeline. Perbarui tampilan fasad untuk menyertakan hasil dari tabel penyiapan baru, yang berpotensi menggunakan langkah alur kerja terkait.

Diagram berikut menunjukkan alur yang diperbarui yang menampilkan Staging Table B dengan Schema B dan cara tampilan fasad diperbarui untuk menyertakan konten dari tabel utama dan dari kedua tabel penyiapan.

Pipeline kini menggunakan Skema B dan menulis ke Tabel Staging B. Tampilan fasad membaca dari Tabel utama, Tabel Staging A, dan Tabel Staging B.

Sebagai proses terpisah dari update pipeline, Anda dapat menggabungkan tabel penyiapan ke tabel utama, baik secara berkala maupun sesuai kebutuhan. Diagram berikut menunjukkan cara Staging Table A digabungkan ke dalam tabel utama.

Tabel Staging A digabungkan ke dalam tabel utama. Tampilan fasad membaca dari Tabel Penyiapan B dan dari tabel utama.

Langkah berikutnya