העברות מקבילות

צינורות (Pipelines) מופעלים באשכולות של מכונות. הם משיגים תפוקה גבוהה על ידי פיצול העבודה שצריך לבצע, ולאחר מכן הפעלת העבודה במקביל במספר רכיבי Executor שפזורים ברחבי האשכול. באופן כללי, ככל שמספר הפיצולים (שנקראים גם מחיצות) גדול יותר, כך אפשר להריץ את צינור הנתונים מהר יותר. רמת המקביליות בצינור נקבעת על ידי המקורות ושלבי הערבוב בצינור.

מקורות

בתחילת כל הרצה של צינור, כל מקור בצינור מחשב אילו נתונים צריך לקרוא ואיך אפשר לחלק את הנתונים האלה לפי פיצולים. לדוגמה, נניח שיש צינור בסיסי שקורא מ-Cloud Storage, מבצע כמה טרנספורמציות של Wrangler ואז כותב בחזרה ל-Cloud Storage.

צינור בסיסי שמציג מקור Cloud Storage, טרנספורמציה של Wrangler ויעד Cloud Storage

כשהפייפליין מתחיל, מקור Cloud Storage בודק את קובצי הקלט ומחלק אותם לפי הגודל שלהם. לדוגמה, אפשר לחלק קובץ בגודל של גיגה-בייט אחד ל-100 חלקים, כל אחד בגודל של 10 MB. כל רכיב Executor קורא את הנתונים של החלוקה, מריץ את ההמרות של Wrangler ואז כותב את הפלט לקובץ part.

חלוקת נתונים ב-Cloud Storage למחיצות לטרנספורמציות מקבילות ב-Wrangler לקבצים חלקיים

אם צינור הנתונים פועל לאט, אחד הדברים הראשונים שצריך לבדוק הוא אם המקורות יוצרים מספיק פיצולים כדי לנצל את היתרונות של ההפעלה המקבילית. לדוגמה, דחיסה מסוג מסוים לא מאפשרת לפצל קבצים של טקסט רגיל. אם אתם קוראים קבצים שדחוסים באמצעות gzip, יכול להיות שתבחינו שהצינור פועל הרבה יותר לאט מאשר אם הייתם קוראים קבצים לא דחוסים, או קבצים שדחוסים באמצעות BZIP (שאפשר לפצל). באופן דומה, אם אתם משתמשים במקור מסד הנתונים והגדרתם אותו לשימוש רק בפיצול אחד, הוא יפעל הרבה יותר לאט מאשר אם תגדירו אותו לשימוש ביותר פיצולים.

השמעה אקראית

סוגים מסוימים של תוספים גורמים לערבוב של הנתונים באשכול. זה קורה כשצריך לשלוח רשומות שמעובדות על ידי מפעיל אחד למפעיל אחר כדי לבצע את החישוב. פעולות Shuffle הן פעולות יקרות כי הן כוללות הרבה קלט/פלט. תוספים שגורמים לערבוב הנתונים מוצגים בקטע Analytics ב-Pipeline Studio. הם כוללים תוספים כמו Group By,‏ Deduplicate,‏ Distinct ו-Joiner. לדוגמה, נניח שמוסיפים לשלב Group By בצינור של הדוגמה הקודמת.

נניח גם שהנתונים שנקראים מייצגים רכישות שבוצעו בחנות מכולת. כל רשומה מכילה שדה item ושדה num_purchased. בשלב Group By, מגדירים את צינור העיבוד כך שיקבץ רשומות בשדה item ויחשב את הסכום של השדה num_purchased.

כשצינור הנתונים פועל, הקבצים של נתוני הקלט מפולחים כמו שמתואר קודם. לאחר מכן, כל רשומה עוברת ערבוב באשכול כך שכל רשומה עם אותו פריט שייכת לאותו מפעיל.

כפי שמודגם בדוגמה הקודמת, הרשומות של רכישות של מוצרי אפל פוזרו במקור בין כמה מפעילים. כדי לבצע את הצבירה, כל הרשומות האלה צריכות להישלח ברחבי האשכול לאותו מפעיל.

