בדף הזה מוסבר על סנכרון שינויים בזרמי נתונים ב-Spanner למסדי נתונים בפורמט GoogleSQL ולמסדי נתונים בפורמט PostgreSQL, כולל:
- מודל החלוקה למחיצות (partitioning) שמבוסס על פיצול
- הפורמט והתוכן של רשומות בפיד שינויים
- התחביר ברמה הנמוכה שמשמש להרצת שאילתות על הרשומות האלה
- דוגמה לתהליך העבודה של השאילתה
משתמשים ב-Spanner API כדי לשאול שאילתות ישירות בסנכרון שינויים בזרמי נתונים. באפליקציות שמשתמשות ב-Dataflow כדי לקרוא נתונים של סנכרון שינויים בזרמי נתונים, אין צורך לעבוד ישירות עם מודל הנתונים שמתואר כאן.
למדריך כללי יותר בנושא סנכרון שינויים בזרמי נתונים, אפשר לעיין במאמר סקירה כללית על סנכרון שינויים בזרמי נתונים.
שינוי המחיצות של שינוי השידור
כשמתרחש שינוי בטבלה שנמצאת במעקב של זרם שינויים, Spanner כותב רשומה תואמת של זרם שינויים במסד הנתונים, באופן סינכרוני באותה טרנזקציה שבה מתבצע שינוי הנתונים. המשמעות היא שאם העסקה תצליח, Spanner גם יתעד וישמור את השינוי בהצלחה. באופן פנימי, Spanner ממקם את הרשומה של זרם השינויים ואת שינוי הנתונים באותו מקום, כדי שהם יעובדו על ידי אותו שרת וכך יצמצם את התקורה של פעולות הכתיבה.
כחלק מ-DML לפי פיצול מסוים, Spanner מוסיף את הפעולה לנתוני הפיצול המתאימים של זרם השינויים באותה טרנזקציה. בגלל המיקום המשותף הזה, שינויים בנתוני הזרם לא מוסיפים תיאום נוסף בין משאבי ההצגה, וכך מצמצמים את התקורה של אישור העסקה.
מערכת Spanner מתרחבת על ידי פיצול ומיזוג נתונים באופן דינמי על סמך העומס והגודל של מסד הנתונים, והפצת הפיצולים בין משאבי ההגשה.
כדי לאפשר קריאה וכתיבה של נתונים בסנכרון שינויים בזרמי נתונים בהיקף גדול, מערכת Spanner מפצלת וממזגת את האחסון הפנימי של סנכרון שינויים בזרמי נתונים יחד עם נתוני מסד הנתונים, וכך נמנעת אוטומטית היווצרות של נקודות חמות. כדי לתמוך בקריאת רשומות של שינויים בזרם כמעט בזמן אמת, ככל שהכתיבה למסד הנתונים מתרחבת, Spanner API מיועד לשאילתות מקבילות של שינויים בזרם באמצעות מחיצות של שינויים בזרם. מחיצות של שינוי הזרם ממופות לנתוני פיצול של שינוי הזרם שמכילים את הרשומות של שינוי הזרם. המחיצות של שינוי הנתונים משתנות באופן דינמי לאורך זמן, ויש קורלציה בין השינויים האלה לבין האופן שבו Spanner מפצל וממזג את נתוני מסד הנתונים באופן דינמי.
מחיצה של זרם שינויים מכילה רשומות של טווח מפתחות שלא ניתן לשינוי עבור טווח זמן ספציפי. כל מחיצה של שינוי בנתונים יכולה להתפצל למחיצה אחת או יותר של שינוי בנתונים, או להתמזג עם מחיצות אחרות של שינוי בנתונים. כשאירועי הפיצול או המיזוג האלה קורים, נוצרות מחיצות צאצא כדי לתעד את השינויים בטווחים המתאימים של מפתחות קבועים לטווח הזמן הבא. בנוסף לרשומות של שינויים בנתונים, שאילתה של זרם שינויים מחזירה רשומות של מחיצות צאצא כדי להודיע לקוראים על מחיצות חדשות של זרם שינויים שצריך לשלוח להן שאילתה, וגם רשומות של פעימות לב כדי לציין התקדמות כשלא בוצעו כתיבות לאחרונה.
כשמבצעים שאילתה על מחיצה מסוימת של זרם שינויים, רשומות השינויים מוחזרות לפי סדר חותמות הזמן של ביצוע השינויים. כל רשומה של שינוי מוחזרת בדיוק פעם אחת. הסדר של רשומות השינוי לא מובטח במחיצות של זרם השינויים. רשומות של שינויים במפתח ראשי מסוים מוחזרות רק במחיצה אחת לטווח זמן מסוים.
בגלל שרשור המחיצות מסוג הורה-צאצא, כדי לעבד שינויים במפתח מסוים לפי סדר חותמת הזמן של השליחה, צריך לעבד רשומות שמוחזרות ממחיצות צאצא רק אחרי שמעבדים רשומות מכל מחיצות ההורה.
שינוי פונקציות הקריאה של השידור החי ותחביר השאילתות
GoogleSQL
כדי לשלוח שאילתות לסנכרון שינויים בזרמי נתונים, משתמשים ב-API ExecuteStreamingSql. Spanner יוצר באופן אוטומטי פונקציית קריאה מיוחדת יחד עם זרם השינויים. פונקציית הקריאה מספקת גישה לרשומות של סנכרון שינויים בזרמי נתונים. המוסכמה למתן שמות לפונקציות קריאה היא READ_change_stream_name.
אם יש בסיס נתונים עם מקור נתונים לשינויים SingersNameStream, תחביר השאילתה ב-GoogleSQL הוא:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
פונקציית הקריאה מקבלת את הארגומנטים הבאים:
| שם הארגומנט | סוג | חובה? | תיאור |
|---|---|---|---|
start_timestamp |
TIMESTAMP |
חובה | הפונקציה מציינת שצריך להחזיר רשומות שבהן הערך של commit_timestamp גדול מ-start_timestamp או שווה לו. הערך צריך להיות בטווח של תקופת השמירה של נתוני הזרם לשינויים, וקטן או שווה לזמן הנוכחי, וגדול או שווה לחותמת הזמן של יצירת הזרם לשינויים. |
end_timestamp |
TIMESTAMP |
אופציונלי (ברירת מחדל: NULL) |
הערך הזה מציין שצריך להחזיר רשומות עם commit_timestamp שהוא קטן מ-end_timestamp או שווה לו. הערך חייב להיות בטווח של תקופת השמירה של נתוני שינוי, וגדול מ-start_timestamp או שווה לו. השאילתה מסתיימת אחרי שכל ChangeRecords מוחזרים עד end_timestamp, או כשמפסיקים את החיבור. אם הערך של end_timestamp הוא NULL
או שהוא לא מוגדר, הביצוע של השאילתה נמשך עד שכל הערכים של ChangeRecords מוחזרים או עד שמסיימים את החיבור. |
partition_token |
STRING |
אופציונלי (ברירת מחדל: NULL) |
מציין איזו מחיצה של זרם שינויים צריך לשלוח אליה שאילתה, על סמך התוכן של רשומות של מחיצות צאצא. אם הערך הוא NULL או שלא צוין ערך, המשמעות היא שקורא השינויים שולח שאילתה לזרם השינויים בפעם הראשונה, ולא קיבל אסימוני מחיצה ספציפיים לשליחת שאילתה מהם. |
heartbeat_milliseconds |
INT64 |
חובה | המאפיין הזה קובע את התדירות שבה מוחזר פעימת לב ChangeRecord אם לא בוצעו טרנזקציות במחיצה הזו.
הערך צריך להיות בין 1,000 (שנייה אחת) לבין 300,000 (חמש דקות). |
read_options |
ARRAY |
אופציונלי (ברירת מחדל: NULL) |
נוספות אפשרויות קריאה ששמורות לשימוש עתידי. הערך המותר היחיד הוא NULL. |
מומלץ ליצור שיטת עזר לבניית הטקסט של שאילתת הפונקציה read ולהגדיר לה פרמטרים, כמו בדוגמה הבאה.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
כדי לשלוח שאילתות לסנכרון שינויים בזרמי נתונים, משתמשים ב-API ExecuteStreamingSql. Spanner יוצר באופן אוטומטי פונקציית קריאה מיוחדת יחד עם זרם השינויים. פונקציית הקריאה מספקת גישה לרשומות של סנכרון שינויים בזרמי נתונים. המוסכמה למתן שמות לפונקציות קריאה היא spanner.read_json_change_stream_name.
בהנחה שקיים שינוי בזרם SingersNameStream במסד הנתונים, תחביר השאילתה עבור PostgreSQL הוא:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
פונקציית הקריאה מקבלת את הארגומנטים הבאים:
| שם הארגומנט | סוג | חובה? | תיאור |
|---|---|---|---|
start_timestamp |
timestamp with time zone |
חובה | מציין שצריך להחזיר רשומות של שינויים עם commit_timestamp
גדול או שווה ל-start_timestamp. הערך צריך להיות בטווח של תקופת השמירה של נתוני השינויים, קטן או שווה לזמן הנוכחי וגדול או שווה לחותמת הזמן של יצירת נתוני השינויים. |
end_timestamp |
timestamp with timezone |
אופציונלי (ברירת מחדל: NULL) |
מציין שצריך להחזיר רשומות של שינויים עם commit_timestamp
ערך קטן מ-end_timestamp או שווה לו. הערך חייב להיות בטווח של תקופת השמירה של נתוני הזרם של השינויים, וגדול מ-start_timestamp או שווה לו.
השאילתה מסתיימת אחרי שכל רשומות השינויים מוחזרות עד end_timestamp, או עד שאתם מפסיקים את החיבור.
אם NULL, השאילתה ממשיכה לפעול עד שכל רשומות השינוי
מוחזרות או עד שסוגרים את החיבור. |
partition_token |
text |
אופציונלי (ברירת מחדל: NULL) |
מציין איזו מחיצה של זרם שינויים צריך לשלוח אליה שאילתה, על סמך התוכן של רשומות של מחיצות צאצא. אם הערך הוא NULL או שלא צוין ערך, המשמעות היא שקורא השינויים שולח שאילתה לזרם השינויים בפעם הראשונה, ולא קיבל אסימוני מחיצה ספציפיים לשליחת שאילתה מהם. |
heartbeat_milliseconds |
bigint |
חובה | ההגדרה הזו קובעת באיזו תדירות מוחזר אות פעימת לב ChangeRecord כשלא מתבצעות פעולות במחיצה הזו.
הערך צריך להיות בין 1,000 (שנייה אחת) לבין 300,000 (חמש דקות). |
null |
null |
חובה | שמור לשימוש בעתיד |
מומלץ ליצור שיטת עזר לבניית הטקסט של פונקציית הקריאה ולהגדרת פרמטרים עבורה, כמו בדוגמה הבאה.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
שינוי פורמט הרשומה של סנכרון שינויים בזרמי נתונים
GoogleSQL
פונקציית הקריאה של סנכרון שינויים בזרמי נתונים מחזירה ChangeRecord עמודה אחת מסוג ARRAY<STRUCT<...>>. בכל שורה, המערך הזה תמיד מכיל רכיב אחד.
הסוג של רכיבי המערך הוא:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
יש שלושה שדות ב-STRUCT הזה: data_change_record, heartbeat_record ו-child_partitions_record, כל אחד מהסוג ARRAY<STRUCT<...>>. בכל שורה שמוחזרת על ידי פונקציית הקריאה של סנכרון שינויים בזרמי נתונים, רק אחד משלושת השדות האלה מכיל ערך. שני השדות האחרים ריקים או מכילים את הערך NULL. השדות האלה של מערכים מכילים לכל היותר רכיב אחד.
בקטעים הבאים נבחן כל אחד משלושת סוגי הרשומות האלה.
PostgreSQL
פונקציית הקריאה של סנכרון שינויים בזרמי נתונים מחזירה עמודה אחת ChangeRecord מסוג JSON עם המבנה הבא:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
יש שלושה מפתחות אפשריים באובייקט הזה: data_change_record, heartbeat_record ו-child_partitions_record. סוג הערך התואם הוא JSON. בכל שורה שפונקציית הקריאה של סנכרון שינויים בזרמי נתונים מחזירה, קיים רק אחד משלושת המפתחות האלה.
בקטעים הבאים נבחן כל אחד משלושת סוגי הרשומות האלה.
רשומות של שינויים בנתונים
רשומה של שינוי נתונים מכילה קבוצה של שינויים בטבלה עם אותו סוג שינוי (הוספה, עדכון או מחיקה) שבוצעו באותו חותמת זמן של ביצוע באותו חלק של זרם השינויים לאותה טרנזקציה. יכול להיות שיוחזרו כמה רשומות של שינויים בנתונים לאותה טרנזקציה בכמה מחיצות של זרם השינויים.
לכל רשומת שינוי נתונים יש שדות commit_timestamp, server_transaction_id ו-record_sequence, שביחד קובעים את הסדר בשינוי של רשומת מקור נתונים. שלושת השדות האלה מספיקים כדי להסיק את סדר השינויים ולספק עקביות חיצונית.
שימו לב: לכמה עסקאות יכולה להיות אותה חותמת זמן של אישור אם הן משפיעות על נתונים לא חופפים. השדה server_transaction_id מאפשר להבחין בין קבוצות של שינויים (יכול להיות שבין מחיצות שונות של זרם השינויים) שבוצעו באותה טרנזקציה. אם משלבים אותו עם השדות record_sequence ו-number_of_records_in_transaction, אפשר גם לשמור את כל הרשומות מטרנזקציה מסוימת בספרייה ולסדר אותן.
השדות של רשומה של שינוי בנתונים כוללים את הפרטים הבאים:
GoogleSQL
| שדה | סוג | תיאור |
|---|---|---|
commit_timestamp |
TIMESTAMP |
חותמת הזמן שבה השינוי בוצע. |
record_sequence |
STRING |
מציין את המספר הסידורי של הרשומה בעסקה.
מספרי הרצף הם ייחודיים ועולים באופן מונוטוני (אבל לא בהכרח רציפים) בתוך עסקה. ממיינים את הרשומות של אותו server_transaction_id לפי record_sequence כדי לשחזר את סדר השינויים בתוך העסקה.
יכול להיות ש-Spanner יבצע אופטימיזציה של הסדר הזה כדי לשפר את הביצועים, ויכול להיות שהסדר לא תמיד יהיה זהה לסדר המקורי שסיפקתם. |
server_transaction_id |
STRING |
מחרוזת ייחודית גלובלית שמייצגת את העסקה שבה השינוי בוצע. הערך הזה צריך לשמש רק בהקשר של עיבוד רשומות של שינויים בנתונים, ואין לו קשר למזהה העסקה ב-API של Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
מציין אם זו הרשומה האחרונה של עסקה במחיצה הנוכחית. |
table_name |
STRING |
שם הטבלה שהשינוי משפיע עליה. |
value_capture_type |
STRING |
מתאר את סוג לכידת הערך שצוין בהגדרת שינוי הנתונים בזמן שהשינוי הזה נלכד. סוג לכידת הערך יכול להיות אחד מהערכים הבאים:
ערך ברירת המחדל הוא |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
מציין את שם העמודה, את סוג העמודה, אם היא מוגדרת כמפתח ראשי ואת המיקום של העמודה כפי שמוגדר בסכימה (ordinal_position). העמודה הראשונה בטבלה בסכימה תהיה בעלת מיקום סידורי של 1. הסוג של העמודה יכול להיות מקונן בעמודות של מערכים. הפורמט תואם למבנה הסוג שמתואר בהפניית Spanner API.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
תיאור השינויים שבוצעו, כולל ערכי המפתח הראשי, הערכים הקודמים והערכים החדשים של העמודות ששונו או שנערך אחריהן מעקב.
הזמינות והתוכן של הערכים הישנים והחדשים תלויים בvalue_capture_type שהוגדר. השדות new_values ו-old_values מכילים רק את העמודות שאינן עמודות מפתח. |
mod_type |
STRING |
תיאור של סוג השינוי. אחד מהערכים INSERT, UPDATE או DELETE. |
number_of_records_in_transaction |
INT64 |
מספר הרשומות של שינוי הנתונים שמהוות חלק מהעסקה הזו בכל המחיצות של זרם השינויים. |
number_of_partitions_in_transaction |
INT64 |
מציין את מספר המחיצות שמחזירות רשומות של שינויים בנתונים עבור העסקה הזו. |
transaction_tag |
STRING |
ציון תג העסקה שמשויך לעסקה הזו. |
is_system_transaction |
BOOL |
מציין אם העסקה היא עסקה של המערכת. |
PostgreSQL
| שדה | סוג | תיאור |
|---|---|---|
commit_timestamp |
STRING |
חותמת הזמן שבה השינוי בוצע. |
record_sequence |
STRING |
מציין את המספר הסידורי של הרשומה בעסקה.
מספרי הרצף הם ייחודיים ועולים באופן מונוטוני (אבל לא בהכרח רציפים) בתוך עסקה. ממיינים את הרשומות של אותו server_transaction_id לפי record_sequence כדי לשחזר את סדר השינויים בתוך העסקה. |
server_transaction_id |
STRING |
מחרוזת ייחודית גלובלית שמייצגת את העסקה שבה השינוי בוצע. הערך הזה צריך לשמש רק בהקשר של עיבוד רשומות של שינויים בנתונים, ואין לו קשר למזהה העסקה ב-API של Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
מציין אם זו הרשומה האחרונה של עסקה במחיצה הנוכחית. |
table_name |
STRING |
השם של הטבלה שהשינוי משפיע עליה. |
value_capture_type |
STRING |
מתאר את סוג לכידת הערך שצוין בהגדרת שינוי הנתונים בזמן שהשינוי הזה נלכד. סוג לכידת הערך יכול להיות אחד מהערכים הבאים:
ערך ברירת המחדל הוא |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
מציין את שם העמודה, את סוג העמודה, אם היא מוגדרת כמפתח ראשי ואת המיקום של העמודה כפי שהוגדר בסכימה (ordinal_position). העמודה הראשונה בטבלה בסכימה תהיה במיקום הסידורי 1. הסוג של העמודה יכול להיות מקונן בעמודות של מערכים. הפורמט תואם למבנה הסוג שמתואר בהפניית Spanner API.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
תיאור השינויים שבוצעו, כולל ערכי המפתח הראשי, הערכים הקודמים והערכים החדשים של העמודות ששונו או שמתבצע אחריהן מעקב. הזמינות והתוכן של הערכים הישנים והחדשים תלויים בvalue_capture_type שהוגדר. השדות new_values ו-old_values מכילים רק את העמודות שאינן עמודות מפתח.
|
mod_type |
STRING |
תיאור של סוג השינוי. אחד מהערכים INSERT, UPDATE או DELETE. |
number_of_records_in_transaction |
INT64 |
מספר הרשומות של שינוי הנתונים שמהוות חלק מהעסקה הזו בכל המחיצות של זרם השינויים. |
number_of_partitions_in_transaction |
NUMBER |
מציין את מספר המחיצות שמחזירות רשומות של שינויים בנתונים עבור העסקה הזו. |
transaction_tag |
STRING |
ציון תג העסקה שמשויך לעסקה הזו. |
is_system_transaction |
BOOLEAN |
מציין אם העסקה היא עסקה של המערכת. |
דוגמה לרשומה של שינוי בנתונים
בהמשך מופיעים שני רשומות לדוגמה של שינויים בנתונים. הם מתארים טרנזקציה יחידה שבה מתבצעת העברה בין שני חשבונות. שני החשבונות נמצאים במחיצות נפרדות של עדכונים בזמן אמת.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
רשומת שינוי הנתונים הבאה היא דוגמה לרשומה עם הערך capture type NEW_VALUES. שימו לב: רק ערכים חדשים יאוכלסו.
רק העמודה LastUpdate שונתה, ולכן רק העמודה הזו הוחזרה.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
רשומת שינוי הנתונים הבאה היא דוגמה לרשומה עם הערך capture type NEW_ROW. רק העמודה LastUpdate
עברה שינוי, אבל כל העמודות שבמעקב מוחזרות.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
רשומת שינוי הנתונים הבאה היא דוגמה לרשומה עם הערך capture type NEW_ROW_AND_OLD_VALUES. רק העמודה LastUpdate שונתה, אבל כל העמודות שבמעקב מוחזרות. סוג הלכידה הזה לוכד את הערך החדש והערך הישן של LastUpdate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
רשומות של פעימות לב
כשמוחזר רשומה של אות חיים, זה מצביע על כך שכל השינויים עם commit_timestamp שקטן או שווה ל-timestamp של רשומת אות החיים הוחזרו, ולרשומות נתונים עתידיות במחיצה הזו צריכים להיות חותמות זמן של ביצוע מחויבות גבוהות יותר מאלה שמוחזרות על ידי רשומת אות החיים. רשומות של פעימות לב מוחזרות כשאין שינויים בנתונים שנכתבים למחיצה. כשמתבצעים שינויים בנתונים שנכתבים למחיצה, אפשר להשתמש ב-data_change_record.commit_timestamp במקום ב-heartbeat_record.timestamp כדי לציין שהקורא מתקדם בקריאת המחיצה.
אפשר להשתמש ברשומות של אותות פעימת לב שמוחזרות במחיצות כדי לסנכרן קוראים בכל המחיצות. אחרי שכל הקוראים מקבלים דופק (heartbeat) שגדול או שווה לחותמת זמן מסוימת A, או מקבלים נתונים או רשומות של מחיצות צאצא שגדולות או שוות לחותמת זמן A, הקוראים יודעים שהם קיבלו את כל הרשומות שאושרו בחותמת הזמן הזו או לפני כן, והם יכולים להתחיל לעבד את הרשומות שנשמרו במאגר הזמני – למשל, למיין את הרשומות של המחיצות השונות לפי חותמת זמן ולקבץ אותן לפי server_transaction_id.A
רשומת פעימת לב מכילה רק שדה אחד:
GoogleSQL
| שדה | סוג | תיאור |
|---|---|---|
timestamp |
TIMESTAMP |
חותמת הזמן של רשומת הדופק. |
PostgreSQL
| שדה | סוג | תיאור |
|---|---|---|
timestamp |
STRING |
חותמת הזמן של רשומת הדופק. |
דוגמה לרשומת דופק
דוגמה לרשומת דופק, שמעבירה את המידע שכל הרשומות עם חותמות זמן שקטנות או שוות לחותמת הזמן של הרשומה הזו הוחזרו:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
רשומות של מחיצות צאצא
רשומות של חלוקת משנה מחזירות מידע על חלוקות משנה: טוקנים של חלוקות משנה, טוקנים של חלוקות האב ו-start_timestamp שמייצג את חותמת הזמן המוקדמת ביותר שחלוקות המשנה מכילות רשומות של שינויים לגביהן. רשומות שחותמות הזמן של השמירה שלהן הן מיד לפני child_partitions_record.start_timestamp מוחזרות במחיצה הנוכחית. אחרי שכל הרשומות של מחיצת הצאצא מוחזרות עבור המחיצה הזו, השאילתה הזו מוחזרת עם סטטוס הצלחה, שמציין שכל הרשומות הוחזרו עבור המחיצה הזו.
השדות של רשומה של מחיצת צאצא כוללים את הפרטים הבאים:
GoogleSQL
| שדה | סוג | תיאור |
|---|---|---|
start_timestamp |
TIMESTAMP |
מציין שרשומות השינויים בנתונים שמוחזרות ממחיצות צאצא ברשומה הזו של מחיצת צאצא כוללות חותמת זמן של ביצוע (commit) שגדולה או שווה ל-start_timestamp. כשמבצעים שאילתה על מחיצת צאצא, צריך לציין בשאילתה את טוקן מחיצת הצאצא ואת התנאי start_timestamp גדול או שווה ל-child_partitions_token.start_timestamp. כל הרשומות של מחיצות צאצא שמוחזרות על ידי מחיצה מסוימת כוללות את אותו start_timestamp, וחותמת הזמן תמיד נמצאת בין start_timestamp לבין end_timestamp שצוינו בשאילתה. |
record_sequence |
STRING |
מספר סידורי שעולה באופן מונוטוני, שאפשר להשתמש בו כדי להגדיר את הסדר של רשומות מחיצות צאצא כשמוחזרות כמה רשומות מחיצות צאצא עם אותו start_timestamp במחיצה מסוימת. אסימון המחיצה, start_timestamp
ו-record_sequence מזהים באופן ייחודי רשומה של מחיצת צאצא.
|
child_partitions |
[
{
"token" : "STRING",
"parent_partition_tokens" : ["STRING"]
}
] |
הפונקציה מחזירה קבוצה של מחיצות צאצא ומידע שמשויך אליהן. ההרשאה הזו כוללת את מחרוזת אסימון החלוקה למחיצות שמשמשת לזיהוי מחיצת הצאצא בשאילתות, וגם את האסימונים של מחיצות האב שלה. |
PostgreSQL
| שדה | סוג | תיאור |
|---|---|---|
start_timestamp |
STRING |
מציין שרשומות השינויים בנתונים שמוחזרות ממחיצות צאצא ברשומה הזו של מחיצת צאצא כוללות חותמת זמן של ביצוע (commit) שגדולה או שווה ל-start_timestamp. כשמבצעים שאילתה על מחיצת צאצא, צריך לציין בשאילתה את טוקן מחיצת הצאצא ואת התנאי start_timestamp גדול או שווה ל-child_partitions_token.start_timestamp. כל הרשומות של מחיצות צאצא שמוחזרות על ידי מחיצה מסוימת כוללות את אותו start_timestamp, וחותמת הזמן תמיד נמצאת בין start_timestamp לבין end_timestamp שצוינו בשאילתה.
|
record_sequence |
STRING |
מספר סידורי שעולה באופן מונוטוני, שאפשר להשתמש בו כדי להגדיר את הסדר של רשומות מחיצות צאצא כשמוחזרות כמה רשומות מחיצות צאצא עם אותו start_timestamp במחיצה מסוימת. אסימון המחיצה, start_timestamp
ו-record_sequence מזהים באופן ייחודי רשומה של מחיצת צאצא.
|
child_partitions |
[
{
"token": "STRING",
"parent_partition_tokens": ["STRING"],
}, [...]
] |
הפונקציה מחזירה מערך של מחיצות צאצא ומידע שמשויך אליהן. המידע הזה כולל את מחרוזת האסימון של המחיצה שמשמשת לזיהוי מחיצת הצאצא בשאילתות, וגם את האסימונים של מחיצות האב שלה. |
דוגמה לרשומת מחיצה של ילד
זו דוגמה לרשומה של מחיצת צאצא:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
תהליך העבודה של שאילתות בשינוי השידור החי
מריצים שאילתות של שינוי נתונים באמצעות ExecuteStreamingSql API, עם טרנזקציה לקריאה בלבד לשימוש חד-פעמי וחסימת חותמת זמן חזקה. הפונקציה לקריאת נתוני שינויים מאפשרת לכם לציין את start_timestamp ואת end_timestamp לטווח הזמן הרלוונטי. אפשר לגשת לכל רשומות השינויים בתוך תקופת השמירה באמצעות חותמת הזמן החזקה של הגבול לקריאה בלבד.
כל שאר TransactionOptions לא תקפים לשאילתות של שינוי נתונים. בנוסף, אם הערך של TransactionOptions.read_only.return_read_timestamp מוגדר ל-true, מוחזר ערך מיוחד של kint64max - 1 בהודעה Transaction שמתארת את העסקה, במקום חותמת זמן קריאה תקינה. צריך להתעלם מהערך המיוחד הזה ולא להשתמש בו בשום שאילתה בהמשך.
כל שאילתה של מקור נתונים לשינויים יכולה להחזיר כל מספר של שורות, שכל אחת מהן מכילה רשומה של שינוי נתונים, רשומה של אותות חיים או רשומה של מחיצות צאצא. אין צורך להגדיר מועד אחרון לבקשה.
דוגמה לתהליך עבודה של שאילתת שינוי בנתונים
תהליך העבודה של שאילתת סטרימינג מתחיל בהנפקת השאילתה הראשונה של שינוי הנתונים בסטרימינג על ידי ציון partition_token עד NULL. בשליחה של שאילתה צריך לציין את פונקציית הקריאה של סנכרון שינויים בזרמי נתונים, את חותמת הזמן של ההתחלה והסיום של הנתונים הרלוונטיים ואת מרווח הזמן של פעימות הלב. כשהערך של end_timestamp הוא NULL, השאילתה ממשיכה להחזיר שינויים בנתונים עד שהמחיצה מסתיימת.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
מעבדים רשומות של נתונים מהשאילתה הזו עד שכל הרשומות של מחיצות הצאצא מוחזרות. בדוגמה הבאה, מוחזרים שני רשומות של מחיצות צאצא ושלושה אסימונים של מחיצות, ואז השאילתה מסתיימת. רשומות של מחיצות צאצא משאילתה ספציפית תמיד חולקות את אותו start_timestamp.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
כדי לעבד שינויים אחרי 2022-05-01T09:00:01Z, צריך ליצור שלוש שאילתות חדשות ולהריץ אותן במקביל. כשמשתמשים בשלוש השאילתות יחד, הן מחזירות שינויים בנתונים עבור אותו טווח מפתחות שהשאילתה הראשית מכסה. תמיד צריך להגדיר את start_timestamp לערך start_timestamp באותה רשומת מחיצה משנית, ולהשתמש באותו ערך של end_timestamp ובאותו מרווח בין פעימות לב כדי לעבד את הרשומות באופן עקבי בכל השאילתות.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
השאילתה ב-child_token_2 מסתיימת אחרי החזרת רשומה נוספת של מחיצת צאצא. הרשומה הזו מציינת שמחיצה חדשה מכסה שינויים גם ב-child_token_2 וגם ב-child_token_3 החל מ-2022-05-01T09:30:15Z. אותו רשומה בדיוק מוחזרת על ידי השאילתה ב-child_token_3, כי שניהם הם מחיצות האב של child_token_4 החדש. כדי להבטיח עיבוד מסודר של רשומות נתונים עבור מפתח מסוים, השאילתה ב-child_token_4 צריכה להתחיל אחרי שכל ההורים סיימו. במקרה הזה, ההורים הם child_token_2 ו-child_token_3. אפשר ליצור רק שאילתה אחת לכל טוקן של מחיצת צאצא. בתהליך העבודה של השאילתה צריך להגדיר הורה אחד שימתין ויקבע את מועד השאילתה ב-child_token_4.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
אפשר למצוא דוגמאות לטיפול ברשומות של זרם שינויים ולניתוח שלהן במחבר Apache Beam SpannerIO Dataflow ב-GitHub.