Pemrosesan paralel

Pipeline dijalankan di cluster mesin. Mereka mencapai throughput tinggi dengan membagi tugas yang perlu dilakukan, lalu menjalankan tugas secara paralel di beberapa eksekutor yang tersebar di seluruh cluster. Secara umum, semakin besar jumlah pemisahan (juga disebut partisi), semakin cepat pipeline dapat dijalankan. Tingkat paralelisme di pipeline Anda ditentukan oleh tahap sumber dan pengacakan di pipeline.

Sumber

Di awal setiap operasi pipeline, setiap sumber dalam pipeline Anda menghitung data yang perlu dibaca, dan cara membagi data tersebut menjadi beberapa bagian. Misalnya, pertimbangkan pipeline dasar yang membaca dari Cloud Storage, melakukan beberapa transformasi Wrangler, lalu menulis kembali ke Cloud Storage.

Pipeline dasar yang menampilkan sumber Cloud Storage, transformasi Wrangler, dan sink Cloud Storage

Saat pipeline dimulai, sumber Cloud Storage akan memeriksa file input dan membaginya menjadi beberapa bagian berdasarkan ukuran file. Misalnya, file berukuran satu gigabyte dapat dibagi menjadi 100 bagian, yang masing-masing berukuran 10 MB. Setiap eksekutor membaca data untuk pemisahan tersebut, menjalankan transformasi Wrangler, lalu menulis output ke file part.

Data yang dipartisi di Cloud Storage menjadi transformasi Wrangler paralel ke dalam file part

Jika pipeline Anda berjalan lambat, salah satu hal pertama yang harus diperiksa adalah apakah sumber Anda membuat pemisahan yang cukup untuk memanfaatkan paralelisme sepenuhnya. Misalnya, beberapa jenis kompresi membuat file teks biasa tidak dapat dibagi. Jika Anda membaca file yang telah di-gzip, Anda mungkin melihat bahwa pipeline Anda berjalan jauh lebih lambat daripada jika Anda membaca file yang tidak dikompresi, atau file yang dikompresi dengan BZIP (yang dapat dibagi). Demikian pula, jika Anda menggunakan sumber database dan telah mengonfigurasinya untuk menggunakan satu pemisahan saja, sumber tersebut akan berjalan jauh lebih lambat daripada jika Anda mengonfigurasinya untuk menggunakan lebih banyak pemisahan.

Mengacak

Jenis plugin tertentu menyebabkan data diacak di seluruh cluster. Hal ini terjadi saat data yang diproses oleh satu eksekutor perlu dikirim ke eksekutor lain untuk melakukan komputasi. Pengacakan adalah operasi yang mahal karena melibatkan banyak I/O. Semua plugin yang menyebabkan data diacak akan muncul di bagian Analytics di Pipeline Studio. Hal ini mencakup plugin, seperti Kelompokkan Menurut, Hapus Duplikat, Berbeda, dan Penggabung. Misalnya, anggap saja tahap Kelompokkan Menurut ditambahkan ke pipeline dalam contoh sebelumnya.

Misalkan juga data yang dibaca mewakili pembelian yang dilakukan di toko bahan makanan. Setiap catatan berisi kolom item dan kolom num_purchased. Pada tahap Group By, kita mengonfigurasi pipeline untuk mengelompokkan data pada kolom item dan menghitung jumlah kolom num_purchased.

Saat pipeline berjalan, file input akan dibagi seperti yang dijelaskan sebelumnya. Setelah itu, setiap data diacak di seluruh cluster sehingga setiap data dengan item yang sama termasuk dalam executor yang sama.

Seperti yang diilustrasikan dalam contoh sebelumnya, catatan untuk pembelian apel awalnya tersebar di beberapa eksekutor. Untuk melakukan penggabungan, semua catatan tersebut harus dikirim di seluruh cluster ke executor yang sama.

Sebagian besar plugin yang memerlukan pengacakan memungkinkan Anda menentukan jumlah partisi yang akan digunakan saat mengacak data. Ini mengontrol jumlah eksekutor yang digunakan untuk memproses data yang diacak.

Dalam contoh sebelumnya, jika jumlah partisi ditetapkan ke 2, setiap eksekutor menghitung agregat untuk dua item, bukan satu.

Perhatikan bahwa Anda dapat mengurangi paralelisme pipeline setelah tahap tersebut. Misalnya, pertimbangkan tampilan logis pipeline:

Jika sumber membagi data di 500 partisi, tetapi pengacakan Kelompokkan menurut menggunakan 200 partisi, tingkat paralelisme maksimum setelah Kelompokkan menurut akan turun dari 500 menjadi 200. Daripada 500 file bagian berbeda yang ditulis ke Cloud Storage, Anda hanya memiliki 200 file.

Memilih partisi

Jika jumlah partisi terlalu rendah, Anda tidak akan menggunakan kapasitas penuh cluster untuk memparalelkan sebanyak mungkin pekerjaan. Menetapkan partisi terlalu tinggi akan meningkatkan jumlah overhead yang tidak perlu. Secara umum, lebih baik menggunakan terlalu banyak partisi daripada terlalu sedikit. Overhead tambahan perlu dikhawatirkan jika pipeline Anda memerlukan waktu beberapa menit untuk dijalankan dan Anda mencoba mengurangi beberapa menit. Jika pipeline Anda memerlukan waktu berjam-jam untuk dijalankan, overhead umumnya bukan sesuatu yang perlu Anda khawatirkan.