ברוב הפלאגינים שדורשים ערבוב אפשר לציין את מספר המחיצות שבהן רוצים להשתמש כשמערבבים את הנתונים. ההגדרה הזו קובעת כמה תהליכי ביצוע ישמשו לעיבוד הנתונים המעורבבים.

בדוגמה הקודמת, אם מספר המחיצות מוגדר כ-2, כל תהליך executor מחשב את הערכים המצטברים של שני פריטים במקום פריט אחד.

שימו לב שאפשר להקטין את רמת המקביליות של צינור העיבוד אחרי השלב הזה. לדוגמה, נסתכל על התצוגה הלוגית של הצינור:

אם המקור מחלק את הנתונים ל-500 מחיצות, אבל הפעולה Group By מבצעת ערבוב באמצעות 200 מחיצות, רמת המקביליות המקסימלית אחרי הפעולה Group By יורדת מ-500 ל-200. במקום 500 קבצים שונים של חלקים שנכתבו ב-Cloud Storage, יש לכם רק 200.

בחירת מחיצות

אם מספר המחיצות נמוך מדי, לא תנצלו את מלוא הקיבולת של האשכול כדי להקביל כמה שיותר עבודות. הגדרת המחיצות גבוהה מדי מגדילה את כמות התקורה המיותרת. באופן כללי, עדיף להשתמש ביותר מדי מחיצות מאשר בפחות מדי מחיצות. אם צינור הנתונים פועל במשך כמה דקות ואתם מנסים לקצר את זמן הפעולה בכמה דקות, כדאי שתבדקו אם יש עלויות תקורה מיותרות. אם צינור העיבוד נמשך שעות, בדרך כלל אין צורך לדאוג לתקורה.

דרך שימושית, אבל פשוטה מדי, לקבוע את מספר המחיצות שבהן צריך להשתמש היא להגדיר אותו ל-max(cluster CPUs, input records / 500,000). במילים אחרות, מחלקים את מספר רשומות הקלט ב-500,000. אם המספר הזה גדול ממספר המעבדים באשכול, משתמשים בו כמספר המחיצות. אחרת, משתמשים במספר המעבדים המרכזיים באשכול. לדוגמה, אם באשכול יש 100 מעבדים, ובשלב ה-shuffle צפויים 100 מיליון רשומות קלט, צריך להשתמש ב-200 מחיצות.

תשובה מלאה יותר היא שפעולות ערבוב נתונים מתבצעות בצורה הכי טובה כשהנתונים הזמניים של הערבוב בכל מחיצה יכולים להיכנס במלואם לזיכרון של המפעיל, כך שלא צריך להעביר שום דבר לדיסק. מערכת Spark שומרת קצת פחות מ-30% מהזיכרון של executor כדי להחזיק נתוני shuffle. המספר המדויק הוא (הזיכרון הכולל – 300 MB) * 30%. אם נניח שכל רכיב executor מוגדר להשתמש בזיכרון של 2GB, המשמעות היא שכל מחיצה לא צריכה להכיל יותר מ-(2GB – 300MB) * 30% = בערך 500MB של רשומות. אם נניח שכל רשומה נדחסת לגודל של 1 KB, אז המשמעות היא (500 MB / מחיצה) / (1 KB / רשומה) = 500,000 רשומות לכל מחיצה. אם התהליכים שלכם משתמשים ביותר זיכרון, או שהרשומות קטנות יותר, אפשר לשנות את המספר הזה בהתאם.

חלוקת נתונים לא מאוזנת

