管道會在機器叢集上執行。這類叢集會將需要完成的工作分割成多個部分,然後在叢集中分散的多個執行器上平行執行工作,藉此實現高處理量。一般來說,分割 (也稱為分區) 數量越多,管道的執行速度就越快。管道中的來源和隨機播放階段會決定管道的平行處理層級。
來源
在每次管道執行作業開始時,管道中的每個來源都會計算需要讀取的資料,以及如何將資料分割。舉例來說,假設有個基本管道會從 Cloud Storage 讀取資料、執行一些 Wrangler 轉換,然後寫回 Cloud Storage。

管道啟動時,Cloud Storage 來源會檢查輸入檔案,並根據檔案大小將檔案分割。舉例來說,1 GB 的檔案可以分割成 100 個檔案,每個檔案大小為 10 MB。每個執行器都會讀取該分割的資料、執行 Wrangler 轉換,然後將輸出內容寫入 part 檔案。
如果管道執行速度緩慢,首先要檢查的項目之一,就是來源是否建立足夠的分裂,以充分運用平行處理。舉例來說,某些類型的壓縮會導致純文字檔案無法分割。如果您讀取的是經過 gzip 壓縮的檔案,可能會發現管道的執行速度比讀取未壓縮的檔案或以 BZIP (可分割) 壓縮的檔案慢得多。同樣地,如果您使用資料庫來源,並將其設定為只使用單一分割,執行速度會比設定為使用更多分割慢得多。
隨機播放
某些類型的外掛程式會導致資料在叢集中隨機排列。當一個執行器處理的記錄需要傳送至另一個執行器進行運算時,就會發生這種情況。由於重組作業涉及大量 I/O,因此成本高昂。造成資料重組的外掛程式都會顯示在 Pipeline Studio 的「Analytics」部分。包括 Group By、Deduplicate、Distinct 和 Joiner 等外掛程式。舉例來說,假設在上述範例中,管道中新增了「依階段分組」。
假設讀取的資料代表在雜貨店進行的購買交易。每筆記錄都包含 item 欄位和 num_purchased 欄位。在「Group By」(分組依據) 階段,我們會設定管道,根據 item 欄位將記錄分組,並計算 num_purchased 欄位的總和。
管道執行時,輸入檔案會如先前所述分割。之後,每個記錄都會在叢集中隨機排序,因此具有相同項目的每個記錄都會屬於同一個執行器。
如上例所示,Apple 購買交易的記錄原本分散在多個執行者之間。如要執行彙整作業,所有記錄都必須傳送至叢集中的同一執行器。
大多數需要隨機重組的外掛程式都允許您指定隨機重組資料時要使用的分割區數量。這項設定可控管用於處理重組資料的執行器數量。
在上述範例中,如果分割區數量設為 2,每個執行器會計算兩個項目的匯總,而不是一個。
請注意,之後可以降低管道的平行處理程度。舉例來說,請參考管道的邏輯檢視畫面:
如果來源將資料劃分為 500 個分區,但「Group By」使用 200 個分區進行重組,則「Group By」之後的平行處理上限會從 500 降至 200。您只會有 200 個檔案,而不是 500 個。
選擇分區
如果分割區數量太少,您就無法充分運用叢集的完整容量,盡可能平行處理工作。如果分區設定過高,會增加不必要的負擔。一般來說,分區數量過多會比過少好。如果管道需要幾分鐘才能執行,而您想節省幾分鐘,就必須留意額外負荷。如果管道需要數小時才能執行完畢,一般來說,您不需要擔心額外負荷。
判斷要使用的分區數量時,最簡單但過於簡化的方法是將其設為 max(cluster CPUs, input records / 500,000)。換句話說,請將輸入記錄數除以 500,000。如果該數字大於叢集 CPU 數量,請將該數字做為分割區數量。否則請使用叢集 CPU 數量。舉例來說,如果叢集有 100 個 CPU,且預期重組階段會有 1 億筆輸入記錄,請使用 200 個分割區。
更完整的答案是,當每個分區的中間重組資料完全符合執行器的記憶體時,重組效能最佳,因此不需要溢出到磁碟。Spark 會保留執行器記憶體中略低於 30% 的空間,用於保存重組資料。確切的數字為 (總記憶體 - 300 MB) * 30%。假設每個執行器都設定為使用 2 GB 記憶體,這表示每個分割區的記錄不得超過 (2 GB - 300 MB) * 30% = 約 500 MB。假設每筆記錄壓縮後的大小為 1 KB,則每個分割區的記錄數為 (500 MB/分割區) / (1 KB/記錄) = 500,000 筆記錄/分割區。如果執行器使用的記憶體較多,或記錄較小,您可以視情況調整這個數字。
資料偏移
請注意,在上述範例中,各項商品的購買次數平均分配。也就是說,蘋果、香蕉、紅蘿蔔和雞蛋各有三筆購買交易。在平均分配的鍵上進行重組是效能最高的重組類型,但許多資料集沒有這項屬性。延續上例的雜貨店購物,您會預期雞蛋的購買次數遠多於結婚賀卡。如果某些重組鍵比其他鍵常見許多,表示您正在處理偏斜資料。與未傾斜的資料相比,傾斜的資料效能可能會大幅降低,因為只有少數幾個執行器會執行不成比例的工作量。導致一小部分分區遠大於其他分區。
在這個例子中,能量蛋的購買次數是卡片的五倍,這表示計算能量蛋匯總資料的時間大約是卡片的五倍。如果處理的不是兩筆記錄,而是十筆記錄,這點差異不大,但如果處理的不是十億筆記錄,而是五十億筆記錄,這點就非常重要。如果資料偏斜,洗牌作業使用的分割區數量對管道效能不會有太大影響。
您可以查看一段時間內的輸出記錄圖表,判斷資料是否偏斜。如果管道執行開始時,階段輸出記錄的速度快得多,然後突然變慢,這可能表示資料有偏差。
您也可以檢查叢集記憶體用量隨時間的變化,判斷是否有資料偏斜。如果叢集一段時間都處於容量上限,但突然記憶體用量偏低,這也是資料偏斜的徵兆。
執行聯結時,資料偏斜對效能的影響最大。有幾種技巧可用來提升傾斜聯結的效能。詳情請參閱「平行處理 JOIN 作業」。
自動調整執行作業
如要適應性調整執行作業,請指定要使用的分區範圍,而非確切的分區編號。啟用適應性執行時,系統會忽略確切的分割區編號,即使是在管道設定中設定也一樣。
如果您使用暫時的 Managed Service for Apache Spark 叢集,Cloud Data Fusion 會自動設定適當的設定,但如果是靜態的 Managed Service for Apache Spark 或 Hadoop 叢集,則可以設定下列兩個設定參數:
spark.default.parallelism:設為叢集中可用的 vCore 總數。這可確保叢集不會負載不足,並定義分區數的下限。spark.sql.adaptive.coalescePartitions.initialPartitionNum:將其設為叢集中可用 vCore 數量的 32 倍。這會定義分區數量的上限。Spark.sql.adaptive.enabled:如要啟用最佳化功能,請將這個值設為true。Apache Spark 代管服務會自動設定,但如果您使用一般 Hadoop 叢集,請務必啟用這項設定。
您可以在特定管道的引擎設定中,或在靜態 Managed Service for Apache Spark 叢集的叢集屬性中設定這些參數。
後續步驟
- 瞭解
JOIN作業的平行處理。