Cara yang berguna, tetapi terlalu sederhana, untuk menentukan jumlah partisi yang akan digunakan adalah dengan menyetelnya ke max(cluster CPUs, input records / 500,000). Dengan kata lain, ambil jumlah rekaman input dan bagi dengan 500.000. Jika jumlah tersebut lebih besar daripada jumlah CPU cluster, gunakan jumlah tersebut untuk jumlah partisi. Jika tidak, gunakan jumlah CPU cluster. Misalnya, jika cluster Anda memiliki 100 CPU dan tahap pengacakan diperkirakan memiliki 100 juta input data, gunakan 200 partisi.

Jawaban yang lebih lengkap adalah bahwa pengacakan akan berperforma terbaik jika data pengacakan perantara untuk setiap partisi dapat dimuat sepenuhnya dalam memori eksekutor sehingga tidak ada yang perlu ditransfer ke disk. Spark mencadangkan kurang dari 30% memori executor untuk menyimpan data shuffle. Jumlah pastinya adalah (total memori - 300 MB) * 30%. Jika kita mengasumsikan setiap eksekutor ditetapkan untuk menggunakan memori 2 GB, berarti setiap partisi tidak boleh menyimpan lebih dari (2 GB - 300 MB) * 30% = sekitar 500 MB data. Jika kita mengasumsikan setiap kumpulan data dikompresi menjadi berukuran 1 KB, maka (500 MB / partisi) / (1 KB/kumpulan data) = 500.000 kumpulan data per partisi. Jika eksekutor Anda menggunakan lebih banyak memori, atau data Anda lebih kecil, Anda dapat menyesuaikan jumlah ini.

Kecondongan data

Perhatikan bahwa dalam contoh sebelumnya, pembelian berbagai item didistribusikan secara merata. Artinya, ada tiga pembelian untuk setiap item, yaitu apel, pisang, wortel, dan telur. Pengacakan pada kunci yang didistribusikan secara merata adalah jenis pengacakan dengan performa terbaik, tetapi banyak set data tidak memiliki properti ini. Melanjutkan pembelian di toko bahan makanan dalam contoh sebelumnya, Anda akan mengharapkan lebih banyak pembelian telur daripada kartu pernikahan. Jika ada beberapa kunci pengacakan yang jauh lebih umum daripada kunci lainnya, Anda sedang berurusan dengan data yang miring. Data miring dapat berperforma jauh lebih buruk daripada data tidak miring karena jumlah pekerjaan yang tidak proporsional dilakukan oleh sejumlah kecil pelaksana. Hal ini menyebabkan subset kecil partisi menjadi jauh lebih besar daripada partisi lainnya.

Dalam contoh ini, pembelian telur lima kali lebih banyak daripada pembelian kartu, yang berarti agregat telur membutuhkan waktu sekitar lima kali lebih lama untuk dihitung. Hal ini tidak terlalu penting saat menangani 10 data, bukan dua, tetapi akan sangat berbeda saat menangani lima miliar data, bukan satu miliar. Jika Anda mengalami kemiringan data, jumlah partisi yang digunakan dalam pengacakan tidak akan berdampak besar pada performa pipeline.

Anda dapat mengenali kemiringan data dengan memeriksa grafik untuk catatan output dari waktu ke waktu. Jika tahap menghasilkan rekaman dengan kecepatan yang jauh lebih tinggi di awal eksekusi pipeline, lalu tiba-tiba melambat, ini mungkin berarti Anda memiliki data yang miring.

Anda juga dapat mengenali kemiringan data dengan memeriksa penggunaan memori cluster dari waktu ke waktu. Jika kluster Anda mencapai kapasitas selama beberapa waktu, tetapi tiba-tiba memiliki penggunaan memori yang rendah selama beberapa waktu, ini juga merupakan tanda bahwa Anda sedang menghadapi kemiringan data.

Data miring paling signifikan memengaruhi performa saat penggabungan dilakukan. Ada beberapa teknik yang dapat digunakan untuk meningkatkan performa untuk gabungan miring. Untuk mengetahui informasi selengkapnya, lihat Pemrosesan paralel untuk operasi JOIN.

Penyesuaian adaptif untuk eksekusi

Untuk menyesuaikan eksekusi secara adaptif, tentukan rentang partisi yang akan digunakan, bukan nomor partisi yang tepat. Nomor partisi yang tepat, meskipun ditetapkan dalam konfigurasi pipeline, akan diabaikan jika eksekusi adaptif diaktifkan.

Jika Anda menggunakan cluster Managed Service for Apache Spark sementara, Cloud Data Fusion akan otomatis menetapkan konfigurasi yang tepat, tetapi untuk cluster Managed Service for Apache Spark atau Hadoop statis, dua parameter konfigurasi berikut dapat ditetapkan:

  • spark.default.parallelism: tetapkan ke jumlah total vCore yang tersedia di cluster. Hal ini memastikan cluster Anda tidak kurang dimuat dan menentukan batas bawah untuk jumlah partisi.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: tetapkan ke 32x dari jumlah vCore yang tersedia di cluster. Ini menentukan batas atas untuk jumlah partisi.
  • Spark.sql.adaptive.enabled: untuk mengaktifkan pengoptimalan, tetapkan nilai ini ke true. Managed Service for Apache Spark menyetelnya secara otomatis, tetapi jika Anda menggunakan cluster Hadoop generik, Anda harus memastikan bahwa fitur ini diaktifkan .

Parameter ini dapat ditetapkan dalam konfigurasi mesin pipeline tertentu atau dalam properti cluster cluster Managed Service for Apache Spark statis.

Langkah berikutnya