ב-Managed Service for Apache Spark Enhanced Flexibility Mode (EFM), הנתונים של פעולת ה-shuffle מנוהלים כדי לצמצם את העיכובים בהתקדמות העבודה שנגרמים כתוצאה מהסרת צמתים מאשכול פעיל. EFM מעביר נתוני ערבוב על ידי כתיבת נתונים לעובדים ראשיים. העובדים שולפים מהצמתים המרוחקים האלה במהלך שלב הצמצום. המצב הזה זמין רק לעבודות Spark.
מכיוון ש-EFM לא שומר נתוני ערבוב ביניים בעובדים משניים, הוא מתאים במיוחד לאשכולות שמשתמשים במכונות וירטואליות שניתן לקטוע את הפעולה שלהן או רק בשינוי גודל אוטומטי של קבוצת העובדים המשניים.
EFM נתמך ב-Managed Service for Apache Spark
2.0.31+, 2.1.6+, 2.2+ ובגרסאות מאוחרות יותר של תמונות.
- משימות של Apache Hadoop YARN שלא תומכות בהעברה של AppMaster עלולות להיכשל במצב גמישות משופרת (ראו מתי צריך לחכות לסיום של AppMasters).
- לא מומלץ להשתמש במצב גמישות משופר:
- באשכול שיש בו רק עובדים ראשיים
- במשימות סטרימינג, כי יכולות לעבור עד 30 דקות אחרי השלמת המשימה עד לניקוי נתוני ערבוב ביניים.
- באשכול שמריץ מחברות, כי יכול להיות שנתוני ערבוב לא ינוקו במהלך חיי הסשן.
- כשמשימות Spark מופעלות באשכול שבו מופעלת האפשרות של צמצום הדרגתי של כמות השרתים. צמצום הדרגתי של כמות השרתים ו-EFM יכולים לפעול למטרות מנוגדות, כי מנגנון צמצום הדרגתי של כמות השרתים של YARN שומר את הצמתים בהשבתה עד שכל האפליקציות הרלוונטיות מסתיימות.
- באשכול שמריץ גם משימות Spark וגם משימות שאינן Spark.
- מצב גמישות משופר לא נתמך:
- כשההגדרה 'שינוי גודל אוטומטי של העובד הראשי' מופעלת. ברוב המקרים, עובדים ראשיים ימשיכו לאחסן נתוני ערבוב שלא מועברים אוטומטית. הקטנת קבוצת העובדים הראשית מבטלת את היתרונות של EFM.
שימוש במצב גמישות משופר
הגמישות המשופרת מופעלת כשיוצרים אשכול על ידי הגדרת dataproc:efm.spark.shuffle מאפיין האשכול לערך primary-worker.
לדוגמה:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
דוגמה ל-Apache Spark
- מריצים משימה של ספירת מילים מול טקסט ציבורי של שייקספיר באמצעות קובץ ה-jar של דוגמאות Spark באשכול EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
הגדרת כונני SSD מקומיים
מכיוון ש-EFM כותב נתוני ערבוב ביניים לדיסקים שמצורפים למכונות וירטואליות, הוא נהנה מרוחב הפס הנוסף ומ-IOPS שמסופקים על ידי כונני SSD מקומיים. כדי להקל על הקצאת משאבים, כשמגדירים מכונות עובד ראשיות, מומלץ לכוון להקצאה של מחיצה אחת של SSD מקומי לכל 4 ליבות vCPU.
כדי לצרף כונני SSD מקומיים, מעבירים את הדגל --num-worker-local-ssds לפקודה gcloud Managed Service for Apache Spark clusters create.
בדרך כלל, לא תצטרכו כונני SSD מקומיים בעובדים משניים.
בדרך כלל אין חשיבות רבה להוספת כונני SSD מקומיים לעובדים המשניים של אשכול (באמצעות הדגל --num-secondary-worker-local-ssds), כי העובדים המשניים לא כותבים נתונים של ערבוב באופן מקומי.
עם זאת, מכיוון ש-SSD מקומיים משפרים את הביצועים של הדיסק המקומי, יכול להיות שתחליטו להוסיף SSD מקומיים לעובדים משניים אם אתם מצפים שעבודות יהיו מוגבלות על ידי קלט/פלט בגלל שימוש בדיסק מקומי: העבודה שלכם משתמשת בדיסק מקומי משמעותי עבור שטח אחסון זמני, או שהמחיצות שלכם גדולות מדי מכדי להיכנס לזיכרון ויעברו לדיסק.
יחס עובדים משני
מכיוון שעובדים משניים כותבים את נתוני ה-shuffle שלהם לעובדים ראשיים, באשכול צריך להיות מספר מספיק של עובדים ראשיים עם מספיק משאבי CPU, זיכרון ודיסק כדי להתאים לעומס ה-shuffle של העבודה. באשכולות עם התאמה אוטומטית לעומס, כדי למנוע את ההתאמה לעומס של הקבוצה הראשית וכך למנוע התנהגות לא רצויה, צריך להגדיר את minInstances לערך maxInstances במדיניות התאמה אוטומטית לעומס של קבוצת ה-worker הראשית.
אם יש לכם יחס גבוה בין העובדים המשניים לבין העובדים הראשיים (לדוגמה, 10:1), כדאי לעקוב אחרי השימוש ב-CPU, ברשת ובדיסק של העובדים הראשיים כדי לקבוע אם יש עליהם עומס יתר. לשם כך:
נכנסים לדף VM instances במסוףGoogle Cloud .
לוחצים על תיבת הסימון שמימין לעובד הראשי.
לוחצים על הכרטיסייה MONITORING (מעקב) כדי לראות את נתוני השימוש במעבד של העובד הראשי, את נתוני ה-IOPS של הדיסק, את נתוני הבייטים ברשת ומדדים אחרים.
אם עומס העבודה על העובדים הראשיים גבוה מדי, כדאי להגדיל את מספר העובדים הראשיים באופן ידני.
שינוי הגודל של קבוצת העובדים הראשית
אפשר להגדיל את מספר העובדים בקבוצת העובדים הראשית בלי בעיות, אבל הקטנת מספר העובדים בקבוצת העובדים הראשית עלולה להשפיע לרעה על התקדמות העבודה. פעולות שמקטינות את קבוצת העובדים הראשית צריכות להשתמש בצמצום הדרגתי של כמות השרתים, שמופעל על ידי הגדרת הדגל --graceful-decommission-timeout.
אשכולות עם התאמה אוטומטית לעומס: ההתאמה לעומס של קבוצת העובדים הראשית מושבתת באשכולות EFM עם מדיניות של התאמה אוטומטית לעומס. כדי לשנות את הגודל של קבוצת העובדים הראשית באשכול עם מידרוג אוטומטי:
משביתים את שינוי הגודל האוטומטי.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
שינוי הגודל של הקבוצה הראשית.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
הפעלה מחדש של שינוי הגודל האוטומטי:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
מעקב אחר השימוש בדיסק של העובד הראשי
לעובדים הראשיים צריך להיות מספיק מקום בדיסק לנתוני הערבוב של האשכול.
אפשר לעקוב אחרי זה באופן עקיף באמצעות המדד remaining HDFS capacity.
כשהנפח הפנוי בדיסק המקומי מתמלא, אין יותר מקום ל-HDFS, והקיבולת שנותרה קטנה יותר.
כברירת מחדל, אם השימוש בדיסק המקומי של עובד ראשי עולה על 90% מהקיבולת, הצומת יסומן כ-UNHEALTHY בממשק המשתמש של צומת YARN. אם נתקלים בבעיות בקיבולת הדיסק, אפשר למחוק נתונים שלא בשימוש מ-HDFS או להגדיל את מספר העובדים במאגר העובדים הראשי.
הגדרה מתקדמת
חלוקה למחיצות (partitioning) ומקביליות
כששולחים משימת Spark, צריך להגדיר רמה מתאימה של חלוקה למחיצות. ההחלטה לגבי מספר מחיצות הקלט והפלט בשלב של ערבוב נתונים כוללת פשרה בין מאפייני ביצועים שונים. מומלץ להתנסות עם ערכים שמתאימים למבנה התפקידים שלכם.
מחיצות קלט
החלוקה למחיצות של הקלט ב-Spark וב-MapReduce נקבעת לפי מערך נתוני הקלט. כשקוראים קבצים מ-Cloud Storage, כל משימה מעבדת נתונים בגודל של בערך 'גודל בלוק' אחד.
במשימות Spark SQL, הגודל המקסימלי של המחיצה נקבע על ידי
spark.sql.files.maxPartitionBytes. מומלץ להגדיל את הנפח ל-1GB:spark.sql.files.maxPartitionBytes=1073741824.ב-RDD של Spark, גודל המחיצה נשלט בדרך כלל באמצעות
fs.gs.block.size, שמוגדר כברירת מחדל ל-128MB. כדאי להגדיל אותו ל-1GB. לדוגמה:--properties spark.hadoop.fs.gs.block.size=1073741824
מחיצות פלט
מספר המשימות בשלבים הבאים נקבע על ידי כמה מאפיינים. במשימות גדולות יותר שמעבדות יותר מ-1TB, כדאי להקצות לפחות 1GB לכל מחיצה.
ב-Spark SQL, מספר מחיצות הפלט נשלט על ידי
spark.sql.shuffle.partitions.במשימות Spark שמשתמשות ב-RDD API, אפשר לציין את מספר מחיצות הפלט או להגדיר את
spark.default.parallelism.
שינוי סדר העובדים בשיבוץ העובדים הראשי
המאפיין המשמעותי ביותר הוא --properties yarn:spark.shuffle.io.serverThreads=<num-threads>.
שימו לב שזוהי מאפיין YARN ברמת האשכול, כי שרת ה-shuffle של Spark פועל כחלק מ-Node Manager. ברירת המחדל היא פי שניים (2x) ממספר ליבות המעבד במכונה (לדוגמה, 16 שרשורים ב-n1-highmem-8). אם הערך של 'זמן חסימת ערבוב' גדול משנייה אחת, והעובדים הראשיים לא הגיעו למגבלות של הרשת, המעבד או הדיסק, כדאי להגדיל את מספר השרשורים של שרת הערבוב.
בסוגי מכונות גדולים יותר, כדאי להגדיל את הערך של spark.shuffle.io.numConnectionsPerPeer, שמוגדר כברירת מחדל ל-1. (לדוגמה, הגדרה של 5 חיבורים לכל צמד מארחים).
הגדלת מספר הניסיונות החוזרים
אפשר להגדיר את המספר המקסימלי של ניסיונות שמותרים למשימות, לשלבים ולאפליקציות ראשיות על ידי הגדרת המאפיינים הבאים:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
מכיוון שאפליקציות ראשיות ומשימות מסתיימות בתדירות גבוהה יותר באשכולות שבהם נעשה שימוש בהרבה מכונות VM זמני או בהתאמה אוטומטית לעומס ללא צמצום הדרגתי של כמות השרתים, הגדלת הערכים של המאפיינים הקודמים באשכולות האלה יכולה לעזור (שימו לב ששימוש ב-EFM עם Spark וצמצום הדרגתי של כמות השרתים לא אפשרי).
צמצום הדרגתי של כמות השרתים ב-YARN באשכולות EFM
אפשר להשתמש ב-YARN Graceful Decommissioning כדי להסיר צמתים במהירות עם השפעה מינימלית על אפליקציות שפועלות. באשכולות עם התאמה אוטומטית לעומס, אפשר להגדיר את זמן קצוב לתפוגה של צמצום הדרגתי של כמות השרתים ב-AutoscalingPolicy שמצורף לאשכול EFM.
שיפורים ב-EFM לצמצום הדרגתי של כמות השרתים
מכיוון שהנתונים הזמניים מאוחסנים במערכת קבצים מבוזרת, אפשר להסיר צמתים מאשכול EFM ברגע שכל הקונטיינרים שפועלים בצמתים האלה סיימו את הפעולה. לעומת זאת, בצבי אשכולות רגילים של Managed Service for Apache Spark, הצמתים לא מוסרים עד שהאפליקציה מסיימת את הפעולה.
הסרת צומת לא ממתינה לסיום של אפליקציות ראשיות שפועלות בצומת. כשהמאגר הראשי של האפליקציה מסתיים, המערכת מתזמנת אותו מחדש בצומת אחר שלא מוצא משימוש. התקדמות העבודה לא אובדת: האפליקציה הראשית החדשה משחזרת במהירות את המצב מהאפליקציה הראשית הקודמת על ידי קריאת היסטוריית העבודה.