שדרוג של צינור עיבוד נתונים לסטרימינג

בדף הזה מפורטות הנחיות והמלצות לשדרוג צינורות הנתונים של הסטרימינג. לדוגמה, יכול להיות שתצטרכו לשדרג לגרסה חדשה יותר של Apache Beam SDK, או לעדכן את קוד צינור הנתונים. יש אפשרויות שונות שמתאימות לתרחישים שונים.

צינורות של עיבוד באצווה מפסיקים כשהעבודה מסתיימת, אבל צינורות של עיבוד נתונים בזמן אמת פועלים לרוב ברציפות כדי לספק עיבוד ללא הפרעות. לכן, כשמשדרגים צינורות להעברת נתונים בסטרימינג, צריך להביא בחשבון את השיקולים הבאים:

  • יכול להיות שתצטרכו למזער את ההפרעות לצינור או להימנע מהן. במקרים מסוימים, יכול להיות שתוכלו להסתדר עם שיבוש זמני בעיבוד בזמן פריסת גרסה חדשה של צינור. במקרים אחרים, יכול להיות שהאפליקציה שלכם לא תוכל לסבול שיבושים כלשהם.
  • תהליכי עדכון של צינורות עיבוד נתונים צריכים לטפל בשינויים בסכימה באופן שממזער את השיבושים בעיבוד ההודעות ובמערכות אחרות שמחוברות אליהם. לדוגמה, אם הסכימה של ההודעות בצינור לעיבוד אירועים משתנה, יכול להיות שיהיה צורך בשינויים בסכימה גם במאגרי נתונים במורד הצינור.

אפשר להשתמש באחת מהשיטות הבאות כדי לעדכן צינורות להעברת נתונים בסטרימינג, בהתאם לצינור ולדרישות העדכון:

למידע נוסף על בעיות שאתם עשויים להיתקל בהן במהלך עדכון ואיך למנוע אותן, ראו אימות של עבודת החלפה ובדיקת תאימות של עבודה.

שיטות מומלצות

  • שדרוג גרסת Apache Beam SDK בנפרד משינויים בקוד של צינורות.
  • אחרי כל שינוי, כדאי לבדוק את צינור הנתונים לפני שמבצעים עדכונים נוספים.
  • חשוב לשדרג באופן קבוע את גרסת Apache Beam SDK שבה משתמש צינור הנתונים.
  • כדאי להשתמש בשיטות אוטומטיות כשאפשר, כמו עדכונים במהלך הטיסה או עדכונים אוטומטיים של צינורות מקבילים.
  • כדאי להשתמש ב-Managed I/O כשזה אפשרי, כדי ליהנות משדרוגים אוטומטיים של גרסאות המחברים.

ביצוע עדכונים במהלך הטיסה

אפשר לעדכן חלק מצינורות הנתונים של סטרימינג מתמשך בלי להפסיק את העבודה. התרחיש הזה נקרא עדכון על משרה פעילה. עדכונים לגבי עבודות בתהליך זמינים רק בנסיבות מוגבלות:

  • העבודה צריכה להשתמש ב-Streaming Engine.
  • המשימה צריכה להיות במצב פעיל.
  • אתם משנים רק את מספר העובדים שהמשימה משתמשת בהם.

מידע נוסף זמין במאמר בנושא הגדרת טווח של שינוי גודל אוטומטי בדף בנושא שינוי גודל אוטומטי אופקי.

הוראות לעדכון של משימה פעילה מפורטות במאמר בנושא עדכון של צינור קיים.

יצירה או עדכון אוטומטיים (upsert) של תבניות

כשמפעילים צינורות עיבוד נתונים באמצעות תבנית (תבניות קלאסיות, תבניות Flex,‏ Terraform או Config Connector), אפשר להשתמש בניסוי create_or_update_job כדי ליצור או לעדכן (upsert) פונקציונליות.

כשמציינים create_or_update_job בפרמטר additional_experiments או בדגל additional-experiments:

  • אם כבר קיימת משימה פעילה או משימה בהמתנה עם שם המשימה שצוין, שירות התבניות מפעיל באופן אוטומטי את המשימה החדשה כעדכון למשימה הקיימת.
  • אם לא קיים ג'וב פעיל עם השם הזה, שירות התבניות מפעיל את הג'וב החדש כיצירה של ג'וב חדש.

הניסוי הזה מבטל את הצורך לקבוע באופן פרוגרמטי אם להשתמש בפעולת ה-API ליצירה או לעדכון כשמפעילים תבנית.

דוגמאות קוד של Terraform ו-Config Connector שמשתמשות בניסוי הזה מופיעות בקטעים הבאים:

הפעלת משימת החלפה

אם העבודה המעודכנת תואמת לעבודה הקיימת, אפשר לעדכן את צינור הנתונים באמצעות האפשרות update. כשמחליפים משימה קיימת, משימה חדשה מריצה את קוד צינור העברת הנתונים המעודכן. שירות Dataflow שומר את שם העבודה, אבל מריץ את עבודת ההחלפה עם מזהה משימה מעודכן. התהליך הזה עלול לגרום להשבתה זמנית בזמן שהמשימה הקיימת מפסיקה, בדיקת התאימות פועלת והמשימה החדשה מתחילה. פרטים נוספים זמינים במאמר בנושא ההשפעות של החלפת תפקיד.

