Halaman ini memberikan ringkasan tentang siklus proses pipeline dari kode pipeline hingga tugas Dataflow.
Halaman ini menjelaskan konsep berikut:
- Apa yang dimaksud dengan grafik eksekusi, dan bagaimana pipeline Apache Beam menjadi tugas Dataflow
- Cara Dataflow menangani error
- Cara Dataflow secara otomatis melakukan paralelisme dan mendistribusikan logika pemrosesan dalam pipeline Anda ke worker yang menjalankan tugas Anda
- Pengoptimalan tugas yang mungkin dilakukan Dataflow
Grafik eksekusi
Saat Anda menjalankan pipeline Dataflow, Dataflow akan membuat grafik eksekusi dari kode yang membuat objek Pipeline, termasuk semua transformasi dan fungsi pemrosesan terkait, seperti objek DoFn. Ini adalah grafik eksekusi pipeline, dan fase ini disebut
waktu pembuatan grafik.
Selama pembuatan grafik, Apache Beam akan menjalankan kode secara lokal dari titik entri utama kode pipeline, berhenti pada panggilan ke langkah sumber, sink, atau transformasi, dan mengubah panggilan ini menjadi node grafik.
Oleh karena itu, bagian kode di titik entri pipeline (metode main Java dan Go atau tingkat atas skrip Python) akan dijalankan secara lokal di mesin yang menjalankan pipeline. Kode yang sama yang dideklarasikan dalam metode objek DoFn akan dijalankan di worker Dataflow.
Misalnya, contoh WordCount yang disertakan dengan Apache Beam SDK, berisi serangkaian transformasi untuk membaca, mengekstrak, menghitung, memformat, dan menulis kata-kata individual dalam kumpulan teks, beserta jumlah kemunculan untuk setiap kata. Diagram berikut menunjukkan cara transformasi dalam pipeline WordCount diperluas menjadi grafik eksekusi:

