Mode Fleksibilitas yang Disempurnakan Dataproc

Mode Fleksibilitas yang Ditingkatkan (EFM) Dataproc mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM memindahkan data shuffle dengan menulis data ke pekerja utama. Pekerja menarik dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk tugas Spark.

Karena EFM tidak menyimpan data shuffle perantara pada pekerja sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya menskalakan otomatis grup pekerja sekunder.

EFM didukung di Dataproc 2.0.31+, 2.1.6+, 2.2+, dan versi image yang lebih baru.

Batasan:

  • Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
  • Mode Fleksibilitas yang Ditingkatkan tidak direkomendasikan:
    • pada cluster yang hanya memiliki pekerja primer
    • pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
    • pada cluster yang menjalankan notebook karena data shuffle mungkin tidak dibersihkan selama masa aktif sesi.
    • saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penonaktifan yang lancar dan EFM dapat bekerja dengan tujuan yang berbeda karena mekanisme penonaktifan yang lancar YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.
    • di cluster yang menjalankan tugas Spark dan non-Spark.
  • Mode Fleksibilitas yang Ditingkatkan tidak didukung:
    • saat penskalaan otomatis worker utama diaktifkan. Dalam sebagian besar kasus, pekerja utama akan terus menyimpan data pengacakan yang tidak otomatis dimigrasikan. Pengecilan grup worker primer meniadakan manfaat EFM.

Menggunakan Mode Fleksibilitas yang Ditingkatkan

Fleksibilitas yang Ditingkatkan diaktifkan saat Anda membuat cluster dengan menetapkan dataproc:efm.spark.shuffle properti cluster ke primary-worker.

Contoh:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Contoh Apache Spark

  1. Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan contoh Spark JAR di cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Mengonfigurasi SSD lokal

Karena EFM menulis data shuffle perantara ke disk yang terpasang ke VM, EFM diuntungkan dari throughput dan IOPS tambahan yang disediakan oleh SSD lokal. Untuk memfasilitasi alokasi resource, tetapkan target sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin pekerja utama.

Untuk melampirkan SSD lokal, teruskan tanda --num-worker-local-ssds ke perintah gcloud Dataproc clusters create.

Umumnya, Anda tidak memerlukan SSD lokal pada pekerja sekunder. Menambahkan SSD lokal ke pekerja sekunder cluster (menggunakan flag --num-secondary-worker-local-ssds) sering kali kurang penting karena pekerja sekunder tidak menulis data shuffle secara lokal. Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan SSD lokal ke pekerja sekunder jika Anda memperkirakan tugas akan terikat I/O karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk ruang sementara atau partisi Anda terlalu besar untuk dimuat dalam memori dan akan meluap ke disk.

Rasio pekerja sekunder

Karena worker sekunder menulis data shuffle ke worker primer, cluster Anda harus berisi jumlah worker primer yang memadai dengan CPU, memori, dan resource disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk cluster penskalaan otomatis, guna mencegah grup primer melakukan penskalaan dan menyebabkan perilaku yang tidak diinginkan, tetapkan minInstances ke nilai maxInstances dalam kebijakan penskalaan otomatis untuk grup worker primer.

Jika Anda memiliki rasio pekerja sekunder terhadap primer yang tinggi (misalnya, 10:1), pantau penggunaan CPU, jaringan, dan disk pekerja primer untuk menentukan apakah mereka kelebihan beban. Untuk melakukannya:

  1. Buka halaman VM instances di konsolGoogle Cloud .

  2. Klik kotak centang di sisi kiri pekerja utama.

  3. Klik tab MONITORING untuk melihat Penggunaan CPU, IOPS Disk, Byte Jaringan, dan metrik lainnya dari pekerja utama.

Jika worker primer kelebihan beban, pertimbangkan untuk melakukan penskalaan worker primer secara manual.

Mengubah ukuran grup pekerja utama

Grup pekerja utama dapat diskalakan dengan aman, tetapi menurunkan skala grup pekerja utama dapat berdampak negatif pada progres tugas. Operasi yang menurunkan skala grup worker utama harus menggunakan penghentian tuntas, yang diaktifkan dengan menyetel flag --graceful-decommission-timeout.

Cluster yang diskalakan otomatis: Penskalaan grup worker utama dinonaktifkan di cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup worker utama pada cluster yang diskalakan otomatis:

  1. Nonaktifkan penskalaan otomatis.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Menskalakan grup utama.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Aktifkan kembali penskalaan otomatis:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Memantau penggunaan disk pekerja utama

Worker primer harus memiliki ruang disk yang cukup untuk data shuffle cluster. Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity. Saat disk lokal penuh, ruang penyimpanan tidak tersedia untuk HDFS, dan kapasitas yang tersisa berkurang.

Secara default, saat penggunaan disk lokal pekerja utama melebihi 90% kapasitas, node akan ditandai sebagai TIDAK SEHAT di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau menskalakan kumpulan worker utama.

Konfigurasi lanjutan

Partisi dan paralelisme

Saat mengirimkan tugas Spark, konfigurasi tingkat partisi yang sesuai. Menentukan jumlah partisi input dan output untuk tahap shuffle melibatkan pertukaran di antara berbagai karakteristik performa. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.

Partisi input

Partisi input Spark dan MapReduce ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data yang setara dengan sekitar satu "ukuran blok".

  • Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh spark.sql.files.maxPartitionBytes. Pertimbangkan untuk menambahkannya menjadi 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Untuk RDD Spark, ukuran partisi biasanya dikontrol dengan fs.gs.block.size, yang secara default bernilai 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Contoh: --properties spark.hadoop.fs.gs.block.size=1073741824

Partisi output

Jumlah tugas dalam tahap berikutnya dikontrol oleh beberapa properti. Untuk tugas yang lebih besar yang memproses lebih dari 1 TB, sebaiknya tetapkan setidaknya 1 GB per partisi.

  • Untuk Spark SQL, jumlah partisi output dikontrol oleh spark.sql.shuffle.partitions.

  • Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan spark.default.parallelism.

Penyesuaian shuffle untuk shuffle pekerja utama

Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark berjalan sebagai bagian dari Node Manager. Nilai defaultnya adalah dua kali (2x) jumlah core di mesin (misalnya, 16 thread di n1-highmem-8). Jika "Waktu Diblokir Pembacaan Shuffle" lebih besar dari 1 detik, dan worker primer belum mencapai batas jaringan, CPU, atau disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.

Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer, yang secara default adalah 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).

Meningkatkan percobaan ulang

Jumlah maksimum upaya yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menyetel properti berikut:

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Karena master dan tugas aplikasi lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti sebelumnya di cluster tersebut dapat membantu (perhatikan bahwa penggunaan EFM dengan Spark dan penghentian tuntas tidak didukung).

Penghentian tuntas YARN pada cluster EFM

Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dan dampak minimal pada aplikasi yang sedang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian tuntas dapat ditetapkan dalam AutoscalingPolicy yang terlampir ke cluster EFM.

Peningkatan EFM untuk penghentian tuntas

  1. Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Dataproc standar hingga aplikasi selesai.

  2. Penghapusan node tidak menunggu hingga master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung tersebut dijadwalkan ulang di node lain yang tidak dihentikan. Progres tugas tidak hilang: master aplikasi baru dengan cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.