Mode Fleksibilitas yang Ditingkatkan/Enhanced Flexibility Mode (EFM) Managed Service untuk Apache Spark mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM mengalihkan data shuffle dengan menulis data ke worker primer. Worker mengambil data dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk tugas Spark.
Karena EFM tidak menyimpan data shuffle perantara di worker sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya melakukan penskalaan otomatis grup worker sekunder.
EFM didukung di Managed Service untuk Apache Spark
2.0.31+, 2.1.6+, 2.2+, dan versi image yang lebih baru
.
- Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
- Mode Fleksibilitas yang Ditingkatkan tidak direkomendasikan:
- di cluster yang hanya memiliki worker primer
- pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
- di cluster yang menjalankan notebook karena data shuffle mungkin tidak dibersihkan selama masa aktif sesi.
- saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penghentian tuntas dan EFM dapat bekerja dengan tujuan yang berbeda karena mekanisme penghentian tuntas YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.
- di cluster yang menjalankan tugas Spark dan non-Spark.
- Mode Fleksibilitas yang Ditingkatkan tidak didukung:
- saat penskalaan otomatis worker primer diaktifkan. Dalam sebagian besar kasus, worker primer akan terus menyimpan data shuffle yang tidak dimigrasikan secara otomatis. Penskalaan grup worker primer akan meniadakan manfaat EFM.
Menggunakan Mode Fleksibilitas yang Ditingkatkan
Fleksibilitas yang Ditingkatkan diaktifkan saat Anda membuat cluster dengan menetapkan properti cluster
dataproc:efm.spark.shuffle
ke primary-worker.
Contoh:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Contoh Apache Spark
- Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan contoh jar Spark di cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Mengonfigurasi SSD lokal
Karena EFM menulis data shuffle perantara ke disk yang terpasang ke VM, EFM mendapatkan manfaat dari throughput dan IOPS tambahan yang disediakan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan tujuan sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin worker primer.
Untuk melampirkan SSD lokal, teruskan flag --num-worker-local-ssds ke
perintah gcloud Managed Service untuk Apache Spark clusters create.
Umumnya, Anda tidak memerlukan SSD lokal di worker sekunder.
Menambahkan SSD lokal ke worker sekunder cluster (menggunakan flag --num-secondary-worker-local-ssds) sering kali kurang penting karena worker sekunder tidak menulis data shuffle secara lokal.
Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan
SSD lokal ke worker sekunder jika Anda memperkirakan
tugas akan terikat I/O
karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk
ruang scratch atau partisi Anda
terlalu besar untuk dimuat dalam memori dan akan meluap ke disk.
Rasio worker sekunder
Karena worker sekunder menulis data shuffle ke worker primer, cluster Anda harus berisi jumlah worker primer yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk
cluster penskalaan otomatis, agar grup primer tidak melakukan penskalaan dan menyebabkan
perilaku yang tidak diinginkan, tetapkan minInstances ke nilai maxInstances dalam
kebijakan penskalaan otomatis
untuk grup worker primer.
Jika Anda memiliki rasio worker sekunder ke primer yang tinggi (misalnya, 10:1), pantau penggunaan CPU, jaringan, dan disk worker primer untuk menentukan apakah worker tersebut kelebihan beban. Untuk melakukannya:
Buka halaman instance VM di Google Cloud konsol.
Klik kotak centang di sisi kiri worker primer.
Klik tab MONITORING untuk melihat Penggunaan CPU, IOPS Disk, Byte Jaringan, dan metrik lainnya dari worker primer.
Jika worker primer kelebihan beban, pertimbangkan untuk melakukan penskalaan worker primer secara manual.
Mengubah ukuran grup worker primer
Grup worker primer dapat diskalakan dengan aman, tetapi penskalaan grup worker primer dapat berdampak negatif pada progres tugas. Operasi yang melakukan penskalaan grup worker primer
harus menggunakan
penghentian tuntas, yang diaktifkan dengan menetapkan flag --graceful-decommission-timeout.
Cluster penskalaan otomatis: Penskalaan grup worker primer dinonaktifkan pada cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup worker primer di cluster penskalaan otomatis:
Nonaktifkan penskalaan otomatis.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Skalakan grup primer.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Aktifkan kembali penskalaan otomatis:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Memantau penggunaan disk worker primer
Worker primer harus memiliki ruang disk yang memadai untuk data shuffle cluster.
Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity.
Saat disk lokal terisi, ruang menjadi tidak tersedia untuk HDFS, dan kapasitas yang tersisa akan berkurang.
Secara default, saat penggunaan disk lokal worker primer melebihi 90% kapasitas, node akan ditandai sebagai UNHEALTHY di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau melakukan penskalaan kumpulan worker primer.
Konfigurasi lanjutan
Partisi dan paralelisme
Saat mengirimkan tugas Spark, konfigurasikan tingkat partisi yang sesuai. Menentukan jumlah partisi input dan output untuk tahap shuffle melibatkan pertukaran antara karakteristik performa yang berbeda. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.
Partisi input
Partisi input Spark dan MapReduce ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data yang kira-kira bernilai satu "ukuran blok".
Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh
spark.sql.files.maxPartitionBytes. Pertimbangkan untuk meningkatkannya menjadi 1 GB:spark.sql.files.maxPartitionBytes=1073741824.Untuk Spark RDD, ukuran partisi biasanya dikontrol dengan
fs.gs.block.size, yang secara default adalah 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Contoh:--properties spark.hadoop.fs.gs.block.size=1073741824
Partisi output
Jumlah tugas pada tahap berikutnya dikontrol oleh beberapa properti. Pada tugas yang lebih besar yang memproses lebih dari 1 TB, pertimbangkan untuk memiliki setidaknya 1 GB per partisi.
Untuk Spark SQL, jumlah partisi output dikontrol oleh
spark.sql.shuffle.partitions.Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan
spark.default.parallelism.
Penyesuaian shuffle untuk shuffle worker primer
Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>.
Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark berjalan sebagai bagian dari Node Manager. Nilai defaultnya adalah dua kali (2x) jumlah core di mesin (misalnya, 16 thread di n1-highmem-8). Jika "Shuffle Read Blocked Time" lebih besar dari 1 detik, dan worker primer belum mencapai batas jaringan, CPU, atau disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.
Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer, yang secara default adalah 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).
Meningkatkan upaya coba ulang
Jumlah maksimum upaya yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menetapkan properti berikut:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Karena master aplikasi dan tugas lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti sebelumnya di cluster tersebut dapat membantu (perhatikan bahwa penggunaan EFM dengan Spark dan penghentian tuntas tidak didukung).
Penghentian tuntas YARN pada cluster EFM
Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dan dampak minimal pada aplikasi yang sedang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian tuntas dapat ditetapkan dalam AutoscalingPolicy yang terlampir ke cluster EFM.
Peningkatan EFM untuk penghentian tuntas
Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Managed Service untuk Apache Spark standar hingga aplikasi selesai.
Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung tersebut akan dijadwalkan ulang di node lain yang tidak dihentikan. Progres tugas tidak hilang: master aplikasi baru dengan cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.