Managed Service for Apache Spark 彈性增強模式

Managed Service for Apache Spark 強化版彈性模式 (EFM) 會管理重組資料,讓執行中叢集的節點移除作業造成的工作進度延遲降到最低。EFM 會將資料寫入主要工作站,藉此卸載重組資料。工作站會在縮減階段從這些遠端節點提取資料。這個模式僅適用於 Spark 工作。

由於 EFM 不會在次要 worker 上儲存中繼重組資料,因此非常適合使用先占 VM 的叢集,或僅自動調度資源次要 worker 群組的叢集。

Managed Service for Apache Spark 支援 EFM,2.0.31+2.1.6+2.2+ 和後續映像檔版本皆適用。

限制:

  • 不支援 AppMaster 重新安置的 Apache Hadoop YARN 工作,可能會在增強彈性模式下失敗 (請參閱「何時等待 AppMaster 完成」)。
  • 不建議使用強化版彈性模式:
    • 僅有主要 worker 的叢集
    • 因為工作完成後,最多可能需要 30 分鐘才能清理中繼重組資料。
    • 在執行筆記本的叢集上,因為系統可能不會在工作階段期間清除重組資料。
    • 在啟用安全停用的叢集上執行 Spark 工作時。安全停用機制和 EFM 可能會互相衝突,因為 YARN 安全停用機制會保留 DECOMMISSIONING 節點,直到所有相關應用程式完成為止。
    • 在同時執行 Spark 和非 Spark 工作的叢集上。
  • 不支援強化版彈性模式:
    • 啟用主要工作人員自動調度資源功能時,在大多數情況下,主要工作人員會繼續儲存未自動遷移的重組資料。縮減主要工作站群組會抵銷 EFM 的優點。

使用強化版彈性模式

建立叢集時,將 dataproc:efm.spark.shuffle 叢集屬性設為 primary-worker,即可啟用強化版彈性模式。

範例:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Apache Spark 範例

  1. 使用 EFM 叢集上的 Spark 範例 JAR,對公開的莎士比亞文字執行 WordCount 工作。
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

設定本機 SSD

由於 EFM 會將中繼重組資料寫入 VM 連接的磁碟,因此本機 SSD 提供的額外處理量和 IOPS 有助於提升效能。為方便資源分配,設定主要工作站機器時,請以每 4 個 vCPU 約 1 個本機 SSD 分區為目標。

如要連結本機 SSD,請將 --num-worker-local-ssds 旗標傳遞至 gcloud Managed Service for Apache Spark clusters create 指令。

一般來說,次要工作站不需要本機 SSD。 將本機 SSD 新增至叢集的次要工作人員 (使用 --num-secondary-worker-local-ssds 旗標) 通常較不重要,因為次要工作人員不會在本機寫入重組資料。不過,由於本機 SSD 可提升本機磁碟效能,如果預期工作會因使用本機磁碟而受 I/O 限制,您可能會決定在本機磁碟使用量偏高時,將本機 SSD 新增至次要工作者,或分割區過大而無法放入記憶體,並溢出至磁碟

次要 worker 比例

由於次要 worker 會將 Shuffle 資料寫入主要 worker,因此叢集必須包含足夠數量的主要 worker,並具備充足的 CPU、記憶體和磁碟資源,才能因應工作的 Shuffle 負載。如果是自動調度資源叢集,請在主要 worker 群組的自動調度政策中,將 minInstances 設為 maxInstances 值,防止主要群組調度資源,導致不當行為。

如果次要工作站與主要工作站的比率偏高 (例如 10:1),請監控主要工作站的 CPU 使用率、網路和磁碟用量,判斷是否過載。現在說明一下操作方式:

  1. 前往Google Cloud 控制台的「VM instances」(VM 執行個體) 頁面。

  2. 按一下主要工作人員左側的核取方塊。

  3. 按一下「監控」分頁標籤,即可查看主要 worker 的 CPU 使用率、磁碟 IOPS、網路位元組和其他指標。

如果主要 worker 負載過重,請考慮手動擴充主要 worker

調整主要工作站群組的大小