מערכת Dataflow מבצעת בדיקת תאימות כדי לוודא שאפשר לפרוס את הקוד המעודכן של צינור עיבוד הנתונים בבטחה בצינור עיבוד הנתונים הפעיל. שינויים מסוימים בקוד גורמים לכך שבדיקת התאימות תיכשל, למשל כשמוסיפים קלט צדדי לשלב קיים או מסירים ממנו קלט צדדי. אם בדיקת התאימות נכשלת, אי אפשר לבצע עדכון של משימה במקום.

הוראות להפעלת עבודת החלפה מופיעות במאמר בנושא הפעלת עבודת החלפה.

אם העדכון של צינור העיבוד לא תואם לעבודה הנוכחית, צריך לעצור את צינור העיבוד ולהחליף אותו. אם צינור עיבוד הנתונים לא יכול להפסיק לפעול, מריצים צינורות עיבוד נתונים מקבילים.

עצירה והחלפה ידנית

כדי לבצע עצירה והחלפה ידנית, צריך לבטל או לנקז את צינור עיבוד הנתונים, ואז להחליף אותו בצינור המעודכן. ביטול של צינור גורם ל-Dataflow להפסיק את העיבוד באופן מיידי ולהשבית את המשאבים במהירות האפשרית. כתוצאה מכך, יכול להיות שחלק מהנתונים שעוברים עיבוד יאבדו. הנתונים האלה נקראים נתונים בתהליך. כדי למנוע אובדן נתונים, ברוב המקרים מומלץ להפסיק את השימוש בתכונה. אפשר גם להשתמש בתמונות מצב של Dataflow כדי לשמור את המצב של צינור עיבוד נתונים בסטרימינג, וכך להתחיל גרסה חדשה של משימת Dataflow בלי לאבד את המצב. מידע נוסף זמין במאמר בנושא שימוש בתמונות מצב של Dataflow.

כשמרוקנים צינור עיבוד נתונים, כל החלונות בתהליך נסגרים באופן מיידי וכל הטריגרים מופעלים. למרות שהנתונים בזמן ההעברה לא הולכים לאיבוד, יכול להיות שבגלל הניקוי יהיו חלונות עם נתונים חלקיים. אם זה קורה, חלונות בתהליך פולטים תוצאות חלקיות או לא מלאות. מידע נוסף זמין במאמר בנושא ההשפעות של ניקוז משימה. אחרי שהמשימה הקיימת מסתיימת, מפעילים משימת סטרימינג חדשה שמכילה את קוד צינור העיבוד המעודכן, וכך אפשר להמשיך את העיבוד.

בשיטה הזו, תהיה השבתה מסוימת בין הזמן שבו משימת הסטרימינג הקיימת מפסיקה לפעול לבין הזמן שבו צינור הנתונים החלופי מוכן להמשך עיבוד הנתונים. עם זאת, ביטול או ניקוי של פייפליין קיים והפעלת משימה חדשה עם הפייפליין המעודכן פשוטים יותר מהפעלת פייפליינים מקבילים.

מידע נוסף זמין במאמר הפסקת פעולה של משימת Dataflow. אחרי שמסיימים את המשימה הנוכחית, מתחילים משימה חדשה עם אותו שם.

החלפה והפסקה אוטומטית

‫Dataflow מספק תמיכה ב-API להשקת עדכון אוטומטי של עצירה והחלפה. תהליך העבודה הזה בסגנון הצהרתי מבטל את השלבים הידניים. מגדירים את המשימה להחלפה, והמשימה החדשה מופעלת ומתאמת את המעבר באופן אוטומטי.

כשמשתמשים בתהליך העבודה הזה, משאבי משימות חדשים מוקצים בזמן שהמשימה הישנה עדיין פועלת. העבודה הישנה מקבלת אוטומטית אות ניקוז. אחרי שהמשימה הישנה מסיימת את הניקוז או מגיעה לזמן קצוב שהמשתמש הגדיר, המשימה החדשה מתחילה מיד לעבד את הנתונים. משתמשים בתהליך העבודה הזה לצינורות עיבוד נתונים שלא יכולים להכיל נתונים כפולים או צבירות חלקיות, אבל יכולים להכיל השהיה קצרה בעיבוד בזמן שהעבודה הישנה מתרוקנת.

שליחת בקשה אוטומטית לעדכון מסוג stop-and-replace

כדי להשתמש בתהליך העבודה הזה:

מפעילים בקשת עדכון אוטומטית של עצירה והחלפה באמצעות אפשרויות השירות הבאות:

Java

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לבצע עדכון אוטומטי של עצירה והחלפה באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום, משתמשים ב-update_strategy_in_place_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: השבתת ביטול אוטומטי

הביטול האוטומטי מופעל כברירת מחדל כשמציינים את האפשרות parallel_replace_job_max_stop_duration. כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות parallel_replace_job_cancel_on_drain_timeout לערך false.

--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

Python

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לבצע עדכון אוטומטי של עצירה והחלפה באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום, משתמשים ב-update_strategy_in_place_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: השבתת ביטול אוטומטי

הביטול האוטומטי מופעל כברירת מחדל כשמציינים את האפשרות parallel_replace_job_max_stop_duration. כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות parallel_replace_job_cancel_on_drain_timeout לערך false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