Gambar 1: Contoh grafik eksekusi WordCount
Grafik eksekusi sering kali berbeda dengan urutan transformasi yang Anda tentukan saat membuat pipeline. Perbedaan ini ada karena layanan Dataflow melakukan berbagai pengoptimalan dan fusi pada grafik eksekusi sebelum dijalankan di resource cloud terkelola. Layanan Dataflow menghormati dependensi data saat menjalankan pipeline Anda. Namun, langkah-langkah tanpa dependensi data di antara langkah-langkah tersebut dapat dijalankan dalam urutan apa pun.
Untuk melihat grafik eksekusi yang tidak dioptimalkan yang telah dibuat Dataflow untuk pipeline Anda, pilih tugas Anda di antarmuka pemantauan Dataflow. Untuk mengetahui informasi selengkapnya tentang melihat tugas, lihat Menggunakan antarmuka pemantauan Dataflow.
Selama pembuatan grafik, Apache Beam memvalidasi bahwa resource apa pun yang direferensikan oleh pipeline, seperti bucket Cloud Storage, tabel BigQuery, dan topik atau langganan Pub/Sub, benar-benar ada dan dapat diakses. Validasi dilakukan melalui panggilan API standar ke layanan masing-masing, sehingga penting bagi akun pengguna yang digunakan untuk menjalankan pipeline memiliki konektivitas yang tepat ke layanan yang diperlukan dan diotorisasi untuk memanggil API layanan. Sebelum mengirimkan pipeline ke layanan Dataflow, Apache Beam juga memeriksa error lainnya, dan memastikan bahwa grafik pipeline tidak berisi operasi ilegal.
Grafik eksekusi kemudian diterjemahkan ke dalam format JSON, dan grafik eksekusi JSON dikirimkan ke endpoint layanan Dataflow.
Layanan Dataflow kemudian memvalidasi grafik eksekusi JSON. Saat divalidasi, grafik tersebut akan menjadi tugas di layanan Dataflow. Anda dapat melihat tugas, grafik eksekusi, status, dan informasi log -nya menggunakan antarmuka pemantauan Dataflow.
Java
Layanan Dataflow mengirimkan respons ke mesin tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineJob, yang berisi jobId tugas Dataflow Anda.
Gunakan jobId untuk memantau, melacak, dan memecahkan masalah tugas Anda menggunakan
antarmuka pemantauan Dataflow
dan antarmuka command line Dataflow.
Untuk mengetahui informasi selengkapnya, lihat
referensi API untuk DataflowPipelineJob.
Python
Layanan Dataflow mengirimkan respons ke mesin tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineResult, yang berisi job_id tugas Dataflow Anda.
Gunakan job_id untuk memantau, melacak, dan memecahkan masalah tugas Anda
menggunakan
antarmuka pemantauan Dataflow
dan
antarmuka command line Dataflow.
Go
Layanan Dataflow mengirimkan respons ke mesin tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek dataflowPipelineResult, yang berisi jobID tugas Dataflow Anda.
Gunakan jobID untuk memantau, melacak, dan memecahkan masalah tugas Anda
menggunakan
antarmuka pemantauan Dataflow
dan
antarmuka command line Dataflow.
Pembuatan grafik juga terjadi saat Anda menjalankan pipeline secara lokal, tetapi grafik tidak diterjemahkan ke JSON atau dikirimkan ke layanan. Sebagai gantinya, grafik dijalankan secara lokal di mesin yang sama tempat Anda meluncurkan program Dataflow. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi PipelineOptions untuk eksekusi lokal.
Penanganan error dan pengecualian
Pipeline Anda mungkin menampilkan pengecualian saat memproses data. Beberapa error ini bersifat sementara, seperti kesulitan sementara dalam mengakses layanan eksternal. Error lainnya bersifat permanen, seperti error yang disebabkan oleh data input yang rusak atau tidak dapat diuraikan, atau pointer null selama komputasi.
Dataflow memproses elemen dalam paket arbitrer, dan mencoba kembali paket lengkap saat error ditampilkan untuk elemen apa pun dalam paket tersebut. Saat berjalan dalam mode batch, paket yang menyertakan item yang gagal akan dicoba kembali sebanyak empat kali. Pipeline akan gagal sepenuhnya jika satu paket gagal empat kali. Saat berjalan dalam mode streaming, paket yang menyertakan item yang gagal akan dicoba kembali tanpa batas, yang dapat menyebabkan pipeline Anda berhenti secara permanen.
Saat memproses dalam mode batch, Anda mungkin melihat sejumlah besar kegagalan individual sebelum tugas pipeline gagal sepenuhnya, yang terjadi saat paket tertentu gagal setelah empat kali percobaan ulang. Misalnya, jika pipeline Anda mencoba memproses 100 paket, Dataflow dapat menghasilkan beberapa ratus kegagalan individual hingga satu paket mencapai kondisi empat kegagalan untuk keluar.
Error worker startup, seperti kegagalan menginstal paket di worker, bersifat sementara. Skenario ini menghasilkan percobaan ulang tanpa batas, dan dapat menyebabkan pipeline Anda berhenti secara permanen.
Paralelisme dan distribusi
Layanan Dataflow secara otomatis melakukan paralelisme dan mendistribusikan logika pemrosesan dalam pipeline Anda ke worker yang Anda tetapkan untuk menjalankan tugas Anda. Dataflow menggunakan abstraksi dalam
model pemrograman untuk merepresentasikan
fungsi pemrosesan paralel. Misalnya, transformasi ParDo dalam pipeline menyebabkan Dataflow secara otomatis mendistribusikan kode pemrosesan, yang direpresentasikan oleh objek DoFn, ke beberapa worker untuk dijalankan secara paralel.
Ada dua jenis paralelisme tugas:
Paralelisme horizontal terjadi saat data pipeline dibagi dan diproses di beberapa worker pada saat yang sama. Lingkungan runtime Dataflow didukung oleh kumpulan worker terdistribusi. Pipeline memiliki potensi paralelisme yang lebih tinggi jika kumpulan tersebut berisi lebih banyak worker, tetapi konfigurasi tersebut juga memiliki biaya yang lebih tinggi. Secara teori, paralelisme horizontal tidak memiliki batas atas. Namun, Dataflow membatasi kumpulan worker hingga 4.000 worker untuk mengoptimalkan penggunaan resource di seluruh fleet.
Paralelisme vertikal terjadi saat data pipeline dibagi dan diproses oleh beberapa core CPU di worker yang sama. Setiap worker didukung oleh VM Compute Engine. VM dapat menjalankan beberapa proses untuk memenuhi semua core CPU-nya. VM dengan lebih banyak core memiliki potensi paralelisme vertikal yang lebih tinggi, tetapi konfigurasi ini menghasilkan biaya yang lebih tinggi. Jumlah core yang lebih tinggi sering kali menyebabkan peningkatan penggunaan memori, sehingga jumlah core biasanya diskalakan bersama dengan ukuran memori. Mengingat batas fisik arsitektur komputer, batas atas paralelisme vertikal jauh lebih rendah daripada batas atas paralelisme horizontal.
Paralelisme terkelola
Secara default, Dataflow otomatis mengelola paralelisme tugas. Dataflow memantau statistik runtime untuk tugas, seperti penggunaan CPU dan memori, untuk menentukan cara menskalakan tugas. Bergantung pada setelan tugas Anda, Dataflow dapat menskalakan tugas secara horizontal, yang disebut Penskalaan Otomatis Horizontal, atau secara vertikal, yang disebut Penskalaan Vertikal. Penskalaan otomatis untuk paralelisme mengoptimalkan biaya tugas dan performa tugas.
Untuk meningkatkan performa tugas, Dataflow juga mengoptimalkan pipeline secara internal. Pengoptimalan umum adalah pengoptimalan fusi dan pengoptimalan gabungan. Dengan menggabungkan langkah-langkah pipeline, Dataflow menghilangkan biaya yang tidak perlu yang terkait dengan langkah-langkah koordinasi dalam sistem terdistribusi dan menjalankan setiap langkah secara terpisah.
Faktor yang memengaruhi paralelisme
Faktor-faktor berikut memengaruhi seberapa baik fungsi paralelisme dalam tugas Dataflow.
Sumber input
Jika sumber input tidak mengizinkan paralelisme, langkah penyerapan sumber input dapat menjadi bottleneck dalam tugas Dataflow. Misalnya, saat Anda menyerap data dari satu file teks terkompresi, Dataflow tidak dapat melakukan paralelisme data input. Karena sebagian besar format kompresi tidak dapat dibagi secara arbitrer menjadi shard selama penyerapan, Dataflow perlu membaca data secara berurutan dari awal file. Throughput keseluruhan pipeline diperlambat oleh bagian pipeline yang tidak paralel. Solusi untuk masalah ini adalah menggunakan sumber input yang lebih skalabel.
Dalam beberapa kasus, fusi langkah juga mengurangi paralelisme. Jika sumber input tidak mengizinkan paralelisme, jika Dataflow menggabungkan langkah penyerapan data dengan langkah-langkah berikutnya dan menetapkan langkah gabungan ini ke satu thread, seluruh pipeline mungkin berjalan lebih lambat.
Untuk menghindari skenario ini, sisipkan langkah Redistribute setelah langkah penyerapan sumber input. Untuk mengetahui informasi selengkapnya, lihat bagian
Mencegah fusi dalam dokumen ini.
Fanout default dan bentuk data
Fanout default dari satu langkah transformasi dapat menjadi bottleneck dan membatasi paralelisme. Misalnya, transformasi ParDo "fan-out tinggi" dapat menyebabkan fusi membatasi kemampuan Dataflow untuk mengoptimalkan penggunaan worker. Dalam operasi seperti itu, Anda mungkin memiliki kumpulan input dengan elemen yang relatif sedikit, tetapi ParDo menghasilkan output dengan ratusan atau ribuan kali lebih banyak elemen, diikuti oleh ParDo lainnya. Jika layanan Dataflow menggabungkan operasi ParDo ini, paralelisme dalam langkah ini dibatasi hingga jumlah item dalam kumpulan input, meskipun PCollection perantara berisi lebih banyak elemen.
Untuk mengetahui potensi solusinya, lihat bagian Mencegah fusi dalam dokumen ini.
Bentuk data
Bentuk data, baik data input maupun data perantara, dapat membatasi paralelisme.
Misalnya, saat langkah GroupByKey pada kunci natural, seperti kota, diikuti oleh langkah map atau Combine, Dataflow akan menggabungkan kedua langkah tersebut. Jika ruang kunci kecil, misalnya, lima kota, dan satu kunci sangat aktif, misalnya, kota besar, sebagian besar item dalam output langkah GroupByKey didistribusikan ke satu proses. Proses ini menjadi bottleneck dan memperlambat tugas.
Dalam contoh ini, Anda dapat mendistribusikan ulang hasil langkah GroupByKey ke ruang kunci buatan yang lebih besar, bukan menggunakan kunci natural. Sisipkan langkah Redistribute antara langkah GroupByKey dan langkah map atau Combine. Pada langkah Redistribute, buat ruang kunci buatan, misalnya dengan menggunakan fungsi hash, untuk mengatasi paralelisme terbatas yang disebabkan oleh bentuk data.
Untuk mengetahui informasi selengkapnya, lihat bagian Mencegah fusi dalam dokumen ini.
Sink output
Sink adalah transformasi yang menulis ke sistem penyimpanan data eksternal, seperti file atau database. Dalam praktiknya, sink dimodelkan dan diimplementasikan sebagai objek DoFn standar dan digunakan untuk mewujudkan PCollection ke sistem eksternal.
Dalam hal ini, PCollection berisi hasil pipeline akhir. Thread yang memanggil sink API dapat berjalan secara paralel untuk menulis data ke sistem eksternal. Secara default, tidak ada koordinasi antara thread. Tanpa lapisan perantara untuk mem-buffer permintaan tulis dan alur kontrol, sistem eksternal dapat kelebihan beban dan mengurangi throughput tulis. Meningkatkan skala resource dengan menambahkan lebih banyak paralelisme dapat memperlambat pipeline lebih lanjut.
Solusi untuk masalah ini adalah mengurangi paralelisme dalam langkah tulis.
Anda dapat menambahkan langkah GroupByKey tepat sebelum langkah tulis. Langkah GroupByKey mengelompokkan data output ke dalam kumpulan batch yang lebih kecil untuk mengurangi total panggilan RPC dan koneksi ke sistem eksternal. Misalnya, gunakan GroupByKey untuk membuat ruang hash 50 dari 1 juta titik data.
Kelemahan dari pendekatan ini adalah bahwa pendekatan ini memperkenalkan batas hardcode untuk paralelisme. Opsi lainnya adalah menerapkan backoff eksponensial di sink saat menulis data. Opsi ini dapat memberikan throttling klien minimum.
Memantau paralelisme
Untuk memantau paralelisme, Anda dapat menggunakan Google Cloud konsol untuk melihat straggler yang terdeteksi. Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah straggler dalam tugas batch dan Memecahkan masalah straggler dalam tugas streaming.
Pengoptimalan fusi
Setelah bentuk JSON dari grafik eksekusi pipeline Anda divalidasi, layanan Dataflow mungkin mengubah grafik untuk melakukan pengoptimalan.
Pengoptimalan dapat mencakup penggabungan beberapa langkah atau transformasi dalam grafik eksekusi pipeline Anda menjadi satu langkah. Penggabungan langkah-langkah mencegah layanan Dataflow perlu mewujudkan setiap PCollection perantara dalam pipeline Anda, yang dapat menimbulkan biaya yang mahal dalam hal overhead memori dan pemrosesan.
Meskipun semua transformasi yang Anda tentukan dalam pembuatan pipeline dijalankan di layanan, untuk memastikan eksekusi pipeline Anda yang paling efisien, transformasi mungkin dijalankan dalam urutan yang berbeda atau sebagai bagian dari transformasi gabungan yang lebih besar. Layanan Dataflow menghormati dependensi data antara langkah-langkah dalam grafik eksekusi, tetapi langkah-langkah lainnya dapat dijalankan dalam urutan apa pun.
Contoh fusi
Diagram berikut menunjukkan cara grafik eksekusi dari contoh WordCount yang disertakan dengan Apache Beam SDK untuk Java dapat dioptimalkan dan digabungkan oleh layanan Dataflow untuk eksekusi yang efisien:

