פיתוח ובדיקה של צינורות עיבוד נתונים של Dataflow

בדף הזה מפורטות שיטות מומלצות לפיתוח ולבדיקה של צינור עיבוד נתונים ב-Dataflow.

סקירה כללית

האופן שבו הקוד של צינור עיבוד הנתונים מוטמע משפיע באופן משמעותי על הביצועים של צינור עיבוד הנתונים בסביבת הייצור. כדי לעזור לכם ליצור קוד של צינורות נתונים שפועל בצורה נכונה ויעילה, במסמך הזה מוסבר:

  • מפעילים של צינורות נתונים לתמיכה בהרצת קוד בשלבים השונים של הפיתוח והפריסה.
  • סביבות פריסה שמאפשרות להפעיל צינורות עיבוד נתונים במהלך פיתוח, בדיקה, טרום-ייצור וייצור.
  • קוד ותבניות של צינורות עיבוד נתונים בקוד פתוח שאפשר להשתמש בהם כמו שהם, או כבסיס לצינורות עיבוד נתונים חדשים כדי להאיץ את פיתוח הקוד.
  • גישה מבוססת-שיטות מומלצות לבדיקת קוד של צינורות עיבוד נתונים. קודם כל, במסמך הזה מופיעה סקירה כללית שכוללת את ההיקף והקשר של סוגי בדיקות שונים, כמו בדיקות יחידה, בדיקות שילוב ובדיקות מקצה לקצה. בנוסף, כל סוג של בדיקה מוסבר בפירוט, כולל שיטות ליצירה ושילוב עם נתוני בדיקה, ואיזה רכיבי הפעלה של צינורות עיבוד נתונים צריך להשתמש בכל בדיקה.

הפעלת צינורות עיבוד נתונים

במהלך הפיתוח והבדיקות, משתמשים ב-runners שונים של Apache Beam כדי להריץ קוד של צינורות. ‫Apache Beam SDK מספק Direct Runner לפיתוח ולבדיקה מקומיים. כלי האוטומציה שלכם להפצה יכולים גם להשתמש ב-Direct Runner לבדיקות יחידה ולבדיקות שילוב. לדוגמה, אפשר להשתמש ב-Direct Runner בצינור עיבוד נתונים של אינטגרציה רציפה (CI).

צינורות עיבוד נתונים שנפרסים ב-Dataflow משתמשים ב-Dataflow Runner, שמריץ את צינור עיבוד הנתונים בסביבות שדומות לסביבות ייצור. בנוסף, אפשר להשתמש ב-Dataflow Runner לבדיקות פיתוח אד-הוק ולבדיקות של צינורות עיבוד נתונים מקצה לקצה.

בדף הזה מתמקדים בהרצת צינורות עיבוד נתונים שנבנו באמצעות Apache Beam Java SDK, אבל Dataflow תומך גם בצינורות עיבוד נתונים של Apache Beam שפותחו באמצעות Python ו-Go. ערכות ה-SDK של Apache Beam Java,‏ Python ו-Go הן זמינות לכולם לשימוש ב-Dataflow. מפתחי SQL יכולים גם להשתמש ב-Apache Beam SQL כדי ליצור צינורות עיבוד נתונים שמשתמשים בניבי SQL מוכרים.

הגדרת סביבת פריסה

כדי להפריד בין משתמשים, נתונים, קוד ומשאבים אחרים בשלבים שונים של הפיתוח, צריך ליצור סביבות פריסה. כדי לספק סביבות מבודדות לשלבים השונים של פיתוח צינורות, מומלץ להשתמש בGoogle Cloud פרויקטים נפרדים.

בקטעים הבאים מתוארת קבוצה אופיינית של סביבות פריסה.

הסביבה המקומית

הסביבה המקומית היא תחנת עבודה של מפתח. לפיתוח ולבדיקה מהירה, משתמשים ב-Direct Runner כדי להריץ קוד של צינור מקומית.

צינורות שמופעלים באופן מקומי באמצעות Direct Runner יכולים לקיים אינטראקציה עם משאבים מרוחקים של Google Cloud Platform, כמו נושאי Pub/Sub או טבלאות BigQuery. כדאי לתת לכל מפתח פרויקטים נפרדים שלGoogle Cloud כדי שיהיה להם ארגז חול לבדיקות אד-הוק עם שירותי Google Cloud Platform.

