Tips penyesuaian tugas Spark

Bagian berikut memberikan tips untuk membantu Anda menyempurnakan aplikasi Spark Managed Service untuk Apache Spark.

Menggunakan cluster efemeral

Saat menggunakan model cluster "efemeral" Managed Service untuk Apache Spark, Anda akan membuat cluster khusus untuk setiap tugas, dan saat tugas selesai, Anda akan menghapus cluster tersebut. Dengan model efemeral, Anda dapat memperlakukan penyimpanan dan komputasi secara terpisah, menyimpan data input dan output tugas di Cloud Storage atau BigQuery, menggunakan cluster hanya untuk komputasi dan penyimpanan data sementara.

Masalah cluster persisten

Menggunakan cluster efemeral satu tugas akan menghindari masalah dan potensi masalah berikut yang terkait dengan penggunaan cluster "persisten" bersama dan berjangka panjang:

  • Titik kegagalan tunggal: status error cluster bersama dapat menyebabkan semua tugas gagal, sehingga memblokir seluruh pipeline data. Penyelidikan dan pemulihan dari error dapat memerlukan waktu berjam-jam. Karena cluster efemeral hanya menyimpan status sementara dalam cluster, jika terjadi error, cluster tersebut dapat dihapus dan dibuat ulang dengan cepat.
  • Kesulitan mempertahankan dan memigrasikan status cluster di HDFS, MySQL, atau sistem file lokal
  • Persaingan resource antar-tugas yang berdampak negatif pada SLO
  • Daemon layanan tidak responsif yang disebabkan oleh tekanan memori
  • Penumpukan log dan file sementara yang dapat melebihi kapasitas disk
  • Kegagalan penskalaan karena kehabisan stok zona cluster
  • Kurangnya dukungan untuk versi image cluster yang sudah tidak berlaku.

Manfaat cluster efemeral

Di sisi positifnya, cluster efemeral memungkinkan Anda melakukan hal berikut:

Menggunakan Spark SQL

The Spark SQL DataFrame API adalah pengoptimalan signifikan dari RDD API. Jika Anda berinteraksi dengan kode yang menggunakan RDD, sebaiknya baca data sebagai DataFrame sebelum meneruskan RDD dalam kode. Dalam kode Java atau Scala, sebaiknya gunakan Spark SQL Dataset API sebagai superset RDD dan DataFrame.

Menggunakan Apache Spark 3

Managed Service untuk Apache Spark 2.0 menginstal Spark 3, yang mencakup fitur dan peningkatan performa berikut:

  • Dukungan GPU
  • Kemampuan untuk membaca file biner
  • Peningkatan performa
  • Pemangkasan Partisi Dinamis
  • Eksekusi kueri adaptif, yang mengoptimalkan tugas Spark secara real time

Menggunakan Alokasi Dinamis

Apache Spark menyertakan fitur Alokasi Dinamis yang menskalakan jumlah eksekutor Spark pada pekerja dalam cluster. Fitur ini memungkinkan tugas menggunakan cluster Managed Service untuk Apache Spark sepenuhnya meskipun cluster tersebut di-scale up. Fitur ini diaktifkan secara default di Managed Service untuk Apache Spark (spark.dynamicAllocation.enabled ditetapkan ke true). Lihat Alokasi Dinamis Spark untuk mengetahui informasi selengkapnya.

Menggunakan penskalaan otomatis Managed Service untuk Apache Spark

Penskalaan Otomatis Managed Service untuk Apache Spark secara dinamis menambahkan dan menghapus pekerja Managed Service untuk Apache Spark dari cluster untuk membantu memastikan bahwa tugas Spark memiliki resource yang diperlukan agar dapat diselesaikan dengan cepat.

Sebaiknya konfigurasikan kebijakan penskalaan otomatis agar hanya menskalakan pekerja sekunder.

Menggunakan Mode Fleksibilitas yang Ditingkatkan Managed Service untuk Apache Spark

Cluster dengan VM preemptible atau kebijakan penskalaan otomatis mungkin menerima pengecualian FetchFailed saat pekerja di-preempt atau dihapus sebelum mereka selesai menayangkan data acak ke pereduksi. Pengecualian ini dapat menyebabkan percobaan ulang tugas dan waktu penyelesaian tugas yang lebih lama.

Rekomendasi: Gunakan Managed Service untuk Apache Spark Mode Fleksibilitas yang Ditingkatkan, yang tidak menyimpan data acak perantara pada pekerja sekunder, sehingga pekerja sekunder dapat di-preempt atau di-scale down dengan aman.

Mengonfigurasi partisi dan pengacakan

Spark menyimpan data dalam partisi sementara di cluster. Jika aplikasi Anda mengelompokkan atau menggabungkan DataFrame, aplikasi tersebut akan mengacak data ke dalam partisi baru sesuai dengan pengelompokan dan konfigurasi tingkat rendah.

