שיטות מומלצות לתהליכי עבודה שיש ביניהם הקבלה גבוהה

בדף הזה מפורטות שיטות מומלצות לבנייה ולהפעלה של תהליכי עבודה מקביליים מאוד של Dataflow HPC, כולל הסבר על השימוש בקוד חיצוני בצינורות עיבוד הנתונים, על הפעלת צינור עיבוד הנתונים ועל ניהול הטיפול בשגיאות.

הוספת קוד חיצוני לצינור עיבוד הנתונים

ההבדל העיקרי בין צינורות (pipelines) מקבילים מאוד לבין צינורות אחרים הוא שהם משתמשים בקוד C++‎ בתוך DoFn ולא באחת משפות ה-SDK הרגילות של Apache Beam. בצינורות Java, כדי להקל על השימוש בספריות C++‎ בצינור, מומלץ להשתמש בקריאות לפרוצדורות חיצוניות. בקטע הזה מתואר הגישה הכללית שמשמשת להרצת קוד חיצוני (C++) בצינורות Java.

הגדרת צינור עיבוד נתונים של Apache Beam כוללת כמה רכיבים מרכזיים:

  • PCollections הם אוספים קבועים של רכיבים הומוגניים.
  • PTransforms משמשים להגדרת הטרנספורמציות ל-PCollection שיוצר PCollection אחר.
  • הצינור הוא המבנה שמאפשר לכם, באמצעות קוד, להצהיר על האינטראקציות בין PTransforms לבין PCollections. צינור הנתונים מיוצג כגרף אציקלי מכוון (DAG).

כשמשתמשים בקוד משפה שלא נכללת ברשימת השפות הסטנדרטיות של Apache Beam SDK, צריך להציב את הקוד ב-PTransform, שנמצא בתוך DoFn, ולהשתמש באחת משפות ה-SDK הסטנדרטיות כדי להגדיר את צינור הנתונים עצמו. מומלץ להשתמש ב-Apache Beam Python SDK כדי להגדיר את צינור הנתונים, כי ב-Python SDK יש מחלקה של כלי עזר שמפשטת את השימוש בקוד אחר. עם זאת, אפשר להשתמש בערכות SDK אחרות של Apache Beam.

אפשר להשתמש בקוד כדי לבצע ניסויים מהירים בלי לבצע בנייה מלאה. במערכת ייצור, בדרך כלל יוצרים קבצים בינאריים משלכם, וכך יש לכם חופש להתאים את התהליך לצרכים שלכם.

התרשים הבא ממחיש את שני השימושים בנתוני צינור עיבוד הנתונים:

  • התהליך מבוסס על נתונים.
  • הנתונים נאספים במהלך העיבוד ומצורפים לנתוני הנהג.

שני שלבים של נתוני צינור

בדף הזה, נתונים ראשיים (מהמקור) נקראים נתונים מניעים, ונתונים משניים (משלב העיבוד) נקראים נתוני הצטרפות.

בתרחיש לדוגמה בתחום הפיננסים, הנתונים המניעים יכולים להיות כמה מאות אלפי עסקאות. כל עסקה צריכה לעבור עיבוד בשילוב עם נתוני שוק. במקרה כזה, נתוני השוק הם הנתונים שמצורפים. בתרחיש שימוש במדיה, נתוני הליבה יכולים להיות קבצי תמונות שנדרש עיבוד שלהם אבל לא נדרשים מקורות נתונים אחרים, ולכן לא נעשה בהם שימוש בנתוני הצטרפות.

שיקולים לגבי גודל נתוני הנהיגה

אם הגודל של רכיב נתוני הליבה הוא בטווח של מגה-בייטים בודדים, צריך להתייחס אליו לפי הפרדיגמה הרגילה של Apache Beam, כלומר ליצור אובייקט PCollection מהמקור ולשלוח את האובייקט לטרנספורמציות של Apache Beam לעיבוד.

אם גודל רכיב הנתונים המניע הוא במגה-בייט גבוה או בגיגה-בייט, כמו שקורה בדרך כלל במדיה, אפשר להכניס את הנתונים המניעים ל-Cloud Storage. לאחר מכן, באובייקט PCollection ההתחלתי, מציינים את ה-URI של האחסון, ומשתמשים רק בהפניה ל-URI של הנתונים האלה.

