בדף הזה מפורטים טיפים לפתרון בעיות ואסטרטגיות לניפוי באגים שיכולים לעזור לכם אם אתם נתקלים בבעיות בבנייה או בהפעלה של צינור Dataflow. המידע הזה יכול לעזור לכם לזהות כשל בצינור, לקבוע את הסיבה להרצת צינור שנכשלה ולהציע כמה דרכי פעולה לתיקון הבעיה.
הדיאגרמה הבאה מציגה את תהליך העבודה לפתרון בעיות ב-Dataflow שמתואר בדף הזה.
Dataflow מספק משוב בזמן אמת על העבודה, ויש קבוצה בסיסית של שלבים שאפשר להשתמש בהם כדי לבדוק את הודעות השגיאה, את היומנים ואת התנאים, כמו התקדמות העבודה שנעצרה.
הנחיות לפתרון שגיאות נפוצות שאתם עשויים להיתקל בהן כשמריצים את משימת Dataflow זמינות במאמר פתרון שגיאות ב-Dataflow. כדי לעקוב אחרי הביצועים של צינור עיבוד הנתונים ולפתור בעיות, אפשר לעיין במאמר מעקב אחרי הביצועים של צינור עיבוד הנתונים.
שיטות מומלצות לצינורות עיבוד נתונים
בהמשך מפורטות שיטות מומלצות לצינורות עיבוד נתונים של Java, Python ו-Go.
- בכל צינורות הנתונים, צריך לוודא שהגדרתם מיקומים שונים למיקום הזמני ולמיקום של שלב הביניים.
- שימוש בקטגוריית אחסון נפרדת של שלבי ביניים יכול להאיץ את ההפעלה מחדש של צינור עיבוד הנתונים, כי אפשר לעשות שימוש חוזר בארטיפקטים במקום להוריד אותם מחדש.
- קבצים שהועברו לאזור ההמתנה מאחסנים ארטיפקטים שנוצרו בזמן התחלת העבודה, כמו קוד צינור הנתונים, ואפשר לעשות בהם שימוש חוזר במהלך משך החיים של העבודה.
- במשימות של עיבוד אצווה או של סטרימינג שהסתיימו, אפשר למחוק קבצים שהועברו לאזור ההמתנה אחרי שהמשימה הסתיימה.
- במשימות באצווה או במשימות של סטרימינג שנמצאות בתהליך, אל תמחקו קבצים זמניים, גם אחרי עדכון של צינור הנתונים.
- קבצים זמניים לא נמחקים אוטומטית. אם אין לכם מדיניות למחיקת קבצים ישנים, כמו זמן חיים (TTL), תצטרכו להסיר אותם באופן ידני. כדי להימנע מעלויות אחסון, צריך להגדיר מדיניות של אורך חיים (TTL) לדליים הזמניים ולדליים של סביבת הבמה.
- למאגר הזמני, מגדירים TTL קצת יותר ארוך ממשך הזמן של העבודה הארוכה ביותר. לדוגמה, זמן חיים (TTL) של 7 ימים הוא נקודת התחלה סבירה.
- בדלי של הסביבה הזמנית, מגדירים ערך TTL ארוך יותר כדי לאפשר שימוש חוזר בארטיפקטים בין הפעלות עוקבות של משימות, וכך לקצר את זמן ההפעלה של המשימה. לדוגמה, נקודת התחלה סבירה היא TTL של 6 חודשים.
- כדי להימנע מעלויות אחסון מיותרות, מומלץ להשבית את המחיקה הרכה בדליים הזמניים ובדלילי הביניים.
בדיקת הסטטוס של צינור המכירות
אפשר לזהות שגיאות בהרצות של צינורות העיבוד באמצעות ממשק המעקב של Dataflow.
- עוברים אל Google Cloud המסוף.
- בוחרים את הפרויקט ב-Google Cloud Platform מתוך רשימת הפרויקטים.
- בתפריט הניווט, בקטע Big Data, לוחצים על Dataflow. רשימת המשימות הפעילות מופיעה בחלונית השמאלית.
- בוחרים את משימת הצינור שרוצים לראות. אפשר לראות את סטטוס העבודות במבט חטוף בשדה סטטוס: 'פועל', 'הושלם' או 'נכשל'.
איך מחפשים מידע על כשלים בצינורות עיבוד נתונים
אם אחת מהמשימות בצינור נכשלת, אפשר לבחור את המשימה כדי לראות מידע מפורט יותר על השגיאות ועל תוצאות ההרצה. כשבוחרים משימה, אפשר לראות את התרשימים העיקריים של צינור הנתונים, את גרף ההפעלה, את החלונית פרטי המשימה ואת החלונית יומנים עם הכרטיסיות יומני המשימה, יומני העובד, אבחון והמלצות.
בדיקת הודעות שגיאה שקשורות לעבודות
כדי לראות את יומני העבודות שנוצרו על ידי קוד צינור הנתונים ושירות Dataflow, בחלונית Logs (יומנים), לוחצים על segmentShow (הצגה).
כדי לסנן את ההודעות שמופיעות ביומני המשימות, לוחצים על מידעarrow_drop_down ועל filter_listסינון. כדי להציג רק הודעות שגיאה, לוחצים על מידעarrow_drop_down ובוחרים באפשרות שגיאה.
כדי להרחיב הודעת שגיאה, לוחצים על הקטע שניתן להרחבה 0x0A>arrow_right.
לחלופין, אפשר ללחוץ על הכרטיסייה 'אבחון'. בכרטיסייה הזו מוצגות השגיאות שהתרחשו לאורך ציר הזמן שנבחר, ספירה של כל השגיאות שנרשמו והמלצות אפשריות לפייפליין.
הצגת יומני שלבים של המשימה
כשבוחרים שלב בתרשים של צינור העברת הנתונים, החלונית 'יומנים' משנה את התצוגה מיומני עבודות שנוצרו על ידי שירות Dataflow ליומנים ממופעי Compute Engine שמריצים את השלב בצינור העברת הנתונים.
Cloud Logging משלב את כל היומנים שנאספו מהמכונות של Compute Engine בפרויקט שלכם במקום אחד. מידע נוסף על השימוש ביכולות השונות של Dataflow לרישום ביומן מופיע במאמר רישום ביומן של הודעות בצינור.
טיפול בדחייה של צינור אוטומטי לעיבוד נתונים
במקרים מסוימים, שירות Dataflow מזהה שהצינור שלכם עלול להפעיל בעיות מוכרות ב-SDK. כדי למנוע שליחה של צינורות שסביר שיעוררו בעיות, Dataflow דוחה באופן אוטומטי את הצינור ומציג את ההודעה הבאה:
The workflow was automatically rejected by the service because it might trigger an identified bug in the SDK (details below). If you think this identification is in error, and would like to override this automated rejection, please re-submit this workflow with the following override flag: [OVERRIDE FLAG]. Bug details: [BUG DETAILS]. Contact Google Cloud Support for further help. Please use this identifier in your communication: [BUG ID].
אחרי שתקראו את האזהרות בפרטי הבאג המקושרים, תוכלו לבטל את הדחייה האוטומטית אם בכל זאת תרצו להריץ את צינור הנתונים. מוסיפים את הדגל --experiments=<override-flag> ושולחים מחדש את צינור העיבוד.
זיהוי הגורם לכשל בצינור עיבוד נתונים
בדרך כלל, הפעלה של צינור עיבוד נתונים של Apache Beam נכשלת בגלל אחת מהסיבות הבאות:
- שגיאות בבניית גרף או צינור לעיבוד נתונים. השגיאות האלה מתרחשות כש-Dataflow נתקל בבעיה בבניית הגרף של השלבים שמרכיבים את צינור העיבוד, כפי שמתואר בצינור העיבוד של Apache Beam.
- שגיאות באימות המשרה. שירות Dataflow מאמת כל משימת פייפליין שאתם מפעילים. שגיאות בתהליך האימות יכולות למנוע את היצירה או ההפעלה של העבודה. שגיאות אימות יכולות לכלול בעיות בקטגוריה של Cloud Storage בפרויקט Google Cloud או בהרשאות של הפרויקט.
- חריגים בקוד של ה-Worker. השגיאות האלה מתרחשות כשיש שגיאות או באגים בקוד שהמשתמש סיפק, ש-Dataflow מפיץ לעובדים מקבילים, כמו מופעי
DoFnשל טרנספורמציהParDo. - שגיאות שנגרמות בגלל כשלים חולפים בשירותים אחרים Google Cloud . יכול להיות שהצינור ייכשל בגלל הפסקה זמנית בשירות או בעיה אחרת בGoogle Cloud שירותים ש-Dataflow מסתמך עליהם, כמו Compute Engine או Cloud Storage.
זיהוי שגיאות בבניית גרף או צינור עיבוד נתונים
שגיאה ביצירת תרשים יכולה להתרחש כש-Dataflow יוצר את תרשים הביצוע של צינור עיבוד הנתונים מהקוד בתוכנית Dataflow. במהלך בניית התרשים, Dataflow בודק אם יש פעולות לא חוקיות.
אם Dataflow מזהה שגיאה בבניית הגרף, חשוב לזכור שלא נוצרת משימה בשירות Dataflow. לכן, לא מוצג משוב בממשק המעקב של Dataflow. במקום זאת, מופיעה הודעת שגיאה דומה להודעה הבאה במסוף או בחלון הטרמינל שבו הפעלתם את צינור Apache Beam:
Java
לדוגמה, אם צינור הנתונים מנסה לבצע צבירה כמו
GroupByKey בחלון גלובלי, לא מופעל,
לא מוגבל PCollection, מופיעה הודעת שגיאה שדומה להודעה הבאה:
... ... Exception in thread "main" java.lang.IllegalStateException: ... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. ... Use a Window.into or Window.triggering transform prior to GroupByKey ...
Python
לדוגמה, אם בצינור שלכם נעשה שימוש ברמזים לגבי סוגים וסוג הארגומנט באחת מהטרנספורמציות לא תואם למה שציפיתם, תוצג הודעת שגיאה בדומה להודעה הבאה:
... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>
Go
לדוגמה, אם צינור הנתונים משתמש ב-`DoFn` שלא מקבל קלט, תופיע הודעת שגיאה דומה לזו:
... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required. ... Full error: ... inserting ParDo in scope root/CountWords ... graph.AsDoFn: for Fn named main.extractFn ... ProcessElement method has no main inputs ... goroutine 1 [running]: ... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...) ... (more stacktrace)
אם נתקלתם בשגיאה כזו, עליכם לבדוק את קוד צינור הנתונים כדי לוודא שהפעולות בצינור הנתונים חוקיות.
זיהוי שגיאות באימות של משימות Dataflow
אחרי ששירות Dataflow מקבל את הגרף של צינור הנתונים, הוא מנסה לאמת את העבודה. האימות הזה כולל את הפעולות הבאות:
- מוודאים שלשירות יש גישה לקטגוריות של Cloud Storage שמשויכות לעבודה לצורך העברת קבצים והפקת פלט זמני.
- בודקים אם יש לכם את ההרשאות הנדרשות בפרויקט Google Cloud .
- לוודא שלשירות יש גישה למקורות קלט ופלט, כמו קבצים.
אם העבודה נכשלת בתהליך האימות, תוצג הודעת שגיאה בממשק המעקב של Dataflow, וגם במסוף או בחלון הטרמינל אם משתמשים בהפעלה חוסמת. הודעת השגיאה דומה לזו:
Java
INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)
Python
INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477] ... Checking required Cloud APIs are enabled. ... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING. ... Combiner lifting skipped for step group: GroupByKey not followed by a combiner. ... Expanding GroupByKey operations into optimizable parts. ... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns ... Annotating graph with Autotuner information. ... Fusing adjacent ParDo, Read, Write, and Flatten operations ... Fusing consumer split into read ... ... Starting 1 workers... ... ... Executing operation read+split+pair_with_one+group/Reify+group/Write ... Executing failure step failure14 ... Workflow failed. Causes: ... read+split+pair_with_one+group/Reify+group/Write failed. Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt. ... Cleaning up. ... Tearing down pending resources... INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.
Go
בשלב הזה, אימות המשימות שמתואר בקטע הזה לא נתמך ב-Go. שגיאות שנובעות מהבעיות האלה מופיעות כחריגות של העובד.
זיהוי חריגה בקוד של Worker
במהלך הפעלת העבודה, יכול להיות שתיתקלו בשגיאות או בחריגים בקוד של העובד. בדרך כלל השגיאות האלה מעידות על כך שהפקודות DoFn בקוד של צינור העברת הנתונים יצרו חריגים שלא טופלו, ולכן המשימות בעבודת Dataflow נכשלו.
חריגים בקוד המשתמש (לדוגמה, מופעי DoFn) מדווחים בממשק המעקב של Dataflow.
אם מריצים את צינור הנתונים עם חסימת הביצוע, הודעות השגיאה מודפסות במסוף או בחלון המסוף, כמו ההודעה הבאה:
Java
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.
Python
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. ... INFO:root:... Expanding GroupByKey operations into optimizable parts. INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns INFO:root:... Annotating graph with Autotuner information. INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations ... INFO:root:...: Starting 1 workers... INFO:root:...: Executing operation group/Create INFO:root:...: Value "group/Session" materialized. INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: ...: Workers have started successfully. INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: Traceback (most recent call last): File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task) ... File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda> ValueError: invalid literal for int() with base 10: 'www'
Go
... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction ... process_bundle-4031463614776698457-2 using plan s02-6 : while executing ... Process for Plan[s02-6] failed: Oh no! This is an error message!
כדאי להוסיף לטיפול בשגיאות בקוד מטפלים בחריגים. לדוגמה, אם רוצים להשמיט רכיבים שלא עוברים אימות קלט מותאם אישית שבוצע ב-ParDo, צריך לטפל בחריגה בתוך DoFn ולהשמיט את הרכיב.
אפשר גם לעקוב אחרי רכיבים שנכשלים בכמה דרכים שונות:
- אפשר לרשום ביומן את הרכיבים שנכשלו ולבדוק את הפלט באמצעות Cloud Logging.
- כדי לבדוק אם יש אזהרות או שגיאות ביומנים של תהליך העבודה של Dataflow ושל הפעלת תהליך העבודה, פועלים לפי ההוראות במאמר בנושא הצגת יומנים.
- אפשר לבקש מ-
ParDoלכתוב את הרכיבים שנכשלו בפלט נוסף לבדיקה מאוחרת יותר.
כדי לעקוב אחרי המאפיינים של צינור עיבוד נתונים שפועל, אפשר להשתמש במחלקה Metrics, כמו בדוגמה הבאה:
Java
final Counter counter = Metrics.counter("stats", "even-items"); PCollection<Integer> input = pipeline.apply(...); ... input.apply(ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element() % 2 == 0) { counter.inc(); } });
Python
class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regex.""" def __init__(self, pattern): self.pattern = pattern # A custom metric can track values in your pipeline as it runs. Create # custom metrics to count unmatched words, and know the distribution of # word lengths in the input PCollection. self.word_len_dist = Metrics.distribution(self.__class__, 'word_len_dist') self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words') def process(self, element): word = element self.word_len_dist.update(len(word)) if re.match(self.pattern, word): yield element else: self.unmatched_words.inc() filtered_words = ( words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))
Go
func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection { return beam.ParDo(s, &MyMetricsDoFn{}, input) } func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) { pr, err := beam.Run(ctx, runner, p) if err != nil { return metrics.QueryResults{}, err } // Request the metric called "counter1" in namespace called "namespace" ms := pr.Metrics().Query(func(r beam.MetricResult) bool { return r.Namespace() == "namespace" && r.Name() == "counter1" }) // Print the metric value - there should be only one line because there is // only one metric called "counter1" in the namespace called "namespace" for _, c := range ms.Counters() { fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed) } return ms, nil } type MyMetricsDoFn struct { counter beam.Counter } func init() { beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil))) } func (fn *MyMetricsDoFn) Setup() { // While metrics can be defined in package scope or dynamically // it's most efficient to include them in the DoFn. fn.counter = beam.NewCounter("namespace", "counter1") } func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) { // count the elements fn.counter.Inc(ctx, 1) emit(v) }
פתרון בעיות של צינורות שפועלים לאט או שלא מפיקים פלט
אפשר לעיין בדפים הבאים:
שגיאות נפוצות ופעולות מומלצות
אם אתם יודעים מהי השגיאה שגרמה לכישלון של צינור עיבוד הנתונים, תוכלו להיעזר בהנחיות לפתרון בעיות שמופיעות בדף פתרון בעיות ב-Dataflow.