חלק מהשירותים, כמו Google Cloud Pub/Sub ו-Bigtable, מספקים אמולטורים לפיתוח מקומי. אפשר להשתמש באמולטורים האלה עם Direct Runner כדי להפעיל פיתוח ובדיקות מקומיים מקצה לקצה.

סביבת ארגז חול

סביבת ארגז החול היא Google Cloud פרויקט שמאפשר למפתחים גישה לשירותים Google Cloud במהלך פיתוח הקוד. מפתחים של צינורות יכולים לשתף פרויקט עם מפתחים אחרים, או להשתמש בפרויקטים אישיים משלהם. Google Cloud שימוש בפרויקטים נפרדים מפשט את התכנון שקשור לשימוש במשאבים משותפים ולניהול מכסות.

מפתחים משתמשים בסביבת ארגז החול כדי להפעיל צינורות עיבוד נתונים אד-הוק באמצעות Dataflow Runner. סביבת הארגז חול שימושית לניפוי באגים ולבדיקת קוד מול רכיב הפעלה של ייצור במהלך שלב פיתוח הקוד. לדוגמה, הפעלה אד-הוק של צינור עיבוד נתונים מאפשרת למפתחים לבצע את הפעולות הבאות:

  • בודקים את ההשפעה של שינויים בקוד על התנהגות ההרחבה.
  • הסבר על ההבדלים הפוטנציאליים בין ההתנהגות של DirectRunner לבין Dataflow Runner.
  • הסבר על האופן שבו Dataflow מבצע אופטימיזציות של גרפים.

לצורך בדיקות אד-הוק, מפתחים יכולים לפרוס קוד מהסביבה המקומית שלהם כדי להריץ את Dataflow בסביבת ארגז החול שלהם.

סביבת טרום-ייצור

סביבת טרום-ייצור מיועדת לשלבי פיתוח שצריכים לפעול בתנאים דומים לתנאי הייצור, כמו בדיקות מקצה לקצה. שימוש בפרויקט נפרד לסביבת טרום-ייצור והגדרתו כך שיהיה דומה ככל האפשר לסביבת הייצור. באופן דומה, כדי לאפשר בדיקות מקצה לקצה עם קנה מידה שדומה לזה של סביבת הייצור, כדאי להגדיר את מכסות הפרויקט של Dataflow ושירותים אחרים כך שיהיו דומות ככל האפשר לסביבת הייצור. Google Cloud

בהתאם לדרישות שלכם, תוכלו לחלק את סביבת הטרום-ייצור לכמה סביבות. לדוגמה, סביבה של בקרת איכות יכולה לתמוך בעבודתם של אנליסטים של איכות, כדי לבדוק יעדים של רמת שירות (SLO) כמו נכונות נתונים, עדכניות וביצועים בתנאים שונים של עומס עבודה.

בדיקות מקצה לקצה כוללות שילוב עם מקורות נתונים ויעדי נתונים במסגרת הבדיקה. כדאי לחשוב איך להפוך אותם לזמינים בסביבת הטרום-הפקה. אפשר לאחסן את נתוני הבדיקה בסביבת הטרום-ייצור עצמה. לדוגמה, נתוני בדיקה מאוחסנים בקטגוריה של Cloud Storage עם נתוני הקלט שלכם. במקרים אחרים, נתוני בדיקה עשויים להגיע ממקור מחוץ לסביבת הטרום-ייצור, כמו נושא ב-Pub/Sub דרך מינוי נפרד שנמצא בסביבת הייצור. בצינורות עיבוד נתונים בסטרימינג, אפשר גם להריץ בדיקות מקצה לקצה באמצעות נתונים שנוצרו, למשל באמצעות הכלי ליצירת נתונים בסטרימינג של Dataflow, כדי לדמות מאפיינים ונפחים של נתונים שדומים לנתונים בסביבת הייצור.

בצינורות עיבוד נתונים של סטרימינג, משתמשים בסביבת טרום-ייצור כדי לבדוק עדכונים של צינורות עיבוד נתונים לפני שמבצעים שינויים בסביבת הייצור. חשוב לבדוק ולאמת את תהליכי העדכון של צינורות להזרמת נתונים, במיוחד אם צריך לתאם בין כמה שלבים, למשל כשמריצים צינורות מקבילים כדי למנוע השבתה.

