Managed Service for Apache Spark 彈性增強模式

Managed Service for Apache Spark 強化版彈性模式 (EFM) 會管理重組資料,為運作中的叢集盡可能縮短節點移除作業產生的工作進度延遲。EFM 會將資料寫入主要工作站,藉此卸載重組資料。工作站會在縮減階段從這些遠端節點提取資料。這個模式僅適用於 Spark 工作。

由於 EFM 不會在次要工作站儲存中繼資料重組資料,因此非常適合使用先占 VM 的叢集,或僅自動調度資源次要工作站群組的叢集。

Managed Service for Apache Spark 支援 EFM,但須使用 2.0.31+2.1.6+2.2+ 和更新版本的映像檔

限制:

  • 不支援 AppMaster 重新安置的 Apache Hadoop YARN 工作,可能會在「增強彈性模式」中失敗 (請參閱「何時等待 AppMaster 完成」)。
  • 不建議使用強化版彈性模式:
    • 叢集只有主要 worker
    • 因為工作完成後,最多可能需要 30 分鐘才能清理中繼重組資料。
    • 在執行筆記本的叢集上,因為系統可能不會在工作階段期間清除重組資料。
    • 在啟用安全停用程序的叢集上執行 Spark 工作時,安全停用程序和 EFM 可能會互相衝突,因為 YARN 安全停用機制會保留 DECOMMISSIONING 節點,直到所有相關應用程式完成為止。
    • 在同時執行 Spark 和非 Spark 工作的叢集上。
  • 強化版彈性模式不支援以下項目:
    • 啟用主要工作站自動調度資源功能時,主要工作站通常會繼續儲存未自動遷移的重組資料。縮減主要工作站群組會抵銷 EFM 的優點。

使用強化版彈性模式

建立叢集時,將 dataproc:efm.spark.shuffle 叢集屬性設為 primary-worker,即可啟用強化版彈性模式。

範例:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Apache Spark 範例

  1. 使用 EFM 叢集上的 Spark 範例 JAR,針對莎士比亞的公開文字執行 WordCount 工作。
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

設定本機 SSD

由於 EFM 會將中間的隨機資料寫入 VM 附加的磁碟,因此本機 SSD 提供的額外總處理量和 IOPS 有助於提升效能。為方便資源分配,設定主要工作站機器時,請以每 4 個 vCPU 約 1 個本機 SSD 分區為目標。

如要附加本機 SSD,請將 --num-worker-local-ssds 旗標傳遞至 gcloud Managed Service for Apache Spark clusters create 指令。

一般來說,次要工作者不需要本機 SSD。由於次要工作者不會在本機寫入洗牌資料,因此通常不太需要使用 --num-secondary-worker-local-ssds 旗標,將本機 SSD 新增至叢集的次要工作者。不過,由於本機 SSD 可提升本機磁碟效能,如果預期工作會因使用本機磁碟而受 I/O 限制,您可能會決定將本機 SSD 新增至次要工作者:工作會使用大量本機磁碟做為暫存空間,或分割區過大而無法放入記憶體,因此會溢出至磁碟

次要工作站比例

由於次要 worker 會將隨機重組資料寫入主要 worker,因此叢集必須包含足夠數量的主要 worker,並具備足夠的 CPU、記憶體和磁碟資源,才能因應作業的隨機重組負載。如果是自動調度資源叢集,請在主要 worker 群組的自動調度資源政策中,將 minInstances 設為 maxInstances 值,防止主要群組調度資源,進而避免發生非預期行為。

如果次要工作站與主要工作站的比率偏高 (例如 10:1),請監控主要工作站的 CPU 使用率、網路和磁碟用量,判斷是否過載。現在說明一下操作方式:

  1. 前往Google Cloud 控制台的「VM instances」(VM 執行個體) 頁面。

  2. 按一下主要工作人員左側的核取方塊。

  3. 按一下「MONITORING」(監控) 分頁標籤,即可查看主要 worker 的 CPU 使用率、磁碟 IOPS、網路位元組和其他指標。

如果主要 worker 負載過重,請考慮手動擴充主要 worker

調整主要工作站群組的大小