שיקולים לגבי גודל הנתונים כשמצטרפים לנתונים

אם הנתונים לצירוף הם כמה מאות מגה-בייט או פחות, אפשר להשתמש בקלט צדדי כדי להעביר את הנתונים האלה לטרנספורמציות של Apache Beam. קלט הצד שולח את מנת הנתונים לכל עובד שזקוק לה.

אם הנתונים לצירוף הם בטווח של גיגה-בייט או טרה-בייט, אפשר להשתמש ב-Bigtable או ב-Cloud Storage כדי למזג את הנתונים לצירוף עם הנתונים העיקריים, בהתאם לאופי הנתונים. ‫Bigtable הוא פתרון אידיאלי לתרחישים פיננסיים שבהם יש גישה תכופה לנתוני שוק כחיפושים של צמדי מפתח/ערך מ-Bigtable. מידע נוסף על עיצוב הסכימה של Bigtable, כולל המלצות לעבודה עם נתונים של סדרות זמן, זמין במסמכי Bigtable הבאים:

הרצת הקוד החיצוני

יש הרבה דרכים להריץ קוד חיצוני ב-Apache Beam.

  • יוצרים תהליך שמופעל מאובייקט DoFn בתוך טרנספורמציה של Dataflow.

  • משתמשים ב-JNI עם Java SDK.

  • יוצרים תהליך משנה ישירות מאובייקט DoFn. הגישה הזו לא הכי יעילה, אבל היא יציבה ופשוטה ליישום. בגלל הבעיות הפוטנציאליות בשימוש ב-JNI, בדף הזה מוצג שימוש בקריאה לתהליך משנה.

כשמעצבים את תהליך העבודה, חשוב לקחת בחשבון את כל הצינור מקצה לקצה. חוסר היעילות בתהליך מתקזז עם העובדה שהעברת הנתונים ממקור הנתונים ועד ליעד מתבצעת באמצעות צינור אחד. אם משווים את הגישה הזו לגישות אחרות, כדאי לבדוק את הזמנים מקצה לקצה של צינור הנתונים ואת העלויות מקצה לקצה.

משיכת הקבצים הבינאריים למארחים

כשמשתמשים בשפה מקורית של Apache Beam, ‏ Apache Beam SDK מעביר אוטומטית את כל הקוד הנדרש לעובדים. עם זאת, כשמתקשרים לקוד חיצוני, צריך להעביר את הקוד באופן ידני.

קבצים בינאריים שמאוחסנים בקטגוריות

כדי להעביר את הקוד: בדוגמה מוצגים השלבים ל-Apache Beam Java SDK.

  1. אחסון הקוד החיצוני שעבר קומפילציה, יחד עם פרטי ניהול הגרסאות, ב-Cloud Storage.
  2. בשיטה @Setup, יוצרים בלוק מסונכרן כדי לבדוק אם קובץ הקוד זמין במשאב המקומי. במקום להטמיע בדיקה פיזית, אפשר לאשר את הזמינות באמצעות משתנה סטטי כשהשרשור הראשון מסתיים.
  3. אם הקובץ לא זמין, משתמשים בספריית הלקוח של Cloud Storage כדי לשלוף את הקובץ מהקטגוריה של Cloud Storage אל העובד המקומי. הגישה המומלצת היא להשתמש במחלקה Apache Beam FileSystems למשימה הזו.
  4. אחרי שהקובץ מועבר, מוודאים שביט ההרשאה להרצה מוגדר בקובץ הקוד.
  5. במערכת ייצור, בודקים את הגיבוב של הקבצים הבינאריים כדי לוודא שהקובץ הועתק בצורה נכונה.

אפשרות נוספת היא להשתמש בפונקציה Apache Beam filesToStage, אבל היא מבטלת חלק מהיתרונות של היכולת של ה-Runner לארוז ולהעביר את קוד ה-Java באופן אוטומטי. בנוסף, מכיוון שהקריאה לתהליך המשנה צריכה מיקום קובץ מוחלט, צריך להשתמש בקוד כדי לקבוע את נתיב המחלקה, וכך את המיקום של הקובץ שהועבר על ידי filesToStage. אנחנו לא ממליצים על הגישה הזו.

הפעלת קבצים בינאריים חיצוניים