主要工作站群組可以安全地擴充,但縮減主要工作站群組可能會對工作進度造成負面影響。縮減主要 worker 群組規模的作業應使用安全停用程序,只要設定 --graceful-decommission-timeout 旗標即可啟用這項程序。

自動調度資源叢集:如果 EFM 叢集採用自動調度資源政策,系統會停用主要工作站群組調度資源功能。如要調整自動調度資源叢集中的主要 worker 群組大小,請按照下列步驟操作:

  1. 停用自動調度資源功能。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. 擴充主要群組。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 重新啟用自動調度資源功能:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

監控主要工作站磁碟用量

主要工作站必須有足夠的磁碟空間,才能儲存叢集的隨機資料。您可以透過 remaining HDFS capacity 指標間接監控這項指標。本機磁碟空間用盡時,HDFS 就無法使用空間,剩餘容量也會減少。

根據預設,當主要工作站的本機磁碟用量超過容量的 90% 時,節點會在 YARN 節點 UI 中標示為 UNHEALTHY。如果遇到磁碟容量問題,可以從 HDFS 刪除未使用的資料,或擴充主要工作站集區。

進階設定

分區與平行處理

提交 Spark 工作時,請設定適當的分割層級。決定重組階段的輸入和輸出分割區數量時,需要權衡不同的效能特徵。建議您嘗試適合工作形狀的值。

輸入分區

Spark 和 MapReduce 輸入分區是由輸入資料集決定。從 Cloud Storage 讀取檔案時,每個工作會處理約一個「區塊大小」的資料。

  • 如果是 Spark SQL 工作,最大分割區大小由 spark.sql.files.maxPartitionBytes 控制。建議您將大小增加至 1 GB:spark.sql.files.maxPartitionBytes=1073741824

  • 如果是 Spark RDD,通常會使用 fs.gs.block.size 控制分割區大小,預設為 128 MB。建議增加至 1 GB。 範例:--properties spark.hadoop.fs.gs.block.size=1073741824

輸出分區

後續階段的工作數量由多項屬性控管。 如果處理量超過 1 TB,建議每個分區至少有 1 GB 的空間。

  • 如果是 Spark SQL,輸出分割區的數量由 spark.sql.shuffle.partitions 控制。

  • 如果是使用 RDD API 的 Spark 工作,您可以指定輸出分割區數量或設定 spark.default.parallelism

主要工作站重組的重組調整

最重要的屬性是 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>。 請注意,這是叢集層級的 YARN 屬性,因為 Spark 隨機播放伺服器會以節點管理員的一部分執行。預設值為機器上的核心數的兩倍 (例如 n1-highmem-8 上的 16 個執行緒)。如果「Shuffle Read Blocked Time」大於 1 秒,且主要 worker 尚未達到網路、CPU 或磁碟限制,請考慮增加 Shuffle 伺服器執行緒數量。

如果是較大的機器類型,請考慮增加 spark.shuffle.io.numConnectionsPerPeer (預設為 1)。(例如,將每個主機配對的連線數設為 5)。

增加重試次數

您可以設定下列屬性,為應用程式主控項、工作和階段設定允許的嘗試次數上限:

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

在大量使用先佔 VM 的叢集,或未執行安全停用程序就自動調度資源的叢集中,應用程式主機和工作會更頻繁地終止,因此在這些叢集中增加上述屬性的值會有幫助 (請注意,不支援搭配 Spark 使用 EFM 和安全停用程序)。

EFM 叢集上的 YARN 安全停用程序

YARN 安全停用程序可快速移除節點,對執行中的應用程式影響極小。如要自動調度叢集資源,可以在附加至 EFM 叢集的 AutoscalingPolicy 中設定安全停用逾時

EFM 強化安全停用程序

  1. 由於中繼資料儲存在分散式檔案系統中,因此只要節點上執行的所有容器都已完成作業,即可從 EFM 叢集中移除節點。相較之下,在標準的 Managed Service for Apache Spark 叢集上,節點不會移除,直到應用程式完成為止。

  2. 節點移除作業不會等待節點上執行的應用程式主機完成作業。應用程式主容器終止後,系統會在未停用的其他節點上重新排程。作業進度不會遺失:新的應用程式主控台會讀取作業記錄,從先前的應用程式主控台快速復原狀態。