主要工作站群組可以安全地向上擴充,但縮減主要工作站群組可能會對工作進度造成負面影響。縮減主要工作人員群組規模的作業應使用安全停用程序,只要設定 --graceful-decommission-timeout 旗標即可啟用這項程序。

自動調度資源叢集:如果 EFM 叢集採用自動調度資源政策,系統會停用主要 worker 群組調度資源功能。如要調整自動調整叢集中的主要 worker 群組大小,請按照下列步驟操作:

  1. 停用自動調度資源功能。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. 擴充主要群組。

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 重新啟用自動調度資源功能:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

監控主要工作站磁碟用量

主要工作站必須有足夠的磁碟空間,才能儲存叢集的重組資料。您可以透過 remaining HDFS capacity 指標間接監控這項資訊。本機磁碟空間用盡時,HDFS 就無法使用空間,剩餘容量也會減少。

根據預設,當主要工作站的本機磁碟用量超過容量的 90% 時,節點會在 YARN 節點 UI 中標示為 UNHEALTHY。如果遇到磁碟容量問題,可以從 HDFS 刪除未使用的資料,或擴充主要工作站集區。

進階設定

分區與平行處理

提交 Spark 工作時,請設定適當的分區層級。決定 Shuffle 階段的輸入和輸出分區數量時,需要權衡不同的效能特性。建議您嘗試適合工作形狀的值。

輸入分區

Spark 和 MapReduce 輸入分區是由輸入資料集決定。 從 Cloud Storage 讀取檔案時,每個工作會處理約一個「區塊大小」的資料。

  • 如果是 Spark SQL 工作,最大分割區大小由 spark.sql.files.maxPartitionBytes 控制。 建議您將大小增加至 1 GB:spark.sql.files.maxPartitionBytes=1073741824

  • 如果是 Spark RDD,通常會使用 fs.gs.block.size 控制分割區大小,預設為 128 MB。建議增加至 1 GB。 範例:--properties spark.hadoop.fs.gs.block.size=1073741824

輸出分區

後續階段的工作數量由多項屬性控管。如果處理量超過 1 TB 的大型工作,請考慮為每個分割區分配至少 1 GB。

  • 如果是 Spark SQL,輸出分割區的數量由 spark.sql.shuffle.partitions 控制。

  • 如果是使用 RDD API 的 Spark 工作,您可以指定輸出分割區數量或設定 spark.default.parallelism

主要工作站重組的重組微調

最重要的屬性是 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>。請注意,這是叢集層級的 YARN 屬性,因為 Spark Shuffle 伺服器會以節點管理員的一部分執行。預設值為機器上的核心數量的兩倍 (例如,n1-highmem-8 上的 16 個執行緒)。如果「Shuffle Read Blocked Time」大於 1 秒,且主要工作站尚未達到網路、CPU 或磁碟限制,請考慮增加 Shuffle 伺服器執行緒的數量。

如果是較大的機器類型,請考慮增加 spark.shuffle.io.numConnectionsPerPeer,預設值為 1。(例如,將每個主機配對的連線數設為 5)。

增加重試次數

您可以設定下列屬性,為應用程式主機、工作和階段設定允許的嘗試次數上限:

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

由於使用大量先佔 VM 或自動調度資源 (不含安全停用程序) 的叢集,應用程式主機和工作終止的頻率較高,因此在這些叢集中增加上述屬性的值會有幫助 (請注意,不支援搭配 Spark 使用 EFM 和安全停用程序)。

EFM 叢集上的 YARN 安全停用程序

YARN 安全停用可用於快速移除節點,對執行中的應用程式影響極小。對於自動調度資源叢集,您可以在附加至 EFM 叢集的 AutoscalingPolicy 中設定安全停用逾時

EFM 強化安全停用程序

  1. 由於中繼資料儲存在分散式檔案系統中,因此只要節點上執行的所有容器都已完成作業,即可從 EFM 叢集中移除節點。相較之下,在標準 Managed Service for Apache Spark 叢集中,應用程式完成作業後才會移除節點。

  2. 節點移除作業不會等待節點上執行的應用程式主機完成作業。 應用程式主容器終止後,系統會在未停用的其他節點上重新排程。作業進度不會遺失:新的應用程式主控台會讀取作業記錄,從先前的應用程式主控台快速復原狀態。