סביבת ייצור

סביבת הייצור היא Google Cloud פרויקט ייעודי. במסגרת אספקה רציפה, מתבצעת העתקה של ארטיפקטים של פריסה לסביבת הייצור אחרי שכל בדיקות הקצה לקצה עברו בהצלחה.

שיטות מומלצות לפיתוח

שיטות מומלצות לשימוש בפייפליין ב-Dataflow

בדיקת הפייפליין

בפיתוח תוכנה, בדיקות יחידה, בדיקות שילוב ובדיקות מקצה לקצה הן סוגים נפוצים של בדיקות תוכנה. סוגי הבדיקות האלה רלוונטיים גם לצינורות להעברת נתונים.

ה-SDK של Apache Beam מספק פונקציונליות להפעלת הבדיקות האלה. באידיאל, כל סוג של בדיקה מיועד לסביבת פריסה אחרת. הדיאגרמה הבאה ממחישה כיצד בדיקות יחידה, בדיקות שילוב ובדיקות מקצה לקצה חלות על חלקים שונים של צינור עיבוד הנתונים והנתונים.

סוגי הבדיקות והקשר שלהן לטרנספורמציות, לצינורות עיבוד נתונים, למקורות נתונים ולמאגרי נתונים.

בתרשים מוצג ההיקף של בדיקות שונות והקשר שלהן לטרנספורמציות (מחלקות משנה של DoFn ו-PTransform), לצינורות, למקורות נתונים ולמאגרי נתונים.

בקטעים הבאים מתואר איך בדיקות רשמיות שונות של תוכנה חלות על צינורות נתונים באמצעות Dataflow. בזמן שאתם קוראים את הקטע הזה, כדאי לחזור לתרשים כדי להבין את הקשר בין סוגי הבדיקות השונים.

דגימת נתונים

כדי לראות את הנתונים בכל שלב בצינור עיבוד הנתונים של Dataflow, מפעילים דגימת נתונים במהלך הבדיקה. כך תוכלו לראות את הפלט של הטרנספורמציות ולוודא שהפלט נכון.

בדיקות יחידה

בדיקות יחידה מעריכות את הפעולה התקינה של מחלקות משנה של DoFn ושל טרנספורמציות מורכבות (מחלקות משנה של PTransform) על ידי השוואת הפלט של הטרנספורמציות האלה עם קבוצה מאומתת של נתוני קלט ופלט. בדרך כלל, מפתחים יכולים להריץ את הבדיקות האלה בסביבה המקומית. אפשר גם להריץ את הבדיקות באופן אוטומטי באמצעות אוטומציה של בדיקות יחידה, באמצעות שילוב רציף (CI) בסביבת הבנייה.

הכלי Direct Runner מריץ בדיקות יחידה באמצעות קבוצת משנה קטנה יותר של נתוני בדיקה להפניה, שמתמקדת בבדיקת הלוגיקה העסקית של ההמרות. נתוני הבדיקה צריכים להיות קטנים מספיק כדי להיכנס לזיכרון המקומי במחשב שבו מריצים את הבדיקה.

‫Apache Beam SDK מספק כלל JUnit שנקרא TestPipeline לבדיקת יחידות של טרנספורמציות נפרדות (מחלקות משנה של DoFn), טרנספורמציות מורכבות (מחלקות משנה של PTransform) וצינורות שלמים. אפשר להשתמש ב-TestPipeline ב-Apache Beam pipeline runner כמו Direct Runner או Dataflow Runner כדי להחיל הצהרות על התוכן של אובייקטים מסוג PCollection באמצעות PAssert, כמו שמוצג בקטע הקוד הבא של JUnit test class:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

בדיקות יחידה של טרנספורמציות ספציפיות

אם מחלקים את הקוד לטרנספורמציות שאפשר לעשות בהן שימוש חוזר, למשל כסיווגים ברמה העליונה או כסיווגים סטטיים מוטמעים, אפשר ליצור בדיקות ממוקדות לחלקים שונים בצינור. בנוסף ליתרונות של בדיקות, טרנספורמציות לשימוש חוזר משפרות את יכולת התחזוקה והשימוש החוזר בקוד, כי הן מאפשרות להכניס את הלוגיקה העסקית של צינור עיבוד הנתונים לחלקים נפרדים. לעומת זאת, אם צינור עיבוד הנתונים משתמש במחלקות פנימיות אנונימיות כדי להטמיע טרנספורמציות, יכול להיות שיהיה קשה לבדוק חלקים נפרדים של צינור עיבוד הנתונים.