כדי להריץ קוד חיצוני, צריך ליצור לו עטיפה. כותבים את העטיפה הזו באותה שפה שבה כתוב הקוד החיצוני (למשל, C++‎) או כסקריפט מעטפת. העטיפה מאפשרת להעביר ידיות קבצים וליישם אופטימיזציות כמו שמתואר בקטע עיצוב עיבוד למחזורי מעבד קטנים בדף הזה. העטיפה לא צריכה להיות מורכבת. בקטע הקוד הבא מוצג תיאור של wrapper ב-C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

הקוד הזה קורא שני פרמטרים מרשימת הארגומנטים. הפרמטר הראשון הוא המיקום של קובץ ההחזרה שאליו הנתונים נדחפים. הפרמטר השני הוא הנתונים שהקוד משקף למשתמש. ביישומים בעולם האמיתי, הקוד הזה יעשה יותר מאשר להדפיס את המחרוזת Hello, world!

אחרי שכותבים את קוד העטיפה, מריצים את הקוד החיצוני באופן הבא:

  1. העברת הנתונים לקובצי ההפעלה של הקוד החיצוני.
  2. מריצים את הקבצים הבינאריים, מאתרים שגיאות ומתעדים את השגיאות והתוצאות.
  3. טיפול בפרטי הרישום.
  4. לכידת נתונים מהעיבוד שהושלם.

העברת הנתונים לקבצים הבינאריים

כדי להתחיל בתהליך ההפעלה של הספרייה, מעבירים נתונים לקוד C++. בשלב הזה אפשר לנצל את השילוב של Dataflow עם כלים אחרים של Google Cloud Platform. כלי כמו Bigtable יכול להתמודד עם מערכי נתונים גדולים מאוד ולטפל בגישה עם זמן אחזור קצר ובמקביל למספר רב של משתמשים, מה שמאפשר לאלפי ליבות לגשת למערך הנתונים בו-זמנית. בנוסף, Bigtable יכול לעבד מראש נתונים, מה שמאפשר לעצב, להעשיר ולסנן נתונים. אפשר לבצע את כל הפעולות האלה בטרנספורמציות של Apache Beam לפני שמריצים את הקוד החיצוני.

במערכת ייצור, הדרך המומלצת היא להשתמש ב-מאגר אחסון לפרוטוקולים כדי להצפין את נתוני הקלט. אפשר להמיר את נתוני הקלט לבייטים ולקודד אותם ב-Base64 לפני שמעבירים אותם לספרייה החיצונית. יש שתי דרכים להעביר את הנתונים האלה לספרייה החיצונית:

  • נתוני קלט קטנים. אם הנתונים קטנים ולא חורגים מהאורך המקסימלי של ארגומנט של פקודה במערכת, אפשר להעביר את הארגומנט במיקום 2 של התהליך שנוצר באמצעות java.lang.ProcessBuilder.
  • נתוני קלט גדולים. כדי להכיל את הנתונים שנדרשים לתהליך, צריך ליצור קובץ ששמו כולל UUID אם גודל הנתונים גדול יותר.

הרצת קוד C++, תפיסת שגיאות ורישום ביומן

תיעוד מידע על שגיאות וטיפול בהן הוא חלק חשוב בצינור הנתונים. המשאבים שמשמשים את Dataflow runner הם זמניים, ולעתים קרובות קשה לבדוק את קובצי היומן של worker. חשוב לוודא שאתם מתעדים את כל המידע השימושי ושולחים אותו לרישום ביומן של Dataflow runner, ושאתם מאחסנים את נתוני הרישום ביומן בקטגוריה אחת או יותר של Cloud Storage.

הגישה המומלצת היא להפנות את stdout ו-stderr לקבצים, וכך לא צריך להתחשב בבעיות שקשורות לזיכרון. לדוגמה, ב-Dataflow runner שקורא לקוד C++, אפשר לכלול שורות כמו הבאות:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

טיפול בפרטי רישום