המשך

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לבצע עדכון אוטומטי של עצירה והחלפה באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום, משתמשים ב-update_strategy_in_place_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: השבתת ביטול אוטומטי

הביטול האוטומטי מופעל כברירת מחדל כשמציינים את האפשרות parallel_replace_job_max_stop_duration. כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות parallel_replace_job_cancel_on_drain_timeout לערך false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

gcloud

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לבצע עדכון אוטומטי של עצירה והחלפה באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום, משתמשים ב-update_strategy_in_place_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: השבתת ביטול אוטומטי

הביטול האוטומטי מופעל כברירת מחדל כשמציינים את האפשרות parallel_replace_job_max_stop_duration. כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות parallel_replace_job_cancel_on_drain_timeout לערך false.

--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

אופציונלי: פעולת Upsert (יצירה או עדכון של משימה)

כדי להפעיל התנהגות של upsert (יצירה או עדכון של משימה):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_max_stop_duration=DURATION",
  "parallel_replace_job_cancel_on_drain_timeout=true",
  "update_strategy_parallel_job_update",
  "parallel_replace_job_preallocate_compute_resources=true",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_max_stop_duration=DURATION"
    - "parallel_replace_job_cancel_on_drain_timeout=true"
    - "update_strategy_parallel_job_update"
    - "parallel_replace_job_preallocate_compute_resources=true"
    - "create_or_update_job"

מחליפים את המשתנים הבאים:

  • כדי לזהות את המשרה שרוצים להחליף, צריך לציין את parallel_replace_job_name או את parallel_replace_job_id:
    • OLD_JOB_NAME: שם המשימה שרוצים להחליף.
    • OLD_JOB_ID: המזהה של המשרה שרוצים להחליף.
  • כדי להפעיל את התכונה 'החלפה אוטומטית של קמפיין', צריך לציין את הערך parallel_replace_job_max_stop_duration:
    • DURATION: משך הזמן המקסימלי שהמשימה החדשה ממתינה עד לסיום הניקוז של המשימה הקודמת. המשך צריך להיות בפורמט של מחרוזת שמסתיימת ב-s, ב-m או ב-h (לדוגמה, 30m,‏ 1h).
  • אם משתמשים בתהליך העבודה הזה, לא מגדירים את האפשרות parallel_replace_job_min_parallel_pipelines_duration. הגדרת האפשרות הזו מפעילה במקום זאת את תהליך העבודה של עדכונים אוטומטיים של צינורות מקבילים.
  • אופציונלי: מגדירים את האפשרות parallel_replace_job_cancel_on_drain_timeout. האפשרות לביטול אוטומטי מופעלת כברירת מחדל (הערך שמוגדר הוא true) כשמגדירים את האפשרות parallel_replace_job_max_stop_duration, ולכן לא צריך להגדיר אותה באופן מפורש כדי להפעיל אותה.
    • כדי לשמור על התנהגות ברירת המחדל, משמיטים את האפשרות הזו או מגדירים אותה לערך true.
    • כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות הזו לערך false. אם מגדירים את האפשרות הזו ל-false והמשימה הישנה נתקעת במצב המתנה לסיום, גם המשימה הישנה וגם המשימה החדשה ממשיכות לפעול במקביל.
  • אופציונלי: מגדירים parallel_replace_job_preallocate_compute_resources הגדרות:
    • ההגדרה מציינת אם עובדים מוקצים מראש לעבודה החדשה בזמן שהעבודה הישנה מתרוקנת. הערכים האפשריים: true (ברירת מחדל) או false. ב-Terraform וב-Config Connector, מומלץ להגדיר את האפשרות הזו לערך true כדי למנוע מצב של פסק זמן בהקצאת משאבים. כשהערך של parallel_replace_job_preallocate_compute_resources הוא false, המשימה החדשה נשארת במצב המתנה עד שהמשימה הישנה מסתיימת.

עיבוד מחדש של הודעות באמצעות Pub/Sub Snapshot ו-Seek

במקרים מסוימים, אחרי שמחליפים או מבטלים צינור נתונים שהתרוקן, יכול להיות שיהיה צורך לעבד מחדש הודעות Pub/Sub שנמסרו בעבר. לדוגמה, יכול להיות שתצטרכו להשתמש בלוגיקה עסקית מעודכנת כדי לעבד מחדש נתונים. Pub/Sub Seek היא תכונה שמאפשרת להפעיל מחדש הודעות מתמונת מצב של Pub/Sub. אפשר להשתמש ב-Pub/Sub Seek עם Dataflow כדי לעבד מחדש הודעות מהזמן שבו נוצר צילום המצב של המינוי.

במהלך הפיתוח והבדיקה, אפשר גם להשתמש ב-Pub/Sub Seek כדי להפעיל מחדש את ההודעות הידועות שוב ושוב, כדי לאמת את הפלט מצינור עיבוד הנתונים. כשמשתמשים ב-Pub/Sub Seek, לא מבצעים פעולת Seek לתמונת מצב של מינוי בזמן שצינור עיבוד נתונים צורך את המינוי. אם תעשו את זה, יכול להיות שהפעולה תבטל את הלוגיקה של סימן המים ב-Dataflow ותשפיע על העיבוד של הודעות Pub/Sub בדיוק פעם אחת.