בקטע הקוד הבא ב-Java מוצגת ההטמעה של טרנספורמציות בתור מחלקות פנימיות אנונימיות, שלא מאפשרות בדיקה בקלות.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

משווים את הדוגמה הקודמת לדוגמה הבאה, שבה מחלקות פנימיות אנונימיות עוברות רפקטורינג למחלקות משנה קונקרטיות בשם DoFn. אפשר ליצור בדיקות יחידה נפרדות לכל מחלקת משנה קונקרטית DoFn שמרכיבה את צינור הנתונים מקצה לקצה.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

הבדיקה של כל מחלקת משנה DoFn דומה לבדיקת יחידה של צינור להעברת נתונים (pipeline) של אצווה שמכיל טרנספורמציה אחת. משתמשים בטרנספורמציה Create כדי ליצור אובייקט PCollection של נתוני בדיקה, ואז מעבירים אותו לאובייקט DoFn. משתמשים ב-PAssert כדי לוודא שהתוכן של האובייקט PCollection נכון. בדוגמה הבאה של קוד Java נעשה שימוש במחלקה PAssert כדי לבדוק את פורמט הפלט הנכון.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

בדיקות שילוב

בדיקות שילוב מאמתות את הפעולה התקינה של כל צינור הנתונים. כדאי להביא בחשבון את סוגי בדיקות השילוב הבאים:

  • בדיקת אינטגרציה של טרנספורמציה שמעריכה את הפונקציונליות המשולבת של כל הטרנספורמציות הבודדות שמרכיבות את פייפליין הנתונים. אפשר לחשוב על בדיקות שילוב של טרנספורמציות כעל בדיקות יחידה לצינור עיבוד הנתונים כולו, לא כולל שילוב עם מקורות נתונים חיצוניים ויעדים. ה-SDK של Apache Beam מספק שיטות להזנת נתוני בדיקה לפייפליין הנתונים ולאימות תוצאות העיבוד. הכלי Direct Runner משמש להרצת בדיקות שילוב של טרנספורמציות.
  • בדיקת שילוב מערכות שמעריכה את השילוב של צינור עיבוד הנתונים עם מקורות נתונים ועם יעדים בזמן אמת. כדי שצינור הנתונים יוכל לתקשר עם מערכות חיצוניות, צריך להגדיר את הבדיקות עם פרטי הכניסה המתאימים לגישה לשירותים חיצוניים. צינורות להעברת נתונים בסטרימינג פועלים ללא הגבלה, לכן צריך להחליט מתי ואיך להפסיק את הפעלת צינור הנתונים. באמצעות Direct Runner, אתם יכולים להריץ בדיקות שילוב של המערכת כדי לאמת במהירות את השילוב בין צינור עיבוד הנתונים למערכות אחרות, בלי לשלוח עבודת Dataflow ולחכות לסיום שלה.

תכנון בדיקות טרנספורמציה ושילוב מערכות כדי לספק גילוי מהיר של פגמים ומשוב בלי לפגוע בפרודוקטיביות של המפתחים. בבדיקות שפועלות לאורך זמן, כמו בדיקות שמופעלות כמשימות Dataflow, כדאי להשתמש בבדיקה מקצה לקצה שמופעלת בתדירות נמוכה יותר.

אפשר לחשוב על פייפליין כעל טרנספורמציה אחת או יותר שקשורות זו לזו. אפשר ליצור טרנספורמציה מורכבת שמכילה את כל הטרנספורמציות בצינור העיבוד, ולהשתמש ב-TestPipeline כדי לבצע בדיקת שילוב של צינור העיבוד כולו. בהתאם למצב שבו רוצים לבדוק את צינור עיבוד הנתונים – באצווה או בסטרימינג – מספקים נתוני בדיקה באמצעות טרנספורמציות Create או TestStream.

שימוש בנתוני בדיקה לבדיקות שילוב