במקרים רבים של שימוש, המערכת מעבדת מיליוני רכיבים. עיבוד מוצלח יוצר יומנים עם ערך מועט או ללא ערך, ולכן צריך לקבל החלטה עסקית לגבי שמירת נתוני היומן. לדוגמה, כדאי לשקול את החלופות הבאות במקום לשמור את כל נתוני היומן:

  • אם המידע שכלול ביומנים מעיבוד מוצלח של רכיב לא חשוב, אל תשמרו אותו.
  • יוצרים לוגיקה שמבצעת דגימה של נתוני היומן, למשל דגימה של כל 10,000 רשומות ביומן. אם העיבוד הומוגני, למשל כשאיטרציות רבות של הקוד יוצרות נתוני יומן זהים במהותם, הגישה הזו מספקת איזון יעיל בין שמירת נתוני היומן לבין אופטימיזציה של העיבוד.

במקרים של כשלים, כמות הנתונים שמועברת ליומנים יכולה להיות גדולה. אסטרטגיה יעילה לטיפול בכמויות גדולות של נתונים ביומן השגיאות היא לקרוא את השורות הראשונות של רשומת היומן ולהעביר רק את השורות האלה ל-Cloud Logging. אפשר לטעון את שאר קובץ היומן לקטגוריות של Cloud Storage. הגישה הזו מאפשרת לכם לעיין בשורות הראשונות של יומני השגיאות מאוחר יותר, ואם צריך, לעיין בקובץ המלא ב-Cloud Storage.

כדאי גם לבדוק את הגודל של קובץ היומן. אם גודל הקובץ הוא אפס, אפשר להתעלם ממנו או לרשום הודעת יומן פשוטה שלפיה בקובץ לא היו נתונים.

איסוף נתונים מעיבוד שהושלם

לא מומלץ להשתמש ב-stdout כדי להעביר את תוצאת החישוב בחזרה לפונקציה DoFn. קוד אחר שהקוד שלכם ב-C++‎ קורא לו, ואפילו הקוד שלכם, עשויים לשלוח הודעות גם אל stdout, ולזהם את הזרם stdoutput שמכיל נתוני רישום. במקום זאת, מומלץ לבצע שינוי בקוד העטיפה של C++ כדי לאפשר לקוד לקבל פרמטר שמציין איפה ליצור את הקובץ שבו מאוחסן הערך. מומלץ לאחסן את הקובץ הזה בפורמט ניטרלי לשפה באמצעות מאגרי פרוטוקולים, כדי שקוד C++‎ יוכל להעביר אובייקט בחזרה לקוד Java או Python. DoFnהאובייקט יכול לקרוא את התוצאה ישירות מהקובץ ולהעביר את פרטי התוצאה לקריאה משלו output.

הניסיון מלמד שחשוב להריץ בדיקות יחידה שמתייחסות לתהליך עצמו. חשוב להטמיע בדיקת יחידה שמריצה את התהליך באופן עצמאי מצינור עיבוד הנתונים של Dataflow. ניפוי הבאגים בספרייה יכול להיות יעיל הרבה יותר אם היא עצמאית ולא צריך להריץ את כל הצינור.

עיצוב עיבוד למחזורי CPU קטנים

יש תקורה לקריאה לתהליך משנה. בהתאם לעומס העבודה, יכול להיות שתצטרכו לבצע פעולות נוספות כדי להקטין את היחס בין העבודה שמתבצעת לבין התקורה הניהולית של הפעלה וסגירה של התהליך.

בתרחיש השימוש במדיה, גודל רכיב הנתונים המרכזי יכול להיות במגה-בייט או בגיגה-בייט. כתוצאה מכך, העיבוד של כל רכיב נתונים יכול להימשך כמה דקות. במקרה כזה, העלות של קריאה לתהליך המשנה זניחה בהשוואה לזמן העיבוד הכולל. הגישה הכי טובה במצב כזה היא להגדיר שרכיב יחיד יתחיל תהליך משלו.