Gambar 2: Contoh Grafik Eksekusi WordCount yang Dioptimalkan
Mencegah fusi
Dalam beberapa kasus, Dataflow mungkin salah menebak cara optimal untuk menggabungkan operasi dalam pipeline, yang dapat membatasi kemampuan Dataflow untuk menggunakan semua worker yang tersedia. Dalam kasus seperti itu, Anda dapat memberikan petunjuk kepada Dataflow untuk mendistribusikan ulang data, dengan menggunakan transformasi Redistribute.
Untuk menambahkan transformasi Redistribute, panggil salah satu metode berikut:
Redistribute.arbitrarily: Menunjukkan bahwa data kemungkinan tidak seimbang. Dataflow memilih algoritma terbaik untuk mendistribusikan ulang data.Redistribute.byKey: Menunjukkan bahwaPCollectionpasangan nilai kunci kemungkinan tidak seimbang dan harus didistribusikan ulang berdasarkan kunci. Biasanya, Dataflow akan menempatkan semua elemen dari satu kunci di thread worker yang sama. Namun, penempatan kunci tidak dijamin, dan elemen diproses secara independen.
Jika pipeline Anda berisi transformasi Redistribute, Dataflow biasanya mencegah penggabungan langkah-langkah sebelum dan sesudah transformasi Redistribute, dan mengacak data sehingga langkah-langkah hilir dari transformasi Redistribute memiliki paralelisme yang lebih optimal.
Memantau fusi
Anda dapat mengakses grafik yang dioptimalkan dan tahap gabungan di Google Cloud konsol, menggunakan gcloud CLI, atau menggunakan API.
Konsol
Untuk melihat tahap dan langkah gabungan grafik di konsol, di tab Execution details untuk tugas Dataflow Anda, buka tampilan grafik Stage workflow.
Untuk melihat langkah komponen yang digabungkan untuk suatu tahap, di grafik, klik tahap gabungan. Di panel Stage info, baris Component steps menampilkan tahap gabungan. Terkadang, bagian dari satu transformasi komposit digabungkan ke dalam beberapa tahap.
gcloud
Untuk mengakses grafik yang dioptimalkan dan tahap gabungan menggunakan gcloud CLI, jalankan perintah gcloud berikut:
gcloud dataflow jobs describe --full JOB_ID --format json
Ganti JOB_ID dengan ID tugas Dataflow Anda.
Untuk mengekstrak bit yang relevan, teruskan output perintah gcloud ke jq:
gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'
Untuk melihat deskripsi tahap gabungan dalam file respons output, dalam
ComponentTransform
array, lihat
ExecutionStageSummary
objek.
API
Untuk mengakses grafik yang dioptimalkan dan tahap gabungan menggunakan API, panggil
project.locations.jobs.get.
Untuk melihat deskripsi tahap gabungan dalam file respons output, dalam
ComponentTransform
array, lihat
ExecutionStageSummary
objek.
Pengoptimalan gabungan
Operasi agregasi adalah konsep penting dalam pemrosesan data skala besar.
Agregasi menggabungkan data yang secara konseptual berjauhan, sehingga sangat berguna untuk korelasi. Model pemrograman Dataflow merepresentasikan operasi agregasi sebagai transformasi GroupByKey, CoGroupByKey, dan
Combine.
Operasi agregasi Dataflow menggabungkan data di seluruh set data, termasuk data yang mungkin tersebar di beberapa worker. Selama operasi agregasi tersebut, sering kali paling efisien untuk menggabungkan data sebanyak mungkin secara lokal sebelum menggabungkan data di seluruh instance. Saat Anda menerapkan GroupByKey atau transformasi agregasi lainnya, layanan Dataflow akan otomatis melakukan penggabungan parsial secara lokal sebelum operasi pengelompokan utama.
Saat melakukan penggabungan parsial atau multi-level, layanan Dataflow membuat keputusan yang berbeda berdasarkan apakah pipeline Anda bekerja dengan data batch atau streaming. Untuk data terbatas, layanan ini lebih mengutamakan efisiensi dan akan melakukan penggabungan lokal sebanyak mungkin. Untuk data tidak terbatas, layanan ini lebih mengutamakan latensi yang lebih rendah, dan mungkin tidak melakukan penggabungan parsial, karena dapat meningkatkan latensi.