בסביבת הייצור, צינור עיבוד הנתונים משולב כנראה עם מקורות נתונים שונים ויעדים שונים. עם זאת, כשמבצעים בדיקות יחידה ובדיקות שילוב של טרנספורמציות, צריך להתמקד באימות הלוגיקה העסקית של קוד צינור הנתונים על ידי הזנת נתוני בדיקה ואימות הפלט ישירות. בנוסף לפישוט הבדיקות, הגישה הזו מאפשרת לכם לבודד בעיות שספציפיות לצינורות מבעיות שיכולות להיגרם ממקורות נתונים ומ-data sinks.

צינורות עיבוד נתונים של מקבצי ניסיון

בצינורות לעיבוד נתונים באצווה, משתמשים בטרנספורמציה Create כדי ליצור אובייקט PCollection של נתוני הבדיקה מהקלט מתוך אוסף רגיל בזיכרון, כמו אובייקט List של Java. השימוש בטרנספורמציה Create מתאים אם נתוני הבדיקות קטנים מספיק כדי לכלול אותם בקוד. לאחר מכן תוכלו להשתמש באובייקטים של PAssert הפלט PCollection כדי לקבוע אם קוד צינור הנתונים תקין. הגישה הזו נתמכת על ידי Direct Runner ועל ידי Dataflow Runner.

בדוגמה הבאה של קטע קוד Java מוצגות טענות לגבי אובייקטים של פלט PCollection מטרנספורמציה מורכבת שכוללת חלק מהטרנספורמציות הנפרדות שמרכיבות צינור (WeatherStatsPipeline) או את כולן. הגישה דומה לבדיקות יחידה של טרנספורמציות נפרדות בצינור.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms 
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

כדי לבדוק את אופן הפעולה של עיבוד החלק הנצפה בלבד, אפשר גם להשתמש בטרנספורמציה Create כדי ליצור רכיבים עם חותמות זמן, כמו שמוצג בקטע הקוד הבא:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

בדיקת צינורות עיבוד נתונים בסטרימינג

צינורות להזרמת נתונים מכילים הנחות שמגדירות איך לטפל בנתונים לא מוגבלים. ההנחות האלה מתייחסות בדרך כלל לעדכניות של הנתונים בתנאים של העולם האמיתי, ולכן הן משפיעות על הדיוק בהתאם לאמיתות שלהן. בבדיקות שילוב של צינורות עיבוד נתונים של סטרימינג, מומלץ לכלול בדיקות שמדמות את האופי הלא דטרמיניסטי של הגעת נתוני סטרימינג.

כדי להפעיל בדיקות כאלה, ה-SDK של Apache Beam מספק את המחלקה TestStream כדי ליצור מודל של ההשפעות של תזמוני רכיבים (נתונים מוקדמים, בזמן או מאוחרים) על התוצאות של פייפליין הנתונים. אפשר להשתמש בבדיקות האלה יחד עם המחלקה PAssert כדי לאמת את התוצאות הצפויות.

TestStream נתמכת על ידי Direct Runner ו-Dataflow Runner. בדוגמת הקוד הבאה נוצרת טרנספורמציה TestStream:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

מידע נוסף על TestStream זמין במאמר בנושא בדיקת צינורות ללא הגבלה ב-Apache Beam. מידע נוסף על השימוש ב-Apache Beam SDK לבדיקות יחידה זמין במאמרי העזרה של Apache Beam.

שימוש בשירותים בבדיקות שילוב Google Cloud

Direct Runner יכול להשתלב עם שירותים שונים, כך שבדיקות אד-הוק בסביבה המקומית ובדיקות שילוב מערכות יכולות להשתמש ב-Pub/Sub, ב-BigQuery ובשירותים אחרים לפי הצורך. Google Cloud כשמשתמשים ב-Direct Runner, הצינור משתמש ב-Application Default Credentials‏ (ADC) כדי לקבל פרטי כניסה. הגדרת ADC משתנה בהתאם למקום שבו מריצים את צינור העברת הנתונים. מידע נוסף זמין במאמר בנושא הגדרה של Application Default Credentials.

לפני שמריצים את צינור הנתונים, צריך להעניק לחשבון שבו צינור הנתונים משתמש הרשאות מספיקות לכל המשאבים הנדרשים. פרטים נוספים מופיעים במאמר בנושא אבטחה והרשאות ב-Dataflow.