הנה תהליך עבודה מומלץ באמצעות ה-CLI של gcloud לשימוש ב-Pub/Sub Seek עם צינורות Dataflow בחלון טרמינל:

  1. כדי ליצור snapshot של המינוי, משתמשים בפקודה gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. כדי לנקז את הנתונים מהצינור או לבטל אותו, משתמשים בפקודה gcloud dataflow jobs drain או בפקודה gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    או

    gcloud dataflow jobs cancel JOB_ID
    
  3. כדי לעבור לתמונת המצב, משתמשים בפקודה gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. פריסת צינור חדש שמשתמש במינוי.

הפעלת צינורות במקביל

אם אתם רוצים למנוע שיבושים בצינור הנתונים של הסטרימינג במהלך עדכון, אתם יכולים להפעיל צינורות נתונים מקבילים. בגישה הזו אפשר להפעיל משימת סטרימינג חדשה עם קוד הצינור המעודכן, ולהריץ אותה במקביל למשימה הקיימת. אתם יכולים להשתמש בתהליך העבודה האוטומטי של Dataflow לפריסת עדכונים מקבילים של צינורות, או לבצע את השלבים באופן ידני.

סקירה כללית של צינורות עיבוד נתונים מקבילים

כשיוצרים את צינור העיבוד החדש, צריך להשתמש באותה אסטרטגיית חלונות שבה השתמשתם בצינור העיבוד הקיים. בתהליך העבודה הידני, מאפשרים לצינור הקיים להמשיך לפעול עד שסימן המים שלו חורג מחותמת הזמן של החלון המוקדם ביותר שמעובד על ידי צינור העדכון. לאחר מכן, מרוקנים את צינור הנתונים הקיים או מבטלים אותו. אם משתמשים בתהליך העבודה האוטומטי, הפעולות האלה מתבצעות בשבילכם. הצינור המעודכן ממשיך לפעול במקומו, ובעצם משתלט על העיבוד בעצמו.

התרשים הבא ממחיש את התהליך הזה.

צינור B חופף לצינור B בחלון זמן של 5 דקות.

בתרשים, Pipeline B היא המשימה המעודכנת שמחליפה את Pipeline A. הערך t הוא חותמת הזמן של החלון המוקדם ביותר שPipeline B עיבד. הערך w הוא סימן המים של Pipeline A. לצורך פשטות, נניח שיש סימן מים מושלם ללא נתונים שמגיעים באיחור. הציר האופקי מייצג את זמן העיבוד ואת הזמן שחלף. שני צינורות הנתונים משתמשים בחלונות קבועים (מתגלגלים) של חמש דקות. התוצאות מופעלות אחרי שהסימן עובר את סוף כל חלון.

מכיוון שהפלט המקביל מתרחש במהלך התקופה שבה יש חפיפה בין שני צינורות הנתונים, צריך להגדיר את שני צינורות הנתונים כך שיכתבו את התוצאות ליעדים שונים. מערכות במורד הזרם יכולות להשתמש בהפשטה של שני יעדי היעד, כמו תצוגת מסד נתונים, כדי לבצע שאילתות על התוצאות המשולבות. המערכות האלה יכולות גם להשתמש באבסטרקציה כדי לבטל כפילויות בתוצאות מהתקופה החופפת. מידע נוסף זמין במאמר בנושא טיפול בפלט כפול.

מגבלות

יש מגבלות על שימוש בעדכונים אוטומטיים או ידניים של צינורות מקבילים:

  • עדכונים אוטומטיים בלבד: המשימה המקבילה החדשה חייבת להיות משימה של Streaming Engine.
  • אסור להפעיל במקביל עבודות עם אותו שם. עם זאת, כשמבצעים עצירה והחלפה אוטומטיות או עדכון מקביל של צינורות באמצעות אותו שם של משימה, אפשר להשתמש שוב בשם המשימה. במקרה כזה, העבודה החדשה צריכה להתחיל לפחות שתי דקות אחרי תחילת העבודה הקודמת. ההגבלה הזו מונעת עדכונים מקבילים מרובים מניסיונות חוזרים של ספריות לקוח או מקריאות לפרוצדורות מרוחקות שהתוקף שלהן פג.
  • הפעלת שני צינורות במקביל על אותו קלט עלולה להוביל לכפילות בנתונים, לצבירות חלקיות ולבעיות פוטנציאליות בסדר כשמכניסים נתונים למאגר. המערכת במורד הזרם צריכה להיות מתוכננת כך שתצפה לתוצאות האלה ותנהל אותן.
  • כשקוראים ממאגר נתונים מסוג Pub/Sub, לא מומלץ להשתמש באותו מינוי לכמה צינורות, כי זה עלול לגרום לבעיות בדיוק הנתונים. עם זאת, במקרים מסוימים, כמו צינורות ETL (שליפה, טרנספורמציה, טעינה), שימוש באותו מינוי בשני צינורות עשוי לצמצם את הכפילות. בעיות בהתאמה אוטומטית לעומס עלולות לקרות בכל פעם שמציינים ערך שונה מאפס עבור משך החפיפה. אפשר לצמצם את הסיכון הזה באמצעות התכונה לעדכון משימות בזמן שהן פועלות. מידע נוסף זמין במאמר בנושא כוונון עדין של התאמה אוטומטית לעומס עבור פייפליינים של סטרימינג ב-Pub/Sub.
  • ב-Apache Kafka, אפשר למזער כפילויות על ידי הפעלת offset committing ב-Kafka. כדי להפעיל את האפשרות 'שמירת אופסט' ב-Kafka, אפשר לעיין במאמר שמירה חוזרת ב-Kafka.

