יצירת תבניות Dataflow קלאסיות

במאמר הזה מוסבר איך ליצור תבנית קלאסית בהתאמה אישית מקוד צינור הנתונים של Dataflow. תבניות קלאסיות מאגדות צינורות Dataflow קיימים כדי ליצור תבניות לשימוש חוזר שאפשר להתאים אישית לכל עבודה על ידי שינוי פרמטרים ספציפיים של צינור. במקום לכתוב את התבנית, משתמשים בפקודה כדי ליצור את התבנית מצינור קיים.

בהמשך מופיעה סקירה כללית קצרה של התהליך. פרטים על התהליך הזה מופיעים בקטעים הבאים.

  1. בקוד של צינור עיבוד הנתונים, משתמשים בממשק ValueProvider לכל האפשרויות של צינור עיבוד הנתונים שרוצים להגדיר או להשתמש בהן בזמן הריצה. משתמשים באובייקטים DoFn שמקבלים פרמטרים של זמן ריצה.
  2. אפשר להרחיב את התבנית עם מטא-נתונים נוספים כדי שהפרמטרים המותאמים אישית יאומתו כשהתבנית הקלאסית תופעל. דוגמאות למטא-נתונים כאלה כוללות את השם של התבנית הקלאסית המותאמת אישית ואת הפרמטרים האופציונליים.
  3. בודקים אם מחברי ה-I/O של צינור הנתונים תומכים באובייקטים מסוג ValueProvider ומבצעים שינויים לפי הצורך.
  4. יוצרים תבנית קלאסית בהתאמה אישית ומכינים אותה להשקה.
  5. מריצים את התבנית הקלאסית המותאמת אישית.

כדי לקרוא על הסוגים השונים של תבניות Dataflow, על היתרונות שלהן ועל המקרים שבהם כדאי לבחור תבנית קלאסית, אפשר לעיין במאמר תבניות Dataflow.

ההרשאות שנדרשות להפעלת תבנית מותאמת אישית

ההרשאות שצריך כדי להריץ את התבנית הקלאסית של Dataflow תלויות במקום שבו מריצים את התבנית, ובשאלה אם מקור הנתונים ויעד הנתונים של צינור העיבוד נמצאים בפרויקט אחר.

מידע נוסף על הפעלת צינורות Dataflow באופן מקומי או באמצעות Google Cloud Platform זמין במאמר אבטחה והרשאות ב-Dataflow.

רשימת התפקידים וההרשאות ב-Dataflow מופיעה במאמר בקרת גישה ב-Dataflow.

מגבלות

  • אפשרות הצינור הבאה לא נתמכת בתבניות קלאסיות. אם אתם צריכים לשלוט במספר השרשורים של רתמת העובדים, אתם יכולים להשתמש בתבניות Flex.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • ה-runner של Dataflow לא תומך באפשרויות ValueProvider של פרמטרים של נושאים ומינויים ב-Pub/Sub. אם אתם צריכים אפשרויות של Pub/Sub בפרמטרים של זמן הריצה, אתם צריכים להשתמש בתבניות Flex.

מידע על פרמטרים של זמן ריצה וממשק ValueProvider

ממשק ValueProvider מאפשר לצינורות עיבוד נתונים לקבל פרמטרים של זמן ריצה. ‫Apache Beam מספקת שלושה סוגים של אובייקטים ValueProvider.

שם תיאור
RuntimeValueProvider

RuntimeValueProvider הוא סוג ValueProvider שמוגדר כברירת מחדל. ‫RuntimeValueProvider מאפשרת לצינור להסכים לקבל ערך שזמין רק במהלך ההפעלה של צינור. הערך לא זמין במהלך בניית צינור הנתונים, ולכן אי אפשר להשתמש בו כדי לשנות את תרשים זרימת העבודה של צינור הנתונים.

אפשר להשתמש בפונקציה isAccessible() כדי לבדוק אם הערך של ValueProvider זמין. אם קוראים לפונקציה get() לפני הפעלת צינור הנתונים, Apache Beam מחזירה שגיאה:
Value only available at runtime, but accessed from a non-runtime context.