לצורך בדיקות שילוב מקומיות לחלוטין, אפשר להשתמש באמולטורים מקומיים עבור חלק משירותיGoogle Cloud . אפשר להשתמש באמולטורים מקומיים ל-Pub/Sub ול-Bigtable.

כדי לבדוק את השילוב של צינורות עיבוד נתונים של סטרימינג במערכת, אפשר להשתמש בשיטה setBlockOnRun (שמוגדרת בממשק DirectOptions) כדי ש-Direct Runner יפעיל את צינור עיבוד הנתונים באופן אסינכרוני. אחרת, הביצוע של צינור העיבוד חוסם את תהליך האב שקורא לו (לדוגמה, סקריפט בצינור העיבוד של הבנייה) עד שמפסיקים את צינור העיבוד באופן ידני. אם מריצים את צינור עיבוד הנתונים באופן אסינכרוני, אפשר להשתמש במופע PipelineResult שמוחזר כדי לבטל את ההרצה של צינור עיבוד הנתונים, כמו בדוגמת הקוד הבאה:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

בדיקות מקצה לקצה

בדיקות מקצה לקצה מאמתות את הפעולה הנכונה של צינור הנתונים מקצה לקצה על ידי הפעלתו ב-Dataflow Runner בתנאים שדומים מאוד לתנאי הייצור. הבדיקות מאמתות שהפונקציות של הלוגיקה העסקית פועלות בצורה תקינה באמצעות Dataflow Runner, ובודקות אם צינור הנתונים פועל כמצופה בעומסים שדומים לעומסים בסביבת הייצור. בדרך כלל מריצים בדיקות מקצה לקצה בפרויקט ייעודי Google Cloud שמוגדר כסביבת טרום-ייצור.

כדי לבדוק את צינור הנתונים בקנה מידה שונה, אפשר להשתמש בסוגים שונים של בדיקות מקצה לקצה, לדוגמה:

  • כדי לאמת במהירות את הפונקציונליות של צינור הנתונים בסביבת טרום-ייצור, מומלץ להריץ בדיקות מקצה לקצה בקנה מידה קטן באמצעות חלק קטן (למשל אחוז אחד) של מערך נתוני הבדיקה.
  • להריץ בדיקות מקצה לקצה בקנה מידה גדול באמצעות מערך נתונים מלא של בדיקות כדי לאמת את הפונקציונליות של צינורות הנתונים בתנאים ובכמויות נתונים שדומים לאלה של סביבת הייצור.

במקרה של צינורות עיבוד נתונים של סטרימינג, מומלץ להריץ צינורות עיבוד נתונים של בדיקה במקביל לצינור עיבוד הנתונים של הייצור, אם הם יכולים להשתמש באותם נתונים. התהליך הזה מאפשר לכם להשוות בין התוצאות וההתנהגות התפעולית, כמו שינוי גודל אוטומטי וביצועים.

בדיקות מקצה לקצה עוזרות לחזות את מידת ההתאמה של צינור עיבוד הנתונים ל-SLO של הייצור. בסביבת טרום-ייצור נבדק צינור עיבוד הנתונים בתנאים דומים לתנאי הייצור. בבדיקות מקצה לקצה, צינורות עיבוד הנתונים פועלים באמצעות Dataflow Runner כדי לעבד קבוצות נתוני הפניה מלאות שתואמות לקבוצות נתונים בסביבת הייצור או דומות להן.

יכול להיות שלא ניתן ליצור נתונים סינתטיים לבדיקה שמדמים בצורה מדויקת נתונים אמיתיים. כדי לפתור את הבעיה, אפשר להשתמש בחילוצים מנוקים ממקורות נתונים של ייצור כדי ליצור מערכי נתונים של הפניה, שבהם כל המידע האישי הרגיש עובר אנונימיזציה באמצעות טרנספורמציות מתאימות. לשם כך מומלץ להשתמש בSensitive Data Protection. באמצעות Sensitive Data Protection אפשר לזהות מידע אישי רגיש ממגוון סוגי תוכן ומקורות נתונים, ולהחיל מגוון טכניקות להסרת פרטים מזהים, כולל צנזור, התממת מידע, הצפנה תוך שמירה על הפורמט והזזת תאריכים.

