מעבר מ-App Engine MapReduce ל-Apache Beam ול-Dataflow

המדריך הזה מיועד למשתמשי App Engine MapReduce. במאמר מוסבר איך לבצע העברה משימוש ב-App Engine MapReduce ל-Apache Beam ול-Dataflow.

למה כדאי לעבור

‫App Engine MapReduce הוא מודל תכנות לעיבוד כמויות גדולות של נתונים באופן מקביל ומבוזר. הוא שימושי למשימות גדולות וארוכות שלא ניתן לטפל בהן במסגרת בקשה אחת, כמו:

  • ניתוח יומני אפליקציות
  • צבירה של נתונים קשורים ממקורות חיצוניים
  • טרנספורמציה של נתונים מפורמט אחד לפורמט אחר
  • ייצוא נתונים לניתוח חיצוני

עם זאת, App Engine MapReduce היא ספרייה בקוד פתוח שמתוחזקת על ידי הקהילה, מבוססת על שירותי App Engine ולא נתמכת יותר על ידי Google.

לעומת זאת, Google תומכת באופן מלא ב-Dataflow, והוא מספק פונקציונליות מורחבת בהשוואה ל-App Engine MapReduce.

מקרים של העברה

ריכזנו כאן כמה דוגמאות למקרים שבהם כדאי לעבור מ-App Engine MapReduce ל-Apache Beam ול-Dataflow:

  • אחסון נתוני אפליקציות של מאגר נתונים ב-Datastore במחסן נתונים (data warehouse) של BigQuery לצורך עיבוד אנליטי באמצעות SQL.
  • אפשר להשתמש ב-Dataflow כחלופה ל-App Engine MapReduce לצורך תחזוקה או עדכונים של קבוצת הנתונים שלכם ב-Datastore.
  • גיבוי חלקים ממסד הנתונים של Datastore ב-Cloud Storage.

מהם Dataflow ו-Apache Beam?

‫Dataflow הוא שירות מנוהל להרצת מגוון רחב של דפוסי עיבוד נתונים. ‫Apache Beam הוא מודל תכנות מאוחד שמספק ערכות SDK להגדרת תהליכי עבודה לעיבוד נתונים. שימוש ב-Apache Beam כדי ליצור צינורות עיבוד נתונים מורכבים לנתונים באצווה ולנתוני סטרימינג, ולהריץ אותם ב-Dataflow.

תחילת העבודה עם Dataflow ו-Apache Beam

כדי להתחיל, פועלים לפי המדריך למתחילים הרלוונטי:

יצירה והפעלה של צינור עיבוד נתונים

כשמשתמשים ב-App Engine MapReduce, יוצרים מחלקות לעיבוד נתונים, מוסיפים את ספריית MapReduce, ואחרי שמגדירים את המפרט וההגדרות של העבודה, יוצרים ומתחילים את העבודה בשלב אחד באמצעות השיטה הסטטית start() במחלקת העבודה המתאימה.

במשימות Map, יוצרים את המחלקות Input ו-Output ואת המחלקה Map שמבצעת את המיפוי. במשימות של App Engine MapReduce, יוצרים מחלקות Input ו-Output ומגדירים את המחלקות Mapper ו-Reducer לטרנספורמציות של נתונים.

ב-Apache Beam, התהליך קצת שונה: יוצרים צינור. אתם משתמשים במחברי קלט ופלט כדי לקרוא ולכתוב ממקורות הנתונים וממאגרי הנתונים שלכם. אתם משתמשים בטרנספורמציות נתונים מוגדרות מראש (או כותבים טרנספורמציות משלכם) כדי להטמיע את טרנספורמציות הנתונים. אחרי שהקוד מוכן, פורסים את צינור הנתונים לשירות Dataflow.

המרת משימות MapReduce ב-App Engine לצינורות Apache Beam

בטבלה הבאה מוצגים השלבים המקבילים ב-Apache Beam לשלבים map,‏ shuffle ו-reduce של מודל App Engine MapReduce.

Java

‫App Engine MapReduce מקבילה ל-Apache Beam
מפה MapElements<InputT,OutputT>
ערבוב GroupByKey<K,V>
הפחתה ‪Combine.GroupedValues<K,InputT,OutputT>

מקובל להשתמש ב-Combine.PerKey<K,InputT,OutputT> במקום ב-GroupByKey וב-CombineValues.

Python

‫App Engine MapReduce מקבילה ל-Apache Beam
מפה beam.Map
ערבוב beam.GroupByKey
הפחתה beam.CombineValues

מקובל להשתמש ב-beam.CombinePerKey במקום ב-beam.GroupByKey וב-beam.CombineValues.

Go

‫App Engine MapReduce מקבילה ל-Apache Beam
מפה beam.ParDo
ערבוב beam.GroupByKey
הפחתה beam.Combine


קוד לדוגמה הבא מדגים איך מטמיעים את מודל MapReduce של App Engine ב-Apache Beam:

Java

נלקח מ-MinimalWordCount.java:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

נלקח מהקובץ wordcount_minimal.py:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

נלקח מ-minimal_wordcount.go:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

יתרונות נוספים של Apache Beam ו-Dataflow

אם תבחרו להעביר את משימות MapReduce של App Engine לצינורות של Apache Beam, תוכלו ליהנות מכמה תכונות ש-Apache Beam ו-Dataflow מציעות.

תזמון משימות ב-Cloud Dataflow

אם אתם מכירים את תורי המשימות של App Engine, אתם יכולים לתזמן את המשימות החוזרות באמצעות Cron. בדוגמה הזו מוסבר איך להשתמש ב-App Engine Cron כדי לתזמן את צינורות עיבוד הנתונים של Apache Beam.

יש כמה דרכים נוספות לתזמן את ההפעלה של צינור הנתונים. אתם יכולים:

מעקב אחרי משימות ב-Cloud Dataflow

כדי לעקוב אחרי משימות MapReduce ב-App Engine, אתם צריכים כתובת URL שמתארחת ב-appspot.com.

כשמריצים את צינורות הנתונים באמצעות שירות Dataflow המנוהל, אפשר לעקוב אחרי צינורות הנתונים באמצעות ממשק המשתמש לניטור של Dataflow באינטרנט. אפשר גם להשתמש ב-Cloud Monitoring כדי לקבל מידע נוסף על צינורות הנתונים.

קריאה וכתיבה

ב-Apache Beam, ל-Readers and Writers של App Engine MapReduce קוראים מקורות נתונים ויעדים, או מחברי קלט/פלט.

ל-Apache Beam יש הרבה מחברי קלט/פלט למספר שירותים של Google Cloud, כמו Bigtable,‏ BigQuery,‏ Datastore,‏ Cloud SQL ועוד. יש גם מחברי קלט/פלט שנוצרו על ידי תורמים ל-Apache Beam עבור שירותים שאינם של Google, כמו Apache Cassandra ו-MongoDB.

המאמרים הבאים