עדכונים אוטומטיים של צינורות לעיבוד נתונים במקביל

‫Dataflow מספק תמיכה ב-API להפעלת עבודת החלפה מקבילה. ממשק ה-API הזה בסגנון דקלרטיבי מבצע הפשטה של העבודה הידנית שנדרשת להרצת שלבים פרוצדורליים. מצהירים על העבודה שרוצים לעדכן, ואז עבודה חדשה פועלת במקביל לעבודה הישנה. אחרי שהמשימה החדשה תפעל למשך הזמן שתציינו, המשימה הישנה תופסק. התכונה הזו מבטלת את ההשהיות בעיבוד במהלך עדכונים. היא גם מפחיתה את המאמץ התפעולי שנדרש לעדכון צינורות לא תואמים.

שיטת העדכון הזו מתאימה במיוחד לצינורות עיבוד נתונים שיכולים להכיל כפילויות או צבירות חלקיות, ושלא נדרש בהם סדר קפדני בזמן הוספת הנתונים. הוא מתאים במיוחד לצינורות ETL, וגם לצינורות שמשתמשים במצב סטרימינג של 'לפחות פעם אחת' ובטרנספורמציה Redistribute עם ההגדרה allow duplicates שמוגדרת ל-true.

אפשרויות לשירותי צינורות מקבילים אוטומטיים

אפשר להשתמש באפשרויות השירות הבאות לעדכונים אוטומטיים של צינורות מקבילים:

אפשרות שירות אופציונלי או חובה תיאור תלות או החרגות
update_strategy_parallel_job_update חובה (אפשרות 1: עדכון באמצעות אותו שם של משימה) פקודה לביצוע עדכון מקביל, שמריצה את שני צינורות הנתונים בו-זמנית כדי למזער את זמן ההשבתה, כשמעדכנים תחת אותו שם של משימה. צריך להגדיר אותו לצד הדגל --update והדגל parallel_replace_job_min_parallel_pipelines_duration.
update_strategy_in_place_update אופציונלי חלופה לעדכון מקביל. מבצע עדכון רגיל של המשרה במקום. חובה להגדיר את האפשרות הזו לצד הדגל --update.

המאפיין הזה בלעדי למאפיין update_strategy_parallel_job_update.

אם האפשרות הזו מוגדרת, המערכת מתעלמת מאפשרויות אחרות שקשורות לעבודות מקבילות.

parallel_replace_job_min_parallel_pipelines_duration חובה מציין את משך הזמן המינימלי שבו שני צינורות העיבוד פועלים בו-זמנית. אחרי פרק הזמן הזה, אות ניקוז נשלח למשימה הישנה. הערכים הקבילים הם בין 0s (מומלץ לחפיפה אפסית) לבין 744h (31 ימים). צריך לשלב אותה עם דרך לטרגט את המשרה הישנה. אחת מהאפשרויות הבאות:
  • אפשרות 1 – שימוש באותו שם משרה: update_strategy_parallel_job_update, או
  • אפשרות 2 – שימוש בשם משימה אחר: parallel_replace_job_name (או לחלופין parallel_replace_job_id כדי לזהות את המשימה)
parallel_replace_job_name או parallel_replace_job_id (צריך לבחור באחת מהאפשרויות) חובה (אפשרות 2: עדכון באמצעות שם משרה שונה) מזהה את המשרה הישנה לפי שם או מזהה, כדי להחליף אותה במהלך עדכון של שם אחר. צריך להגדיר את parallel_replace_job_min_parallel_pipelines_duration.

אל תשתמשו בדגל --update או באפשרות parallel_replace_job_id עם האפשרות הזו.

parallel_replace_job_max_stop_duration אופציונלי משך הזמן המקסימלי שבו המשימה הישנה יכולה להמשיך לפעול לפני שהביטול האוטומטי מופעל. לדוגמה, 30m או 1h. נדרשת הגדרה של תהליך עבודה מקביל לעדכון (אפשרות 1 או אפשרות 2).
parallel_replace_job_cancel_on_drain_timeout אופציונלי

אם מוגדר משך זמן מקסימלי לעצירה, ברירת המחדל היא true.

אפשרות בוליאנית שמציינת אם לבטל את העבודה הישנה אם משך הניקוז שלה חורג מ-parallel_replace_job_max_stop_duration. משמש לצד המאפיין parallel_replace_job_max_stop_duration.

מגדירים את הערך false כדי להשבית את הביטול האוטומטי. אם משביתים את הביטול האוטומטי והמשימה הישנה נתקעת במצב של ניקוז, גם המשימה הישנה וגם המשימה החדשה ממשיכות לפעול במקביל.

שליחת בקשה אוטומטית לעדכון של צינור מקביל לעיבוד נתונים

כדי להשתמש בתהליך העבודה האוטומטי, מפעילים משימת סטרימינג חדשה. אפשר לעדכן משימה באמצעות אותו שם משימה או שם משימה אחר.