שימו לב שבמקרה של הדוגמה שלמעלה, הרכישות של הפריטים השונים התפלגו באופן שווה. כלומר, היו שלוש רכישות של תפוחים, שלוש רכישות של בננות, שלוש רכישות של גזר ושלוש רכישות של ביצים. ערבוב על מפתח עם חלוקה שווה הוא סוג הערבוב עם הביצועים הכי טובים, אבל להרבה מערכי נתונים אין את המאפיין הזה. אם נמשיך עם הדוגמה הקודמת של רכישה במכולת, אפשר לצפות להרבה יותר רכישות של ביצים מאשר של כרטיסי ברכה לחתונה. אם יש כמה מפתחות ערבוב שהם הרבה יותר נפוצים ממפתחות אחרים, מדובר בנתונים מוטים. נתונים מוטים יכולים להניב ביצועים גרועים משמעותית בהשוואה לנתונים לא מוטים, כי כמות לא פרופורציונלית של עבודה מבוצעת על ידי מספר קטן של מפעילים. היא גורמת לכך שקבוצת משנה קטנה של מחיצות תהיה גדולה בהרבה מכל השאר.

בדוגמה הזו, יש פי חמש יותר רכישות של ביצים מאשר רכישות של כרטיסים, מה שאומר שחישוב המצטבר של הביצים ייקח בערך פי חמש יותר זמן. זה לא משנה הרבה כשעובדים עם 10 רשומות במקום עם שתיים, אבל זה משנה מאוד כשעובדים עם חמישה מיליארד רשומות במקום עם מיליארד. כשקיימת חלוקת נתונים לא מאוזנת, למספר המחיצות שמשמשות למיון נתונים אין השפעה גדולה על הביצועים של פייפליין.

ניתן לזהות חלוקת נתונים לא מאוזנת (partition skew) על ידי בחינת התרשים של רשומות הפלט לאורך זמן. אם השלב מוציא רשומות בקצב גבוה בהרבה בתחילת ההרצה של צינור הנתונים ואז פתאום מאט, יכול להיות שיש לכם נתונים מוטים.

אפשר גם לזהות חלוקת נתונים לא מאוזנת על ידי בדיקת השימוש בזיכרון של האשכול לאורך זמן. אם האשכול שלכם נמצא בקיבולת מלאה למשך זמן מסוים, אבל פתאום יש בו שימוש נמוך בזיכרון למשך זמן מסוים, זה גם סימן לכך שאתם מתמודדים עם חלוקת נתונים לא מאוזנת.

ההשפעה של נתונים מוטים על הביצועים היא הכי משמעותית כשמבצעים פעולת צירוף. יש כמה טכניקות שאפשר להשתמש בהן כדי לשפר את הביצועים של הצטרפויות מוטות. מידע נוסף זמין במאמר בנושא עיבוד מקביל של פעולות JOIN.

כוונון דינמי להפעלה

כדי להתאים את הביצוע באופן דינמי, צריך לציין את טווח המחיצות לשימוש, ולא את מספר המחיצה המדויק. מספר המחיצה המדויק, גם אם הוא מוגדר בהגדרות של צינור העברת הנתונים, מוזנח כשהביצוע האדפטיבי מופעל.

אם אתם משתמשים באשכול זמני של Managed Service for Apache Spark,‏ Cloud Data Fusion מגדיר את התצורה המתאימה באופן אוטומטי. אבל אם אתם משתמשים באשכול סטטי של Managed Service for Apache Spark או Hadoop, אתם יכולים להגדיר את שני פרמטרי התצורה הבאים:

  • spark.default.parallelism: מגדירים את הערך למספר הכולל של ליבות וירטואליות שזמינות באשכול. כך מוודאים שהאשכול לא נמצא בעומס נמוך מדי, ומגדירים את הגבול התחתון למספר המחיצות.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: מגדירים את הערך ל-32x של מספר ליבות ה-vCore שזמינות באשכול. המאפיין הזה מגדיר את הגבול העליון של מספר המחיצות.
  • Spark.sql.adaptive.enabled: כדי להפעיל את האופטימיזציות, מגדירים את הערך הזה ל-true. השירות Managed Service for Apache Spark מגדיר את זה באופן אוטומטי, אבל אם אתם משתמשים באשכולות Hadoop כלליים, אתם צריכים לוודא שההגדרה הזו מופעלת .

אפשר להגדיר את הפרמטרים האלה בהגדרות המנוע של צינור ספציפי או במאפייני האשכול של אשכול סטטי של Managed Service for Apache Spark.

המאמרים הבאים