Partisi data sangat memengaruhi performa aplikasi: terlalu sedikit partisi membatasi paralelisme tugas dan penggunaan resource cluster; terlalu banyak partisi memperlambat tugas karena pemrosesan dan pengacakan partisi tambahan.

Mengonfigurasi partisi

Properti berikut mengatur jumlah dan ukuran partisi Anda:

  • spark.sql.files.maxPartitionBytes: ukuran maksimum partisi saat Anda membaca data dari Cloud Storage. Defaultnya adalah 128 MB, yang cukup besar untuk sebagian besar aplikasi yang memproses kurang dari 100 TB.

  • spark.sql.shuffle.partitions: jumlah partisi setelah melakukan pengacakan. Defaultnya adalah 1000 untuk cluster versi image 2.2 dan yang lebih baru. Rekomendasi: Tetapkan nilai ini menjadi 3x jumlah vCPU di cluster Anda.

  • spark.default.parallelism: jumlah partisi yang ditampilkan setelah melakukan transformasi RDD yang memerlukan pengacakan, seperti join, reduceByKey, dan parallelize. Defaultnya adalah jumlah total vCPU di cluster Anda. Saat menggunakan RDD dalam tugas Spark, Anda dapat menetapkan angka ini menjadi 3x vCPU Anda

Membatasi jumlah file

Ada penurunan performa saat Spark membaca file kecil dalam jumlah besar. Simpan data dalam ukuran file yang lebih besar, misalnya, ukuran file dalam rentang 256 MB–512 MB. Demikian pula, batasi jumlah file output (untuk memaksa pengacakan, lihat Menghindari pengacakan yang tidak perlu).

Mengonfigurasi eksekusi kueri adaptif (Spark 3)

Eksekusi kueri adaptif (diaktifkan secara default di Managed Service untuk Apache Spark versi image 2.0) memberikan peningkatan performa tugas Spark, termasuk:

Meskipun setelan konfigurasi default sudah tepat untuk sebagian besar kasus penggunaan, menetapkan spark.sql.adaptive.advisoryPartitionSizeInBytes ke spark.sql.files.maxPartitionBytes (default 128 MB) dapat bermanfaat.

Menghindari pengacakan yang tidak perlu

Spark memungkinkan pengguna memicu pengacakan secara manual untuk menyeimbangkan kembali data mereka dengan fungsi repartition. Pengacakan mahal, jadi pengacakan ulang data harus digunakan dengan hati-hati. Menetapkan konfigurasi partisi dengan tepat sudah cukup untuk memungkinkan Spark mempartisi data Anda secara otomatis.

Pengecualian: Saat menulis data yang dipartisi kolom ke Cloud Storage, mempartisi ulang pada kolom tertentu akan menghindari penulisan banyak file kecil untuk mencapai waktu penulisan yang lebih cepat.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Menyimpan data dalam Parquet atau Avro

Spark SQL secara default membaca dan menulis data dalam file Parquet terkompresi Snappy. Parquet adalah format file kolom yang efisien yang memungkinkan Spark hanya membaca data yang diperlukan untuk menjalankan aplikasi. Ini adalah keuntungan penting saat bekerja dengan set data yang besar. Format kolom lainnya, seperti Apache ORC, juga berperforma baik.

Untuk data non-kolom, Apache Avro menyediakan format file baris biner yang efisien. Meskipun biasanya lebih lambat daripada Parquet, performa Avro lebih baik daripada format berbasis teks, seperti CSV atau JSON.

Mengoptimalkan ukuran disk

Throughput disk persisten diskalakan dengan ukuran disk, yang dapat memengaruhi performa tugas Spark karena tugas menulis metadata dan mengacak data ke disk. Saat menggunakan disk persisten standar, ukuran disk harus minimal 1 terabyte per pekerja (lihat Performa menurut ukuran disk persisten).

Untuk memantau throughput disk pekerja di Google Cloud konsol:

  1. Klik nama cluster di halaman Clusters page.
  2. Klik tab VM INSTANCES.
  3. Klik nama pekerja mana pun.
  4. Klik tab MONITORING, lalu scroll ke Disk Throughput untuk melihat throughput pekerja.

Pertimbangan disk

Cluster Managed Service untuk Apache Spark efemeral, yang tidak mendapatkan manfaat dari penyimpanan persisten, dapat menggunakan SSD lokal. SSD lokal terpasang secara fisik ke cluster dan memberikan throughput yang lebih tinggi daripada disk persisten (lihat tabel Performa). SSD lokal tersedia dalam ukuran tetap 375 gigabyte, tetapi Anda dapat menambahkan beberapa SSD untuk meningkatkan performa.