משתמשים בערך RuntimeValueProvider כשלא יודעים את הערך מראש. כדי לשנות את ערכי הפרמטרים בזמן הריצה, אל תגדירו ערכים לפרמטרים בתבנית. מגדירים את הערכים של הפרמטרים כשיוצרים משימות מהתבנית.

StaticValueProvider

StaticValueProvider מאפשר לכם לספק ערך סטטי לצינור המכירות. הערך זמין במהלך בניית צינור הנתונים, כך שאפשר להשתמש בו כדי לשנות את גרף זרימת העבודה של צינור הנתונים.

משתמשים בערך StaticValueProvider כשמכירים את הערך מראש. דוגמאות מופיעות בקטע StaticValueProvider.

NestedValueProvider

NestedValueProvider מאפשר לחשב ערך מאובייקט ValueProvider אחר. ‫NestedValueProvider עוטף ValueProvider, והסוג של ValueProvider העטוף קובע אם אפשר לגשת לערך במהלך בניית צינור הנתונים.

משתמשים ב-NestedValueProvider כשרוצים להשתמש בערך כדי לחשב ערך אחר בזמן הריצה. דוגמאות מפורטות זמינות בקטע NestedValueProvider.

שימוש בפרמטרים של זמן ריצה בקוד של צינור עיבוד הנתונים

בקטע הזה מוסבר איך להשתמש ב-ValueProvider,‏ StaticValueProvider ו-NestedValueProvider.

שימוש ב-ValueProvider באפשרויות של צינור עיבוד הנתונים

משתמשים ב-ValueProvider לכל האפשרויות של צינורות הנתונים שרוצים להגדיר או להשתמש בהן בזמן הריצה.

לדוגמה, קטע הקוד הבא של WordCount לא תומך בפרמטרים של זמן ריצה. הקוד מוסיף אפשרות לקובץ קלט, יוצר צינור ומקריא שורות מקובץ הקלט:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

כדי להוסיף תמיכה בפרמטרים של זמן ריצה, משנים את האפשרות של קובץ הקלט לשימוש ב-ValueProvider.

Java

משתמשים ב-ValueProvider<String> במקום ב-String כדי לציין את סוג האפשרות של קובץ הקלט.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

החלפת add_argument ב-add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

שימוש ב-ValueProvider בפונקציות

כדי להשתמש בערכי פרמטרים בזמן ריצה בפונקציות שלכם, צריך לעדכן את הפונקציות כך שישתמשו בפרמטרים ValueProvider.