Java

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לבצע עדכון מקביל באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום בלי להסיר אפשרויות שקשורות לעבודות מקבילות, משתמשים בפקודה update_strategy_in_place_update במקום בפקודה update_strategy_parallel_job_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: הגדרת פסק זמן לסיום העברת הנתונים וביטול אוטומטי

אתם יכולים להוסיף את האפשרויות הבאות לכל אחת מההגדרות כדי להגדיר פסק זמן לניקוי ולבטל אוטומטית את העבודה הישנה אם היא נתקעת.

--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

Python

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לבצע עדכון מקביל באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום בלי להסיר אפשרויות שקשורות לעבודות מקבילות, משתמשים בפקודה update_strategy_in_place_update במקום בפקודה update_strategy_parallel_job_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: הגדרת פסק זמן לסיום העברת הנתונים וביטול אוטומטי

אתם יכולים להוסיף את האפשרויות הבאות לכל אחת מההגדרות כדי להגדיר פסק זמן לניקוי ולבטל אוטומטית את העבודה הישנה אם היא נתקעת.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

המשך

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לבצע עדכון מקביל באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום בלי להסיר אפשרויות שקשורות לעבודות מקבילות, משתמשים בפקודה update_strategy_in_place_update במקום בפקודה update_strategy_parallel_job_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update, ‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: הגדרת פסק זמן לסיום העברת הנתונים וביטול אוטומטי

אתם יכולים להוסיף את האפשרויות הבאות לכל אחת מההגדרות כדי להגדיר פסק זמן לניקוי ולבטל אוטומטית את העבודה הישנה אם היא נתקעת.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

gcloud

אפשרות 1: עדכון באמצעות אותו שם של משימה

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לבצע עדכון מקביל באמצעות אותו שם, משתמשים בדגל --update ובאפשרות update_strategy_parallel_job_update.
  • כדי לבצע עדכון במקום בלי להסיר אפשרויות שקשורות לעבודות מקבילות, משתמשים בפקודה update_strategy_in_place_update במקום בפקודה update_strategy_parallel_job_update.

אפשרות 2: עדכון באמצעות שם משרה אחר

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • כדי לציין את המשרה הישנה לפי מזהה במקום לפי שם המשרה, משתמשים ב---additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • אם מציינים שם חדש למשימה ומשתמשים בדגל --update,‏ Dataflow מחפש משימה קיימת עם השם החדש, וזה גורם לשגיאה.

אופציונלי: הגדרת פסק זמן לסיום העברת הנתונים וביטול אוטומטי

אתם יכולים להוסיף את האפשרויות הבאות לכל אחת מההגדרות כדי להגדיר פסק זמן לניקוי ולבטל אוטומטית את העבודה הישנה אם היא נתקעת.

--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"

אם משביתים את הביטול האוטומטי והעבודה הישנה נתקעת במצב של ניקוז, העבודה הישנה והעבודה החדשה ממשיכות לפעול במקביל.

אופציונלי: פעולת Upsert (יצירה או עדכון של משימה)

כדי להפעיל התנהגות של upsert (יצירה או עדכון של משימה):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_min_parallel_pipelines_duration=DURATION",
  "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
  "update_strategy_parallel_job_update",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
    - "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
    - "update_strategy_parallel_job_update"
    - "create_or_update_job"

מחליפים את המשתנים הבאים:

  • אם אתם מעדכנים באמצעות שם עבודה שונה (אפשרות 2), אתם צריכים לספק את parallel_replace_job_name או את parallel_replace_job_id כדי לזהות את העבודה שרוצים להחליף. אי אפשר לעדכן באמצעות שם משימה שונה ב-Terraform או ב-Config Connector.
    • OLD_JOB_NAME: שם המשימה שרוצים להחליף.
    • OLD_JOB_ID: המזהה של המשרה שרוצים להחליף.
  • DURATION: משך הזמן המינימלי ששני הפייפליינים פועלים במקביל, כמספר שלם או כמספר עם נקודה צפה (floating-point). מומלץ להגדיר משך זמן של 0s כדי שלא יהיו חפיפות. אחרי פרק הזמן הזה, אות ניקוז נשלח למשימה הישנה.

    משך הזמן צריך להיות בין 0 שניות (0s) ל-31 ימים (744h). משתמשים ב-s,‏ m ו-h כדי לציין שניות, דקות ושעות. לדוגמה, 10m הוא 10 דקות.

  • DRAIN_TIMEOUT_DURATION: אופציונלי. משך הזמן המקסימלי שבו העבודה הישנה צריכה להתרוקן לפני שהביטול האוטומטי מופעל. משך הזמן צריך להיות בפורמט של מחרוזת שמסתיימת ב-s, ב-m או ב-h (לדוגמה, 30m,‏ 1h).

  • parallel_replace_job_cancel_on_drain_timeout: אופציונלי. האם לבטל את המשימה הקודמת אם היא לא מסיימת את הניקוז לפני משך הזמן המקסימלי להפסקה. אם מציינים משך זמן קצוב לתהליך ההתרוקנות, ברירת המחדל היא true. כדי להשבית את הביטול האוטומטי, מגדירים את האפשרות הזו לערך false. אם מגדירים את האפשרות הזו ל-false והמשימה הישנה נתקעת במצב המתנה לסיום, גם המשימה הישנה וגם המשימה החדשה ממשיכות לפעול במקביל.

