במאמר הזה מוסבר איך ליצור תבנית קלאסית בהתאמה אישית מקוד צינור הנתונים של Dataflow. תבניות קלאסיות מאגדות צינורות Dataflow קיימים כדי ליצור תבניות לשימוש חוזר שאפשר להתאים אישית לכל עבודה על ידי שינוי פרמטרים ספציפיים של צינור. במקום לכתוב את התבנית, משתמשים בפקודה כדי ליצור את התבנית מצינור קיים.
בהמשך מופיעה סקירה כללית קצרה של התהליך. פרטים על התהליך הזה מופיעים בקטעים הבאים.
- בקוד של צינור עיבוד הנתונים, משתמשים בממשק
ValueProviderלכל האפשרויות של צינור עיבוד הנתונים שרוצים להגדיר או להשתמש בהן בזמן הריצה. משתמשים באובייקטיםDoFnשמקבלים פרמטרים של זמן ריצה. - אפשר להרחיב את התבנית עם מטא-נתונים נוספים כדי שהפרמטרים המותאמים אישית יאומתו כשהתבנית הקלאסית תופעל. דוגמאות למטא-נתונים כאלה כוללות את השם של התבנית הקלאסית המותאמת אישית ואת הפרמטרים האופציונליים.
- בודקים אם מחברי ה-I/O של צינור הנתונים תומכים באובייקטים מסוג
ValueProviderומבצעים שינויים לפי הצורך. - יוצרים תבנית קלאסית בהתאמה אישית ומכינים אותה להשקה.
- מריצים את התבנית הקלאסית המותאמת אישית.
כדי לקרוא על הסוגים השונים של תבניות Dataflow, על היתרונות שלהן ועל המקרים שבהם כדאי לבחור תבנית קלאסית, אפשר לעיין במאמר תבניות Dataflow.
ההרשאות שנדרשות להפעלת תבנית מותאמת אישית
ההרשאות שצריך כדי להריץ את התבנית הקלאסית של Dataflow תלויות במקום שבו מריצים את התבנית, ובשאלה אם מקור הנתונים ויעד הנתונים של צינור העיבוד נמצאים בפרויקט אחר.
מידע נוסף על הפעלת צינורות Dataflow באופן מקומי או באמצעות Google Cloud Platform זמין במאמר אבטחה והרשאות ב-Dataflow.
רשימת התפקידים וההרשאות ב-Dataflow מופיעה במאמר בקרת גישה ב-Dataflow.
מגבלות
- אפשרות הצינור הבאה לא נתמכת בתבניות קלאסיות. אם אתם צריכים לשלוט במספר השרשורים של רתמת העובדים, אתם יכולים להשתמש בתבניות Flex.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads - ה-runner של Dataflow לא תומך באפשרויות
ValueProviderשל פרמטרים של נושאים ומינויים ב-Pub/Sub. אם אתם צריכים אפשרויות של Pub/Sub בפרמטרים של זמן הריצה, אתם צריכים להשתמש בתבניות Flex.
מידע על פרמטרים של זמן ריצה וממשק ValueProvider
ממשק ValueProvider מאפשר לצינורות עיבוד נתונים לקבל פרמטרים של זמן ריצה. Apache Beam מספקת שלושה סוגים של אובייקטים ValueProvider.
| שם | תיאור |
|---|---|
RuntimeValueProvider |
אפשר להשתמש בפונקציה משתמשים בערך |
StaticValueProvider |
משתמשים בערך |
NestedValueProvider |
משתמשים ב- |
שימוש בפרמטרים של זמן ריצה בקוד של צינור עיבוד הנתונים
בקטע הזה מוסבר איך להשתמש ב-ValueProvider, StaticValueProvider ו-NestedValueProvider.
שימוש ב-ValueProvider באפשרויות של צינור עיבוד הנתונים
משתמשים ב-ValueProvider לכל האפשרויות של צינורות הנתונים שרוצים להגדיר או להשתמש בהן בזמן הריצה.
לדוגמה, קטע הקוד הבא של WordCount לא תומך בפרמטרים של זמן ריצה. הקוד מוסיף אפשרות לקובץ קלט, יוצר צינור ומקריא שורות מקובץ הקלט:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
כדי להוסיף תמיכה בפרמטרים של זמן ריצה, משנים את האפשרות של קובץ הקלט לשימוש ב-ValueProvider.
Java
משתמשים ב-ValueProvider<String> במקום ב-String
כדי לציין את סוג האפשרות של קובץ הקלט.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
החלפת add_argument ב-add_value_provider_argument.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
שימוש ב-ValueProvider בפונקציות
כדי להשתמש בערכי פרמטרים בזמן ריצה בפונקציות שלכם, צריך לעדכן את הפונקציות כך שישתמשו בפרמטרים ValueProvider.
הדוגמה הבאה מכילה אפשרות של מספר שלם ValueProvider ופונקציה פשוטה שמוסיפה מספר שלם. הפונקציה תלויה במספר השלם ValueProvider. במהלך ההרצה, צינור הנתונים
מחיל את MySumFn על כל מספר שלם ב-PCollection
שמכיל את [1, 2, 3]. אם הערך של זמן הריצה הוא 10, התוצאה של PCollection היא [11, 12, 13].
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
שימוש ב-StaticValueProvider
כדי לספק ערך סטטי לצינור, משתמשים ב-StaticValueProvider.
בדוגמה הזו נעשה שימוש ב-MySumFn, שהוא DoFn
שמקבל ValueProvider<Integer>. אם אתם יודעים מראש מה הערך של הפרמטר, אתם יכולים להשתמש ב-StaticValueProvider כדי לציין את הערך הסטטי כ-ValueProvider.
Java
הקוד הזה מקבל את הערך בזמן הריצה של צינור העיבוד:
.apply(ParDo.of(new MySumFn(options.getInt())))
במקום זאת, אפשר להשתמש ב-StaticValueProvider עם ערך סטטי:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
הקוד הזה מקבל את הערך בזמן הריצה של צינור העיבוד:
beam.ParDo(MySumFn(user_options.templated_int))
במקום זאת, אפשר להשתמש ב-StaticValueProvider עם ערך סטטי:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
אפשר להשתמש ב-StaticValueProvider גם כשמטמיעים מודול קלט/פלט שתומך גם בפרמטרים רגילים וגם בפרמטרים של זמן ריצה.
StaticValueProvider מצמצם את הכפילות בקוד שנובעת מהטמעה של שתי שיטות דומות.
Java
קוד המקור של הדוגמה הזו הוא מ-TextIO.java ב-GitHub של Apache Beam.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
בדוגמה הזו, יש בנאי יחיד שמקבל ארגומנט מסוג string או ValueProvider. אם הארגומנט הוא string, הוא מומר ל-StaticValueProvider.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
שימוש ב-NestedStaticValueProvider
כדי לחשב ערך מאובייקט ValueProvider אחר, משתמשים בפונקציה
NestedValueProvider.
הפונקציה NestedValueProvider מקבלת כקלט את ValueProvider ואת SerializableFunction המתרגם. כשקוראים לפונקציה
.get() ב-NestedValueProvider, המתרגם
יוצר ערך חדש על סמך הערך ValueProvider. התרגום הזה מאפשר לכם להשתמש בערך ValueProvider כדי ליצור את הערך הסופי שאתם רוצים.
בדוגמה הבאה, המשתמש מספק את שם הקובץ file.txt. ההמרה מוסיפה את הנתיב gs://directory_name/ לפני שם הקובץ. התקשרות אל .get() מחזירה
gs://directory_name/file.txt.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
שימוש במטא-נתונים בקוד של צינור עיבוד הנתונים
אפשר להרחיב את התבנית עם מטא-נתונים נוספים כדי שהפרמטרים המותאמים אישית יעברו אימות כשהתבנית תופעל. אם רוצים ליצור מטא-נתונים לתבנית, פועלים לפי השלבים הבאים:
- יוצרים קובץ בפורמט JSON בשם
TEMPLATE_NAME_metadataבאמצעות הפרמטרים שבקטע פרמטרים של מטא-נתונים והפורמט שבקטע דוגמה לקובץ מטא-נתונים. מחליפים אתTEMPLATE_NAMEבשם התבנית.מוודאים שלקובץ המטא-נתונים אין סיומת שם קובץ. לדוגמה, אם שם התבנית הוא
myTemplate, קובץ המטא-נתונים שלה חייב להיותmyTemplate_metadata. - מאחסנים את קובץ המטא-נתונים ב-Cloud Storage באותה תיקייה שבה נמצא התבנית.
פרמטרים של מטא-נתונים
| מפתח הפרמטר | חובה | תיאור הערך | |
|---|---|---|---|
name |
כן | השם של התבנית. | |
description |
לא | פסקה קצרה של טקסט שמתארת את התבנית. | |
streaming |
לא | אם הערך הוא true, התבנית הזו תומכת בסטרימינג. ערך ברירת המחדל הוא
false. |
|
supportsAtLeastOnce |
לא | אם true, התבנית הזו תומכת בעיבוד של לפחות פעם אחת. ערך ברירת המחדל הוא false. מגדירים את הפרמטר הזה לערך true אם התבנית מיועדת לעבודה עם מצב סטרימינג של לפחות פעם אחת.
|
|
supportsExactlyOnce |
לא | אם true, התבנית הזו תומכת בזמן עיבוד של פעם אחת בדיוק. ערך ברירת המחדל הוא true. |
|
defaultStreamingMode |
לא | מצב הסטרימינג שמוגדר כברירת מחדל בתבניות שתומכות גם במצב 'לפחות פעם אחת' וגם במצב 'בדיוק פעם אחת'. משתמשים באחד מהערכים הבאים: "AT_LEAST_ONCE", "EXACTLY_ONCE". אם לא מציינים ערך, מצב הסטרימינג שמוגדר כברירת מחדל הוא בדיוק פעם אחת.
|
|
parameters |
לא | מערך של פרמטרים נוספים שהתבנית משתמשת בהם. מערך ריק משמש כברירת מחדל. | |
name |
כן | שם הפרמטר שמשמש בתבנית. | |
label |
כן | מחרוזת שקריאה לאנשים ומשמשת ב Google Cloud מסוף לתווית של הפרמטר. | |
helpText |
כן | פסקת טקסט קצרה שמתארת את הפרמטר. | |
isOptional |
לא | false אם הפרמטר נדרש ו-true אם הפרמטר אופציונלי. אם לא מגדירים ערך, ברירת המחדל של isOptional היא false.
אם לא כוללים את מפתח הפרמטר הזה במטא-נתונים, המטא-נתונים הופכים לפרמטר חובה. |
|
regexes |
לא | מערך של ביטויים רגולריים של POSIX-egrep בצורת מחרוזת שמשמש לאימות הערך של הפרמטר. לדוגמה, ["^[a-zA-Z][a-zA-Z0-9]+"] הוא ביטוי רגולרי יחיד שבודק שהערך מתחיל באות ואחריו יש תו אחד או יותר. מערך ריק משמש כברירת מחדל. |
קובץ מטא-נתונים לדוגמה
Java
שירות Dataflow משתמש במטא-נתונים הבאים כדי לאמת את הפרמטרים המותאמים אישית של תבנית WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
שירות Dataflow משתמש במטא-נתונים הבאים כדי לאמת את הפרמטרים המותאמים אישית של תבנית WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
אפשר להוריד קבצים של מטא-נתונים לתבניות ש-Google מספקת מספריית התבניות של Dataflow.
מחברים נתמכים של קלט/פלט של צינורות עיבוד נתונים ו-ValueProvider
Java
חלק מהמחברים של קלט/פלט מכילים שיטות שמקבלות אובייקטים מסוג ValueProvider. כדי לבדוק אם מחבר ושיטה ספציפיים נתמכים, אפשר לעיין בתיעוד העזר של ה-API של מחבר ה-I/O. לשיטות הנתמכות יש עומס יתר עם ValueProvider. אם לשיטה אין עומס יתר, השיטה לא תומכת בפרמטרים של זמן ריצה. מחברי הקלט/פלט הבאים נתמכים לפחות באופן חלקי על ידי ValueProvider:
- קובצי IOs:
TextIO,AvroIO,FileIO,TFRecordIO,XmlIO BigQueryIO*-
BigtableIO(נדרשת גרסת SDK 2.3.0 ואילך) PubSubIOSpannerIO
Python
חלק מהמחברים של קלט/פלט מכילים שיטות שמקבלות אובייקטים מסוג ValueProvider. כדי לבדוק אם מחברי I/O נתמכים ומהן השיטות שלהם, אפשר לעיין במאמרי העזרה של ה-API של המחבר. מחברי הקלט/פלט הבאים מקבלים פרמטרים של זמן ריצה:
- קובצי קלט/פלט:
textio,avroio,tfrecordio
יצירה של תבנית מותאמת אישית והעברה שלה לשלב ההכנה
אחרי שכותבים את צינור הנתונים, צריך ליצור את קובץ התבנית ולהכין אותו להעלאה. כשיוצרים תבנית ומעבירים אותה לשלב ההכנה, מיקום ההכנה כולל קבצים נוספים שנדרשים להפעלת התבנית. אם מוחקים את מיקום ההעברה הזמנית, התבנית לא תפעל. המשימה של Dataflow לא מופעלת מיד אחרי שמעבירים את התבנית לאזור זמני. כדי להריץ משימת Dataflow מותאמת אישית שמבוססת על תבנית, אפשר להשתמש במסוףGoogle Cloud , ב-API בארכיטקטורת REST של Dataflow או ב-ה-CLI של gcloud.
בדוגמה הבאה אפשר לראות איך מעבירים קובץ תבנית לשלב ההכנה:
Java
הפקודה הזו של Maven יוצרת תבנית ומעבירה אותה למיקום ב-Cloud Storage שצוין באמצעות --templateLocation.
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
--project=PROJECT_ID \
--stagingLocation=gs://BUCKET_NAME/staging \
--templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
--region=REGION" \
-P dataflow-runner
מוודאים שהנתיב של templateLocation נכון. מחליפים את מה שכתוב בשדות הבאים:
-
com.example.myclass: מחלקת Java -
PROJECT_ID: מזהה הפרויקט -
BUCKET_NAME: שם הקטגוריה של Cloud Storage -
TEMPLATE_NAME: השם של התבנית -
REGION: האזור שבו רוצים לפרוס את משימת Dataflow
Python
הפקודה הזו של Python יוצרת תבנית ומעבירה אותה למיקום ב-Cloud Storage שצוין באמצעות --template_location.
python -m examples.mymodule \
--runner DataflowRunner \
--project PROJECT_ID \
--staging_location gs://BUCKET_NAME/staging \
--template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
--region REGION
מוודאים שהנתיב של template_location נכון. מחליפים את מה שכתוב בשדות הבאים:
-
examples.mymodule: מודול Python -
PROJECT_ID: מזהה הפרויקט -
BUCKET_NAME: שם הקטגוריה של Cloud Storage -
TEMPLATE_NAME: השם של התבנית -
REGION: האזור שבו רוצים לפרוס את משימת Dataflow
אחרי שיוצרים את התבנית ומכינים אותה להפעלה, השלב הבא הוא להפעיל את התבנית.