הדוגמה הבאה מכילה אפשרות של מספר שלם ValueProvider ופונקציה פשוטה שמוסיפה מספר שלם. הפונקציה תלויה במספר השלם ValueProvider. במהלך ההרצה, צינור הנתונים מחיל את MySumFn על כל מספר שלם ב-PCollection שמכיל את [1, 2, 3]. אם הערך של זמן הריצה הוא 10, התוצאה של PCollection היא [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

שימוש ב-StaticValueProvider

כדי לספק ערך סטטי לצינור, משתמשים ב-StaticValueProvider.

בדוגמה הזו נעשה שימוש ב-MySumFn, שהוא DoFn שמקבל ValueProvider<Integer>. אם אתם יודעים מראש מה הערך של הפרמטר, אתם יכולים להשתמש ב-StaticValueProvider כדי לציין את הערך הסטטי כ-ValueProvider.

Java

הקוד הזה מקבל את הערך בזמן הריצה של צינור העיבוד:

  .apply(ParDo.of(new MySumFn(options.getInt())))

במקום זאת, אפשר להשתמש ב-StaticValueProvider עם ערך סטטי:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

הקוד הזה מקבל את הערך בזמן הריצה של צינור העיבוד:

  beam.ParDo(MySumFn(user_options.templated_int))

במקום זאת, אפשר להשתמש ב-StaticValueProvider עם ערך סטטי:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

אפשר להשתמש ב-StaticValueProvider גם כשמטמיעים מודול קלט/פלט שתומך גם בפרמטרים רגילים וגם בפרמטרים של זמן ריצה. StaticValueProvider מצמצם את הכפילות בקוד שנובעת מהטמעה של שתי שיטות דומות.

Java

קוד המקור של הדוגמה הזו הוא מ-TextIO.java ב-GitHub של Apache Beam.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

בדוגמה הזו, יש בנאי יחיד שמקבל ארגומנט מסוג string או ValueProvider. אם הארגומנט הוא string, הוא מומר ל-StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

שימוש ב-NestedStaticValueProvider

כדי לחשב ערך מאובייקט ValueProvider אחר, משתמשים בפונקציה NestedValueProvider.

הפונקציה NestedValueProvider מקבלת כקלט את ValueProvider ואת SerializableFunction המתרגם. כשקוראים לפונקציה .get() ב-NestedValueProvider, המתרגם יוצר ערך חדש על סמך הערך ValueProvider. התרגום הזה מאפשר לכם להשתמש בערך ValueProvider כדי ליצור את הערך הסופי שאתם רוצים.

בדוגמה הבאה, המשתמש מספק את שם הקובץ file.txt. ההמרה מוסיפה את הנתיב gs://directory_name/ לפני שם הקובץ. התקשרות אל .get() מחזירה gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

שימוש במטא-נתונים בקוד של צינור עיבוד הנתונים

אפשר להרחיב את התבנית עם מטא-נתונים נוספים כדי שהפרמטרים המותאמים אישית יעברו אימות כשהתבנית תופעל. אם רוצים ליצור מטא-נתונים לתבנית, פועלים לפי השלבים הבאים:

  1. יוצרים קובץ בפורמט JSON בשם TEMPLATE_NAME_metadata באמצעות הפרמטרים שבקטע פרמטרים של מטא-נתונים והפורמט שבקטע דוגמה לקובץ מטא-נתונים. מחליפים את TEMPLATE_NAME בשם התבנית.

    מוודאים שלקובץ המטא-נתונים אין סיומת שם קובץ. לדוגמה, אם שם התבנית הוא myTemplate, קובץ המטא-נתונים שלה חייב להיות myTemplate_metadata.

  2. מאחסנים את קובץ המטא-נתונים ב-Cloud Storage באותה תיקייה שבה נמצא התבנית.

פרמטרים של מטא-נתונים

מפתח הפרמטר חובה תיאור הערך
name כן השם של התבנית.
description לא פסקה קצרה של טקסט שמתארת את התבנית.
streaming לא אם הערך הוא true, התבנית הזו תומכת בסטרימינג. ערך ברירת המחדל הוא false.
supportsAtLeastOnce לא אם true, התבנית הזו תומכת בעיבוד של לפחות פעם אחת. ערך ברירת המחדל הוא false. מגדירים את הפרמטר הזה לערך true אם התבנית מיועדת לעבודה עם מצב סטרימינג של לפחות פעם אחת.
supportsExactlyOnce לא אם true, התבנית הזו תומכת בזמן עיבוד של פעם אחת בדיוק. ערך ברירת המחדל הוא true.
defaultStreamingMode לא מצב הסטרימינג שמוגדר כברירת מחדל בתבניות שתומכות גם במצב 'לפחות פעם אחת' וגם במצב 'בדיוק פעם אחת'. משתמשים באחד מהערכים הבאים: "AT_LEAST_ONCE", "EXACTLY_ONCE". אם לא מציינים ערך, מצב הסטרימינג שמוגדר כברירת מחדל הוא בדיוק פעם אחת.
parameters לא מערך של פרמטרים נוספים שהתבנית משתמשת בהם. מערך ריק משמש כברירת מחדל.
name כן שם הפרמטר שמשמש בתבנית.
label כן מחרוזת שקריאה לאנשים ומשמשת ב Google Cloud מסוף לתווית של הפרמטר.
helpText כן פסקת טקסט קצרה שמתארת את הפרמטר.
isOptional לא false אם הפרמטר נדרש ו-true אם הפרמטר אופציונלי. אם לא מגדירים ערך, ברירת המחדל של isOptional היא false. אם לא כוללים את מפתח הפרמטר הזה במטא-נתונים, המטא-נתונים הופכים לפרמטר חובה.
regexes לא מערך של ביטויים רגולריים של POSIX-egrep בצורת מחרוזת שמשמש לאימות הערך של הפרמטר. לדוגמה, ["^[a-zA-Z][a-zA-Z0-9]+"] הוא ביטוי רגולרי יחיד שבודק שהערך מתחיל באות ואחריו יש תו אחד או יותר. מערך ריק משמש כברירת מחדל.

קובץ מטא-נתונים לדוגמה

Java

שירות Dataflow משתמש במטא-נתונים הבאים כדי לאמת את הפרמטרים המותאמים אישית של תבנית WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

שירות Dataflow משתמש במטא-נתונים הבאים כדי לאמת את הפרמטרים המותאמים אישית של תבנית WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

אפשר להוריד קבצים של מטא-נתונים לתבניות ש-Google מספקת מספריית התבניות של Dataflow.

מחברים נתמכים של קלט/פלט של צינורות עיבוד נתונים ו-ValueProvider

Java

חלק מהמחברים של קלט/פלט מכילים שיטות שמקבלות אובייקטים מסוג ValueProvider. כדי לבדוק אם מחבר ושיטה ספציפיים נתמכים, אפשר לעיין בתיעוד העזר של ה-API של מחבר ה-I/O. לשיטות הנתמכות יש עומס יתר עם ValueProvider. אם לשיטה אין עומס יתר, השיטה לא תומכת בפרמטרים של זמן ריצה. מחברי הקלט/פלט הבאים נתמכים לפחות באופן חלקי על ידי ValueProvider:

  • קובצי IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (נדרשת גרסת SDK‏ 2.3.0 ואילך)
  • PubSubIO
  • SpannerIO

Python

חלק מהמחברים של קלט/פלט מכילים שיטות שמקבלות אובייקטים מסוג ValueProvider. כדי לבדוק אם מחברי I/O נתמכים ומהן השיטות שלהם, אפשר לעיין במאמרי העזרה של ה-API של המחבר. מחברי הקלט/פלט הבאים מקבלים פרמטרים של זמן ריצה:

  • קובצי קלט/פלט: textio, avroio, tfrecordio

יצירה של תבנית מותאמת אישית והעברה שלה לשלב ההכנה

אחרי שכותבים את צינור הנתונים, צריך ליצור את קובץ התבנית ולהכין אותו להעלאה. כשיוצרים תבנית ומעבירים אותה לשלב ההכנה, מיקום ההכנה כולל קבצים נוספים שנדרשים להפעלת התבנית. אם מוחקים את מיקום ההעברה הזמנית, התבנית לא תפעל. המשימה של Dataflow לא מופעלת מיד אחרי שמעבירים את התבנית לאזור זמני. כדי להריץ משימת Dataflow מותאמת אישית שמבוססת על תבנית, אפשר להשתמש במסוףGoogle Cloud , ב-API בארכיטקטורת REST של Dataflow או ב-ה-CLI של gcloud.

בדוגמה הבאה אפשר לראות איך מעבירים קובץ תבנית לשלב ההכנה:

Java

הפקודה הזו של Maven יוצרת תבנית ומעבירה אותה למיקום ב-Cloud Storage שצוין באמצעות --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

מוודאים שהנתיב של templateLocation נכון. מחליפים את מה שכתוב בשדות הבאים:

  • com.example.myclass: מחלקת Java
  • PROJECT_ID: מזהה הפרויקט
  • BUCKET_NAME: שם הקטגוריה של Cloud Storage
  • TEMPLATE_NAME: השם של התבנית
  • REGION: האזור שבו רוצים לפרוס את משימת Dataflow

Python

הפקודה הזו של Python יוצרת תבנית ומעבירה אותה למיקום ב-Cloud Storage שצוין באמצעות --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

מוודאים שהנתיב של template_location נכון. מחליפים את מה שכתוב בשדות הבאים:

  • examples.mymodule: מודול Python
  • PROJECT_ID: מזהה הפרויקט
  • BUCKET_NAME: שם הקטגוריה של Cloud Storage
  • TEMPLATE_NAME: השם של התבנית
  • REGION: האזור שבו רוצים לפרוס את משימת Dataflow

אחרי שיוצרים את התבנית ומכינים אותה להפעלה, השלב הבא הוא להפעיל את התבנית.