SSD lokal tidak mempertahankan data setelah cluster dimatikan. Jika memerlukan penyimpanan persisten, Anda dapat menggunakan disk persisten SSD, yang memberikan throughput lebih tinggi untuk ukurannya daripada disk persisten standar. Disk persisten SSD juga merupakan pilihan yang baik jika ukuran partisi akan lebih kecil dari 8 KB (namun, hindari partisi kecil).

Melampirkan GPU ke cluster

Spark 3 mendukung GPU. Gunakan GPU dengan tindakan inisialisasi RAPIDS untuk mempercepat tugas Spark menggunakan RAPIDS SQL Accelerator. Tindakan inisialisasi driver GPU untuk mengonfigurasi cluster dengan GPU.

Kegagalan dan perbaikan tugas umum

Bagian berikut membahas penyebab umum dan solusi untuk kegagalan tugas.

Kehabisan Memori

Contoh:

  • "Lost executor"
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Container killed by YARN for exceeding memory limits"

Kemungkinan perbaikan:

Kegagalan Pengambilan Acak

Contoh:

  • "FetchFailedException" (error Spark)
  • "Failed to connect to..." (error Spark)
  • "Failed to fetch" (error MapReduce)

Biasanya disebabkan oleh penghapusan pekerja yang masih memiliki data acak untuk ditayangkan sebelum waktunya.

Kemungkinan penyebab dan perbaikan:

  • VM pekerja preemptible diklaim kembali atau VM pekerja non-preemptible dihapus oleh penskala otomatis. Solusi: Gunakan Mode Fleksibilitas yang Ditingkatkan untuk membuat pekerja sekunder dapat di-preempt atau di-scale dengan aman.
  • Eksekutor atau mapper mengalami error karena error OutOfMemory. Solusi: tingkatkan memori eksekutor atau mapper.
  • Layanan pengacakan Spark mungkin mengalami kelebihan beban. Solusi: kurangi jumlah partisi tugas.

Node YARN dalam status UNHEALTHY

Contoh (dari log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Sering kali terkait dengan ruang disk yang tidak cukup untuk data acak. Lakukan diagnosis dengan melihat file log:

  • Buka halaman Clusters project Anda di Google Cloud konsol, lalu klik nama cluster.
  • Klik VIEW LOGS.
  • Filter log menurut hadoop-yarn-nodemanager.
  • Cari "UNHEALTHY".

Kemungkinan Perbaikan:

  • Cache pengguna disimpan dalam direktori yang ditentukan oleh properti yarn.nodemanager.local-dirs dalam yarn-site.xml file. File ini terletak di /etc/hadoop/conf/yarn-site.xml. Anda dapat memeriksa ruang kosong di jalur /hadoop/yarn/nm-local-dir, dan mengosongkan ruang dengan menghapus folder cache pengguna /hadoop/yarn/nm-local-dir/usercache.
  • Jika log melaporkan status "UNHEALTHY", buat ulang cluster Anda dengan ruang disk yang lebih besar, yang akan meningkatkan batas throughput.

Cache pengguna menghabiskan ruang disk

Di Managed Service untuk Apache Spark, cache pengguna disimpan dalam direktori yang ditentukan oleh yarn.nodemanager.local-dirs properti di yarn-site.xml (/etc/hadoop/conf/yarn-site.xml). Dalam sebagian besar kasus, lokasinya adalah /hadoop/yarn/nm-local-dir.

Kemungkinan Perbaikan:

  • Pembersihan manual: Periksa ruang kosong di jalur /hadoop/yarn/nm-local-dir, lalu kosongkan ruang dengan menghapus folder cache pengguna /hadoop/yarn/nm-local-dir/usercache.
  • Resolusi jangka panjang: Kelola proses pembersihan dengan menetapkan properti cluster berikut:

    • yarn.nodemanager.localizer.cache.cleanup.interval-ms
    • yarn.nodemanager.localizer.cache.target-size-mb

    Jika tidak ingin membuat ulang cluster, Anda dapat memperbarui properti ini di yarn-site.xml di semua node, lalu memulai ulang NodeManager dengan menjalankan perintah berikut di node pekerja:

    sudo systemctl restart hadoop-yarn-nodemanager.service
    
  • Mengubah ukuran cluster: Jika log melaporkan status "UNHEALTHY", buat ulang cluster Anda dengan ruang disk yang lebih besar, yang akan meningkatkan batas throughput.

Tugas gagal karena memori driver tidak cukup

Saat menjalankan tugas dalam mode cluster, tugas akan gagal jika ukuran memori node pekerja lebih kecil dari ukuran memori driver.

Contoh dari log driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Kemungkinan Perbaikan:

  • Tetapkan spark:spark.driver.memory kurang dari yarn:yarn.scheduler.maximum-allocation-mb.
  • Gunakan jenis mesin yang sama untuk node master dan pekerja.

Langkah berikutnya