מצב הגמישות המשופר (EFM) של Dataproc מנהל את נתוני ה-shuffle כדי לצמצם את העיכובים בהתקדמות העבודה שנגרמים כתוצאה מהסרת צמתים מאשכול פעיל. EFM מעביר נתוני ערבוב על ידי כתיבת נתונים לעובדים ראשיים. העובדים שולפים מהצמתים המרוחקים האלה במהלך שלב הצמצום. המצב הזה זמין רק לעבודות Spark.
מכיוון ש-EFM לא שומר נתוני ערבוב ביניים בעובדים משניים, הוא מתאים במיוחד לאשכולות שמשתמשים במכונות וירטואליות שניתן לקטוע את הפעולה שלהן או רק בשינוי גודל אוטומטי של קבוצת העובדים המשניים.
התכונה EFM נתמכת ב-Dataproc
2.0.31+, 2.1.6+, 2.2+ ובגרסאות מאוחרות יותר של תמונות.
- עבודות של Apache Hadoop YARN שלא תומכות בהעברה של AppMaster עלולות להיכשל במצב גמישות משופרת (ראו מתי צריך לחכות לסיום של AppMasters).
- לא מומלץ להשתמש במצב גמישות משופר:
- באשכול שיש בו רק עובדים ראשיים
- במשימות סטרימינג, כי יכולות לעבור עד 30 דקות אחרי השלמת המשימה עד לניקוי נתוני ערבוב ביניים.
- באשכול שמריץ מחברות, כי יכול להיות שנתוני ערבוב לא ינוקו במהלך חיי הסשן.
- כשמשימות Spark מופעלות באשכול שבו מופעלת האפשרות של צמצום הדרגתי של כמות השרתים. השבתה הדרגתית ו-EFM יכולות לפעול למטרות מנוגדות, כי מנגנון ההשבתה ההדרגתית של YARN שומר את הצמתים בהשבתה עד שכל האפליקציות הרלוונטיות מסיימות את הפעולה.
- באשכול שמריץ גם משימות Spark וגם משימות שאינן Spark.
- לא אפשרי במצב גמישות משופר:
- כשההגדרה 'התאמה אוטומטית לעומס של העובד הראשי' מופעלת. ברוב המקרים, עובדים ראשיים ימשיכו לאחסן נתוני ערבוב שלא מועברים אוטומטית. הקטנת קבוצת העובדים הראשית מבטלת את היתרונות של EFM.
שימוש במצב גמישות משופר
הגמישות המשופרת מופעלת כשיוצרים אשכול על ידי הגדרת dataproc:efm.spark.shuffle מאפיין האשכול לערך primary-worker.
לדוגמה:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
דוגמה ל-Apache Spark
- מריצים משימה של ספירת מילים מול טקסט ציבורי של שייקספיר באמצעות קובץ ה-jar של דוגמאות Spark באשכול EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
הגדרת כונני SSD מקומיים
מכיוון ש-EFM כותב נתוני ערבוב ביניים לדיסקים שמצורפים למכונות וירטואליות, הוא נהנה מרוחב הפס הנוסף וממספר פעולות הקלט/פלט בשנייה שמסופקים על ידי כונני SSD מקומיים. כדי להקל על הקצאת משאבים, מומלץ לכוון להקצאה של מחיצת SSD מקומי אחת לכל 4 ליבות vCPU כשמגדירים מכונות עובד ראשיות.
כדי לצרף כונני SSD מקומיים, מעבירים את הדגל --num-worker-local-ssds לפקודה gcloud Dataproc clusters create.
בדרך כלל, לא תצטרכו כונני SSD מקומיים בעובדים משניים.
הוספת כונני SSD מקומיים לעובדים המשניים של אשכול (באמצעות הדגל --num-secondary-worker-local-ssds) היא לרוב פחות חשובה, כי העובדים המשניים לא כותבים נתוני ערבוב באופן מקומי.
עם זאת, מכיוון ש-SSD מקומיים משפרים את הביצועים של הדיסק המקומי, יכול להיות שתחליטו להוסיף SSD מקומיים לעובדים משניים אם אתם צופים שעבודות יהיו מוגבלות על ידי קלט/פלט בגלל שימוש בדיסק מקומי: העבודה שלכם משתמשת בדיסק מקומי משמעותי עבור שטח אחסון זמני, או שהמחיצות שלכם גדולות מדי מכדי להיכנס לזיכרון ויעברו לדיסק.
יחס עובדים משני
מכיוון שעובדים משניים כותבים את נתוני ה-shuffle שלהם לעובדים ראשיים, האשכול צריך להכיל מספר מספיק של עובדים ראשיים עם מספיק משאבי CPU, זיכרון ודיסק כדי להתאים לעומס ה-shuffle של העבודה. כדי למנוע את שינוי הגודל של קבוצת העובדים הראשית באשכולות עם התאמה אוטומטית לעומס, וכך למנוע התנהגות לא רצויה, צריך להגדיר את minInstances לערך maxInstances במדיניות התאמה אוטומטית לעומס של קבוצת העובדים הראשית.
אם יש לכם יחס גבוה בין העובדים המשניים לבין העובדים הראשיים (לדוגמה, 10:1), כדאי לעקוב אחרי השימוש ב-CPU, ברשת ובדיסק של העובדים הראשיים כדי לקבוע אם יש עומס יתר. לשם כך:
נכנסים לדף VM instances במסוףGoogle Cloud .
לוחצים על תיבת הסימון שמימין לעובד הראשי.
לוחצים על הכרטיסייה MONITORING (מעקב) כדי לראות את נתוני השימוש במעבד של העובד הראשי, את נתוני ה-IOPS של הדיסק, את נתוני הבייטים ברשת ומדדים אחרים.
אם עומס העבודה על העובדים הראשיים גבוה מדי, כדאי להגדיל את מספר העובדים הראשיים באופן ידני.
שינוי הגודל של קבוצת העובדים הראשית
אפשר להגדיל את קבוצת העובדים הראשית בלי בעיות, אבל הקטנה של קבוצת העובדים הראשית עלולה להשפיע לרעה על התקדמות העבודה. פעולות שמקטינות את קבוצת העובדים הראשית צריכות להשתמש בצמצום הדרגתי של כמות השרתים, שמופעל על ידי הגדרת הדגל --graceful-decommission-timeout.
אשכולות עם התאמה אוטומטית לעומס: ההתאמה לעומס של קבוצת העובדים הראשית מושבתת באשכולות EFM עם מדיניות התאמה אוטומטית לעומס. כדי לשנות את הגודל של קבוצת העובדים הראשית באשכול עם מידרוג אוטומטי:
משביתים את ההתאמה האוטומטית לעומס.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
שינוי הגודל של הקבוצה הראשית.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
הפעלה מחדש של התאמה אוטומטית לעומס:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
מעקב אחר השימוש בדיסק של העובד הראשי
לעובדים הראשיים צריך להיות מספיק מקום בדיסק כדי לאחסן את נתוני הערבוב של האשכול.
אפשר לעקוב אחרי זה באופן עקיף באמצעות המדד remaining HDFS capacity.
כשהנפח של הדיסק המקומי מתמלא, אין יותר מקום ל-HDFS, והקיבולת שנותרה קטנה.
כברירת מחדל, אם השימוש בדיסק המקומי של עובד ראשי עולה על 90% מהקיבולת, הצומת יסומן כ-UNHEALTHY בממשק המשתמש של צומת YARN. אם נתקלים בבעיות בקיבולת הדיסק, אפשר למחוק נתונים שלא בשימוש מ-HDFS או להגדיל את מספר העובדים במאגר העובדים הראשי.
הגדרה מתקדמת
חלוקה למחיצות (partitioning) ומקביליות (parallelism)
כששולחים משימת Spark, צריך להגדיר רמה מתאימה של חלוקה למחיצות. ההחלטה לגבי מספר מחיצות הקלט והפלט בשלב של ערבוב נתונים כוללת פשרה בין מאפייני ביצועים שונים. מומלץ להתנסות עם ערכים שמתאימים לצורות המשרות שלכם.
מחיצות קלט
החלוקה למחיצות של הקלט ב-Spark וב-MapReduce נקבעת לפי מערך נתוני הקלט. כשקוראים קבצים מ-Cloud Storage, כל משימה מעבדת נתונים בגודל של בערך 'גודל בלוק' אחד.
במשימות Spark SQL, הגודל המקסימלי של המחיצה נשלט על ידי
spark.sql.files.maxPartitionBytes. מומלץ להגדיל את הנפח ל-1GB:spark.sql.files.maxPartitionBytes=1073741824.ב-Spark RDD, גודל המחיצה נשלט בדרך כלל באמצעות
fs.gs.block.size, שערך ברירת המחדל שלו הוא 128MB. כדאי להגדיל אותו ל-1GB. לדוגמה:--properties spark.hadoop.fs.gs.block.size=1073741824
מחיצות פלט
מספר המשימות בשלבים הבאים נקבע על ידי כמה מאפיינים. במשימות גדולות יותר שמעבדות יותר מ-1TB, כדאי להקצות לפחות 1GB לכל מחיצה.
ב-Spark SQL, מספר מחיצות הפלט נשלט על ידי
spark.sql.shuffle.partitions.במשימות Spark שמשתמשות ב-RDD API, אפשר לציין את מספר מחיצות הפלט או להגדיר את
spark.default.parallelism.
התאמת ערבוב לעובד הראשי
המאפיין המשמעותי ביותר הוא --properties yarn:spark.shuffle.io.serverThreads=<num-threads>.
שימו לב: זוהי מאפיין YARN ברמת האשכול, כי שרת ה-shuffle של Spark פועל כחלק מ-Node Manager. ברירת המחדל היא פי שניים (2x) ממספר ליבות המעבד במכונה (לדוגמה, 16 שרשורים ב-n1-highmem-8). אם הערך של 'זמן חסימת קריאת ערבוב' גדול משנייה אחת, והעובדים הראשיים לא הגיעו למגבלות של הרשת, המעבד או הדיסק, כדאי להגדיל את מספר השרשורים של שרת הערבוב.
בסוגי מכונות גדולים יותר, כדאי להגדיל את הערך של spark.shuffle.io.numConnectionsPerPeer, שמוגדר כברירת מחדל ל-1. (לדוגמה, אפשר להגדיר 5 חיבורים לכל צמד מארחים).
הגדלת מספר הניסיונות החוזרים
אפשר להגדיר את המספר המקסימלי של ניסיונות שמותרים למאסטרים של אפליקציות, למשימות ולשלבים באמצעות הגדרת המאפיינים הבאים:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
מכיוון שמשימות ואפליקציות ראשיות מסתיימות בתדירות גבוהה יותר באשכולות שמשתמשים בהרבה מכונות VM זמניות או בהתאמה אוטומטית לעומס ללא צמצום הדרגתי של כמות השרתים, הגדלת הערכים של המאפיינים הקודמים באשכולות האלה יכולה לעזור (שימו לב ששימוש ב-EFM עם Spark וצמצום הדרגתי של כמות השרתים לא אפשרי).
צמצום הדרגתי של כמות השרתים ב-YARN באשכולות EFM
אפשר להשתמש ב-YARN Graceful Decommissioning כדי להסיר צמתים במהירות עם השפעה מינימלית על אפליקציות שפועלות. באשכולות עם התאמה אוטומטית לעומס, אפשר להגדיר את הזמן הקצוב לתפוגה של צמצום הדרגתי של כמות השרתים ב-AutoscalingPolicy שמצורף לאשכול EFM.
שיפורים ב-EFM לצורך צמצום הדרגתי של כמות השרתים
מכיוון שהנתונים הזמניים מאוחסנים במערכת קבצים מבוזרת, אפשר להסיר צמתים מאשכול EFM ברגע שכל הקונטיינרים שפועלים בצמתים האלה סיימו את הפעולה. לעומת זאת, בצמתים של אשכולות Dataproc רגילים, הצמתים לא מוסרים עד שהאפליקציה מסיימת את הפעולה.
הסרת צומת לא ממתינה לסיום של אפליקציות ראשיות שפועלות בצומת. כשהקונטיינר הראשי של האפליקציה מסתיים, הוא מתוזמן מחדש בצומת אחר שלא מוצא משימוש. התקדמות העבודה לא אובדת: האפליקציה הראשית החדשה משחזרת במהירות את המצב מהאפליקציה הראשית הקודמת על ידי קריאת היסטוריית העבודות.