Mode Fleksibilitas yang Ditingkatkan Managed Service for Apache Spark

Mode Fleksibilitas yang Ditingkatkan/Enhanced Flexibility Mode (EFM) Managed Service untuk Apache Spark mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM mengalihkan data shuffle dengan menulis data ke worker primer. Worker mengambil data dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk tugas Spark.

Karena EFM tidak menyimpan data shuffle perantara di worker sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya melakukan penskalaan otomatis grup worker sekunder.

EFM didukung di Managed Service untuk Apache Spark 2.0.31+, 2.1.6+, 2.2+, dan versi image yang lebih baru .

Batasan:

  • Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
  • Mode Fleksibilitas yang Ditingkatkan tidak direkomendasikan:
    • di cluster yang hanya memiliki worker primer
    • pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
    • di cluster yang menjalankan notebook karena data shuffle mungkin tidak dibersihkan selama masa aktif sesi.
    • saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penghentian tuntas dan EFM dapat bekerja dengan tujuan yang berbeda karena mekanisme penghentian tuntas YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.
    • di cluster yang menjalankan tugas Spark dan non-Spark.
  • Mode Fleksibilitas yang Ditingkatkan tidak didukung:
    • saat penskalaan otomatis worker primer diaktifkan. Dalam sebagian besar kasus, worker primer akan terus menyimpan data shuffle yang tidak dimigrasikan secara otomatis. Penskalaan grup worker primer akan meniadakan manfaat EFM.

Menggunakan Mode Fleksibilitas yang Ditingkatkan

Fleksibilitas yang Ditingkatkan diaktifkan saat Anda membuat cluster dengan menetapkan properti cluster dataproc:efm.spark.shuffle ke primary-worker.

Contoh:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Contoh Apache Spark

  1. Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan contoh jar Spark di cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Mengonfigurasi SSD lokal

Karena EFM menulis data shuffle perantara ke disk yang terpasang ke VM, EFM mendapatkan manfaat dari throughput dan IOPS tambahan yang disediakan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan tujuan sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin worker primer.

Untuk melampirkan SSD lokal, teruskan flag --num-worker-local-ssds ke perintah gcloud Managed Service untuk Apache Spark clusters create.

Umumnya, Anda tidak memerlukan SSD lokal di worker sekunder. Menambahkan SSD lokal ke worker sekunder cluster (menggunakan flag --num-secondary-worker-local-ssds) sering kali kurang penting karena worker sekunder tidak menulis data shuffle secara lokal. Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan SSD lokal ke worker sekunder jika Anda memperkirakan tugas akan terikat I/O karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk ruang scratch atau partisi Anda terlalu besar untuk dimuat dalam memori dan akan meluap ke disk.

Rasio worker sekunder

Karena worker sekunder menulis data shuffle ke worker primer, cluster Anda harus berisi jumlah worker primer yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk cluster penskalaan otomatis, agar grup primer tidak melakukan penskalaan dan menyebabkan perilaku yang tidak diinginkan, tetapkan minInstances ke nilai maxInstances dalam kebijakan penskalaan otomatis untuk grup worker primer.

Jika Anda memiliki rasio worker sekunder ke primer yang tinggi (misalnya, 10:1), pantau penggunaan CPU, jaringan, dan disk worker primer untuk menentukan apakah worker tersebut kelebihan beban. Untuk melakukannya:

  1. Buka halaman instance VM di Google Cloud konsol.

  2. Klik kotak centang di sisi kiri worker primer.

  3. Klik tab MONITORING untuk melihat Penggunaan CPU, IOPS Disk, Byte Jaringan, dan metrik lainnya dari worker primer.

Jika worker primer kelebihan beban, pertimbangkan untuk melakukan penskalaan worker primer secara manual.

Mengubah ukuran grup worker primer

Grup worker primer dapat diskalakan dengan aman, tetapi penskalaan grup worker primer dapat berdampak negatif pada progres tugas. Operasi yang melakukan penskalaan grup worker primer harus menggunakan penghentian tuntas, yang diaktifkan dengan menetapkan flag --graceful-decommission-timeout.

Cluster penskalaan otomatis: Penskalaan grup worker primer dinonaktifkan pada cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup worker primer di cluster penskalaan otomatis:

  1. Nonaktifkan penskalaan otomatis.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Skalakan grup primer.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Aktifkan kembali penskalaan otomatis:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Memantau penggunaan disk worker primer

Worker primer harus memiliki ruang disk yang memadai untuk data shuffle cluster. Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity. Saat disk lokal terisi, ruang menjadi tidak tersedia untuk HDFS, dan kapasitas yang tersisa akan berkurang.

Secara default, saat penggunaan disk lokal worker primer melebihi 90% kapasitas, node akan ditandai sebagai UNHEALTHY di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau melakukan penskalaan kumpulan worker primer.

Konfigurasi lanjutan

Partisi dan paralelisme

Saat mengirimkan tugas Spark, konfigurasikan tingkat partisi yang sesuai. Menentukan jumlah partisi input dan output untuk tahap shuffle melibatkan pertukaran antara karakteristik performa yang berbeda. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.

Partisi input

Partisi input Spark dan MapReduce ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data yang kira-kira bernilai satu "ukuran blok".

  • Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh spark.sql.files.maxPartitionBytes. Pertimbangkan untuk meningkatkannya menjadi 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Untuk Spark RDD, ukuran partisi biasanya dikontrol dengan fs.gs.block.size, yang secara default adalah 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Contoh: --properties spark.hadoop.fs.gs.block.size=1073741824

Partisi output

Jumlah tugas pada tahap berikutnya dikontrol oleh beberapa properti. Pada tugas yang lebih besar yang memproses lebih dari 1 TB, pertimbangkan untuk memiliki setidaknya 1 GB per partisi.

  • Untuk Spark SQL, jumlah partisi output dikontrol oleh spark.sql.shuffle.partitions.

  • Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan spark.default.parallelism.

Penyesuaian shuffle untuk shuffle worker primer

Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark berjalan sebagai bagian dari Node Manager. Nilai defaultnya adalah dua kali (2x) jumlah core di mesin (misalnya, 16 thread di n1-highmem-8). Jika "Shuffle Read Blocked Time" lebih besar dari 1 detik, dan worker primer belum mencapai batas jaringan, CPU, atau disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.

Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer, yang secara default adalah 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).

Meningkatkan upaya coba ulang

Jumlah maksimum upaya yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menetapkan properti berikut:

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Karena master aplikasi dan tugas lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti sebelumnya di cluster tersebut dapat membantu (perhatikan bahwa penggunaan EFM dengan Spark dan penghentian tuntas tidak didukung).

Penghentian tuntas YARN pada cluster EFM

Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dan dampak minimal pada aplikasi yang sedang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian tuntas dapat ditetapkan dalam AutoscalingPolicy yang terlampir ke cluster EFM.

Peningkatan EFM untuk penghentian tuntas

  1. Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Managed Service untuk Apache Spark standar hingga aplikasi selesai.

  2. Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung tersebut akan dijadwalkan ulang di node lain yang tidak dihentikan. Progres tugas tidak hilang: master aplikasi baru dengan cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.