כשמפעילים את המשימה החדשה, Dataflow מחכה עד שכל העובדים יוקצו לפני שהוא מתחיל לעבד את הנתונים. כדי לעקוב אחרי סטטוס הפריסה, בודקים את יומני המשימות של Dataflow.

הפעלה ידנית של צינורות מקבילים

בתרחישים מורכבים יותר, או כשצריך יותר שליטה בתהליך העדכון, אפשר להריץ צינורות מקבילים באופן ידני. מאפשרים לצינור הקיים להמשיך לפעול עד שסימן המים שלו חורג מחותמת הזמן של החלון השלם המוקדם ביותר שעובד על ידי צינור העדכון. לאחר מכן, מרוקנים או מבטלים את צינור הנתונים הקיים.

טיפול בפלט משוכפל

בדוגמה הבאה מתואר אחד מהפתרונות לטיפול בפלט כפול. שני צינורות העיבוד כותבים פלט ליעדים שונים, משתמשים במערכות במורד הזרם כדי לשלוח שאילתות לתוצאות, ומבטלים כפילויות בתוצאות מפרק הזמן החופף. בדוגמה הזו נעשה שימוש בצינור שקורא נתוני קלט מ-Pub/Sub, מבצע עיבוד מסוים וכותב את התוצאות ל-BigQuery.

  1. במצב ההתחלתי, צינור הנתונים הקיים בסטרימינג (Pipeline A) פועל וקורא הודעות מנושא ב-Pub/Sub (Topic) באמצעות מינוי (Subscription A). התוצאות נכתבות לטבלה ב-BigQuery (טבלה א'). התוצאות מוצגות דרך תצוגה ב-BigQuery, שמשמשת כחזית להסתרת שינויים בטבלה הבסיסית. התהליך הזה הוא יישום של שיטת עיצוב שנקראת תבנית חזית. התרשים הבא מציג את המצב ההתחלתי.

    פייפליין אחד עם מינוי אחד, וכתיבה לטבלה ב-BigQuery אחת.

  2. יוצרים מינוי חדש (מינוי ב') לצינור המעודכן. פורסים את צינור העדכון (Pipeline B), שקורא מנושא Pub/Sub ‏ (Topic) באמצעות Subscription B וכותב לטבלת BigQuery נפרדת (Table B). בתרשים הבא מוצג התהליך.

    שני צינורות, כל אחד עם מינוי אחד. כל צינור כותב לטבלת BigQuery נפרדת. תצוגת חזית קוראת משתי הטבלאות.

    בשלב הזה, Pipeline A ו-Pipeline B פועלים במקביל וכותבים את התוצאות לטבלאות נפרדות. אתם מתעדים את הזמן t כחותמת הזמן של החלון המוקדם ביותר שמעובד על ידי Pipeline B.

  3. אם סימן המים של Pipeline A חורג מהזמן t, צריך לנקז את Pipeline A. כשמרוקנים את הצינור, כל החלונות הפתוחים נסגרים והעיבוד של הנתונים בתהליך מסתיים. אם צינור הנתונים מכיל חלונות וחשובים לכם חלונות מלאים (בהנחה שאין נתונים מאוחרים), לפני שמרוקנים את צינור הנתונים א', מפעילים את שני צינורות הנתונים עד שמתקבלים חלונות חופפים מלאים. מפסיקים את משימת הסטרימינג של Pipeline A אחרי שכל הנתונים בתהליך יעובדו וייכתבו ל-Table A. התרשים הבא מציג את השלב הזה.

    צינור א' מתרוקן ולא קורא יותר את מינוי א', והוא לא שולח יותר נתונים לטבלה א' אחרי שהריקון מסתיים. כל העיבוד מתבצע על ידי צינור העיבוד השני.

  4. בשלב הזה, פועל רק Pipeline B. אפשר לשלוח שאילתה מתצוגת BigQuery (תצוגת חזית), שמשמשת כחזית לטבלה א' ולטבלה ב'. לגבי שורות עם חותמת זמן זהה בשתי הטבלאות, מגדירים את התצוגה כך שהיא תחזיר את השורות מטבלה ב', או, אם השורות לא קיימות בטבלה ב', תחזור לטבלה א'. בתרשים הבא מוצגת התצוגה (Façade View) שקוראת מטבלה א' ומטבלה ב'.

    הצינור א' נעלם ורק צינור ב' פועל.

    בשלב הזה, אפשר למחוק את מינוי א'.

אם מתגלות בעיות בפריסה של צינור חדש, צינורות מקבילים יכולים לפשט את החזרה לגרסה הקודמת. בדוגמה הזו, יכול להיות שתרצו להמשיך להפעיל את צינור עיבוד הנתונים א' בזמן שאתם עוקבים אחרי הפעולה התקינה של צינור עיבוד הנתונים ב'. אם מתעוררות בעיות בצינור B, אפשר לחזור לצינור A.

טיפול במוטציות של סכימה

מערכות לטיפול בנתונים צריכות לעיתים קרובות להתאים את עצמן לשינויים בסכימה לאורך זמן, לפעמים בגלל שינויים בדרישות העסקיות ולפעמים מסיבות טכניות. בדרך כלל צריך לתכנן ולבצע בקפידה עדכונים של סכימות כדי למנוע שיבושים במערכות המידע העסקי.

נניח שיש צינור שקורא הודעות שמכילות מטען ייעודי (payload) בפורמט JSON מנושא Pub/Sub. הצינור ממיר כל הודעה למופע של TableRow ואז כותב את השורות לטבלה ב-BigQuery. הסכימה של טבלת הפלט דומה להודעות שעוברות עיבוד בצינור. בתרשים הבא, הסכימה נקראת סכימה א'.

צינור שקורא מינוי וכותב לטבלת פלט של BigQuery באמצעות סכימה א'.

עם הזמן, סכמת ההודעה עשויה להשתנות בדרכים משמעותיות. לדוגמה, שדות נוספים, מוסרים או מוחלפים. ‫Schema A מתפתח לסכימה חדשה. בדיון שבהמשך, הסכימה החדשה נקראת סכימה ב'. במקרה הזה, צריך לעדכן את Pipeline A, וסכימת טבלת הפלט צריכה לתמוך ב-Schema B.

בטבלת הפלט, אפשר לבצע שינויים בסכימה בלי השבתה. לדוגמה, אפשר להוסיף שדות חדשים או לשנות את מצבי העמודות, כמו שינוי של REQUIRED ל-NULLABLE, בלי השבתה. המוטציות האלה בדרך כלל לא משפיעות על שאילתות קיימות. עם זאת, מוטציות בסכימה שמשנות או מסירות שדות קיימים בסכימה גורמות לשגיאות בשאילתות או לשיבושים אחרים. הגישה הבאה מאפשרת לבצע שינויים בלי להשבית את המערכת.

להפריד את הנתונים שנכתבים על ידי צינור הנתונים לטבלה ראשית ולטבלה אחת או יותר של נתונים זמניים. הטבלה הראשית מאחסנת נתונים היסטוריים שנכתבו על ידי צינור הנתונים. בטבלאות זמניות מאוחסן הפלט האחרון של צינור העיבוד. אפשר להגדיר תצוגת חזית של BigQuery על טבלאות העיקריות וטבלאות הביניים, וכך לאפשר למשתמשים להריץ שאילתות על נתונים היסטוריים ועל נתונים עדכניים.

בתרשים הבא מוצג עדכון של זרימת הצינור הקודמת, שכולל טבלת ביניים (Staging Table A), טבלה ראשית ותצוגת חזית.

צינור שקורא מינוי וכותב לטבלת ביניים ב-BigQuery. בטבלה השנייה (הראשית) יש פלט מגרסה קודמת של הסכימה. תצוגת חזית קוראת נתונים גם מטבלת הביניים וגם מהטבלה הראשית.

בתהליך המתוקן, Pipeline A מעבד הודעות שמשתמשות ב-Schema A וכותב את הפלט ל-Staging Table A, שיש לו סכימה תואמת. הטבלה הראשית מכילה נתונים היסטוריים שנכתבו על ידי גרסאות קודמות של צינור הנתונים, וגם תוצאות שמתבצע מיזוג שלהן באופן תקופתי מטבלת הביניים. הצרכנים יכולים לשאול שאלות לגבי נתונים עדכניים, כולל נתונים היסטוריים ונתונים בזמן אמת, באמצעות תצוגת הפסאדה.

אם סכימת ההודעות משתנה מסכימה א' לסכימה ב', יכול להיות שתצטרכו לעדכן את קוד צינור העיבוד כך שיתאים להודעות שמשתמשות בסכימה ב'. צריך לעדכן את צינור הנתונים הקיים בהטמעה החדשה. הפעלת צינורות מקבילים מאפשרת לוודא שעיבוד הנתונים בסטרימינג ימשיך ללא הפרעה. הפסקת צינורות עיבוד נתונים והחלפתם גורמת להפסקה בעיבוד, כי אף צינור עיבוד נתונים לא פועל למשך תקופה מסוימת.

הצינור המעודכן כותב לטבלת ביניים נוספת (Staging Table B) שמשתמשת ב-Schema B. אתם יכולים להשתמש בתהליך עבודה מתואם כדי ליצור את טבלת הביניים החדשה לפני עדכון צינור הנתונים. מעדכנים את תצוגת החזית כך שתכלול תוצאות מטבלת הביניים החדשה, ואפשר גם להשתמש בשלב קשור בתהליך העבודה.

בתרשים הבא מוצג התהליך המעודכן שבו מופיעה טבלת ביניים ב' עם סכימה ב', ומוסבר איך התצוגה החזיתית מתעדכנת כך שתכלול תוכן מהטבלה הראשית ומשתי טבלאות הביניים.

הצינור משתמש עכשיו בסכימה B וכותב לטבלת הביניים B. תצוגת חזית קוראת מהטבלה הראשית, מטבלת הביניים א' ומטבלת הביניים ב'.

בתהליך נפרד מעדכון צינור הנתונים, אפשר למזג את טבלאות הביניים לטבלה הראשית, באופן תקופתי או לפי הצורך. בתרשים הבא מוצג מיזוג של טבלת ביניים א' לתוך הטבלה הראשית.

טבלת הביניים A מוזגה לטבלה הראשית. התצוגה של החזית קוראת מטבלת הביניים B ומטבלת העיקרון.

המאמרים הבאים