עם זאת, בתרחישי שימוש אחרים, כמו בתחום הפיננסים, העיבוד דורש יחידות קטנות מאוד של זמן מעבד (עשרות אלפיות שנייה). במקרה כזה, התקורה של קריאה לתהליך המשנה גדולה באופן לא פרופורציונלי. פתרון לבעיה הזו הוא שימוש בטרנספורמציה GroupByKey של Apache Beam כדי ליצור קבוצות של 50 עד 100 רכיבים שיוזנו לתהליך. לדוגמה, אפשר לבצע את השלבים הבאים:

  • בפונקציה DoFn, יוצרים צמד מפתח/ערך. אם אתם מעבדים עסקאות פיננסיות, אתם יכולים להשתמש במספר העסקה כמפתח. אם אין לכם מספר ייחודי שאפשר להשתמש בו כמפתח, אתם יכולים ליצור סכום ביקורת מהנתונים ולהשתמש בפונקציית מודולו כדי ליצור מחיצות של 50 רכיבים.
  • שולחים את המפתח לפונקציה GroupByKey.create, שמחזירה אוסף KV<key,Iterable<data>> שמכיל את 50 האלמנטים שאפשר לשלוח לתהליך.

הגבלת מקביליות של עובדים

כשעובדים עם שפה שיש לה תמיכה מובנית ב-Dataflow runner, לא צריך לחשוב על מה שקורה לעובד. ב-Dataflow יש הרבה תהליכים שמפקחים על בקרה על זרימת נתונים ועל השרשורים במצב אצווה או במצב סטרימינג.

עם זאת, אם אתם משתמשים בשפה חיצונית כמו C++, כדאי לדעת שאתם עושים משהו קצת לא רגיל כשאתם מתחילים תהליכי משנה. במצב אצווה, רץ Dataflow משתמש ביחס קטן של שרשורי עבודה למעבדים בהשוואה למצב סטרימינג. מומלץ, במיוחד במצב סטרימינג, ליצור סמפור בתוך המחלקה כדי לשלוט באופן ישיר יותר במקביליות של כל עובד.

לדוגמה, כשמעבדים מדיה, יכול להיות שלא תרצו שמאות רכיבי המרת קידוד יעובדו במקביל על ידי עובד יחיד. במקרים כאלה, אפשר ליצור מחלקה של כלי עזר שמספקת הרשאות לפונקציה DoFn לעבודה שמבוצעת. השימוש במחלקה הזו מאפשר לכם לשלוט ישירות בשרשורי העובדים בצינור.

שימוש ביעדי נתונים בקיבולת גבוהה ב-Google Cloud Platform

אחרי שהנתונים עוברים עיבוד, הם נשלחים ל-data sink. היעד צריך להיות מסוגל לטפל בנפח התוצאות שנוצרות על ידי פתרון העיבוד ברשת.

בתרשים הבא מוצגים חלק מה-sinks שזמינים ב-Google Cloud Platform כש-Dataflow מפעיל עומס עבודה של רשת.

מאגרי נתונים שזמינים ב-Google Cloud Platform

‫Bigtable,‏ BigQuery ו-Pub/Sub יכולים להתמודד עם זרמים גדולים מאוד של נתונים. לדוגמה, כל צומת של Bigtable יכול לטפל ב-10,000 הוספות בשנייה בגודל של עד 1K עם יכולת קלה להרחבה אופקית. כתוצאה מכך, אשכול Bigtable עם 100 צמתים יכול לקלוט מיליון הודעות לשנייה שנוצרות על ידי רשת Dataflow.

ניהול שגיאות פילוח

כשמשתמשים בקוד C++ בצינור, צריך להחליט איך לנהל שגיאות פילוח (segfaults), כי אם לא מטפלים בהן בצורה נכונה, ההשלכות שלהן לא מוגבלות למקום מסוים. ה-runner של Dataflow יוצר תהליכים לפי הצורך ב-Java,‏ Python או Go, ואז מקצה עבודה לתהליכים בצורה של חבילות.

אם הקריאה לקוד C++ מתבצעת באמצעות כלים עם צימוד הדוק, כמו JNI או Cython, ותהליך C++ גורם לשגיאת פילוח (segfault), גם תהליך הקריאה והמכונה הווירטואלית של Java‏ (JVM) קורסים. בתרחיש הזה, אי אפשר לזהות נקודות נתונים לא תקינות. כדי שיהיה אפשר לזהות נקודות נתונים בעייתיות, צריך להשתמש בצימוד חלש יותר, שמפצל את הנתונים הבעייתיים ומאפשר לצינור עיבוד הנתונים להמשיך. עם זאת, אם יש לכם קוד C++ בוגר שנבדק באופן מלא מול כל וריאציות הנתונים, אתם יכולים להשתמש במנגנונים כמו Cython.

המאמרים הבאים