הבדלים בבדיקות מקצה לקצה של צינורות לעיבוד נתונים בסטרימינג ושל צינורות לעיבוד נתונים ברצף (batch processing)

לפני שמריצים בדיקה מלאה מקצה לקצה מול מערך גדול של נתוני בדיקה, כדאי להריץ בדיקה עם אחוז קטן יותר של נתוני הבדיקה (למשל אחוז אחד) ולאמת את ההתנהגות הצפויה בפרק זמן קצר יותר. בדומה לבדיקות שילוב באמצעות Direct Runner, אפשר להשתמש ב-PAssert באובייקטים של PCollection כשמריצים צינורות באמצעות Dataflow Runner. מידע נוסף על PAssert זמין בקטע בדיקות יחידה בדף הזה.

בהתאם לתרחיש השימוש, יכול להיות שיהיה לא מעשי, יקר או מאתגר לאמת פלט גדול מאוד מבדיקות מקצה לקצה. במקרה כזה, אפשר לאמת דוגמאות מייצגות מתוך קבוצת התוצאות של הפלט. לדוגמה, אפשר להשתמש ב-BigQuery כדי לדגום ולהשוות שורות פלט עם מערך נתונים של תוצאות צפויות.

בצינורות סטרימינג, יכול להיות שיהיה קשה לדמות תנאי סטרימינג ריאליסטיים באמצעות נתונים סינתטיים. דרך נפוצה לספק נתונים בזמן אמת לבדיקות מקצה לקצה היא לשלב בדיקות עם מקורות נתונים של ייצור. אם אתם משתמשים ב-Pub/Sub כמקור נתונים, אתם יכולים להפעיל זרם נתונים נפרד לבדיקות מקצה לקצה באמצעות מינויים נוספים לנושאים קיימים. אחר כך אפשר להשוות בין התוצאות של צינורות שונים שצורכים את אותם נתונים. זה שימושי כדי לאמת צינורות מועמדים מול צינורות אחרים לפני הייצור ובמהלך הייצור.

בתרשים הבא מוצג איך השיטה הזו מאפשרת להפעיל במקביל צינור עיבוד נתונים של ייצור וצינור עיבוד נתונים של בדיקה בסביבות פריסה שונות.

הפעלת צינור עיבוד נתונים לבדיקה במקביל לצינור עיבוד נתונים לייצור באמצעות מקור סטרימינג יחיד של Pub/Sub.

בתרשים, שני צינורות הנתונים קוראים מאותו נושא Pub/Sub, אבל הם משתמשים במינויים נפרדים. ההגדרה הזו מאפשרת לשני צינורות העיבוד לעבד את אותם נתונים באופן עצמאי, וכך אפשר להשוות את התוצאות. צינור עיבוד הנתונים לבדיקה משתמש בחשבון שירות נפרד מפרויקט הייצור, ולכן לא משתמש במכסת המינויים ל-Pub/Sub של פרויקט הייצור.

בניגוד לצינורות להעברת נתונים באצווה, צינורות להעברת נתונים בזמן אמת ממשיכים לפעול עד שמבטלים אותם באופן מפורש. בבדיקות מקצה לקצה, צריך להחליט אם להשאיר את צינור הנתונים פועל, אולי עד להפעלת הבדיקה הבאה מקצה לקצה, או לבטל את צינור הנתונים בנקודה שמייצגת את השלמת הבדיקה כדי לבדוק את התוצאות.

ההחלטה הזו מושפעת מסוג נתוני הבדיקה שבהם אתם משתמשים. לדוגמה, אם משתמשים בקבוצה מוגבלת של נתוני בדיקה שמועברים לצינור הנתונים של הסטרימינג, אפשר לבטל את צינור הנתונים אחרי שכל הרכיבים סיימו את העיבוד. לחלופין, אם אתם משתמשים במקור נתונים אמיתי, כמו נושא קיים ב-Pub/Sub שמשמש בסביבת הייצור, או אם אתם יוצרים נתוני בדיקה באופן רציף, כדאי להשאיר את צינורות הבדיקה פועלים למשך תקופה ארוכה יותר. האפשרות השנייה מאפשרת להשוות את ההתנהגות לסביבת הייצור, או אפילו לצינורות בדיקה אחרים.