התבנית Bigtable change streams to Pub/Sub היא צינור עיבוד נתונים בסטרימינג שמעביר בסטרימינג רשומות של שינויים בנתונים ב-Bigtable ומפרסם אותן בנושא ב-Pub/Sub באמצעות Dataflow.
זרם שינויים ב-Bigtable מאפשר להירשם לשינויים בנתונים ברמת הטבלה. כשנרשמים לסנכרון שינויים בזרמי נתונים בטבלה, חלים האילוצים הבאים:
- מוחזרים רק תאים ששונו ותיאורים של פעולות מחיקה.
- מוחזר רק הערך החדש של תא ששונה.
כשמפרסמים רשומות של שינויי נתונים בנושא Pub/Sub, יכול להיות שההודעות יוכנסו שלא לפי הסדר בהשוואה לסדר המקורי של חותמות הזמן של ביצוע השינויים ב-Bigtable.
רשומות של שינויים בנתונים ב-Bigtable שלא ניתן לפרסם בנושאים ב-Pub/Sub מועברות באופן זמני לספרייה של תור הודעות שלא ניתן לעיבוד (dead-letter queue) ב-Cloud Storage. אחרי מספר הניסיונות המקסימלי הכושל, הרשומות האלה מועברות לצמיתות לאותו מדריך של תור הודעות שלא ניתן למסור לבדיקה אנושית או לעיבוד נוסף על ידי המשתמש.
הצינור דורש שנושא היעד ב-Pub/Sub יתקיים. יכול להיות שהנושא של היעד מוגדר לאימות הודעות באמצעות סכימה. אם נושא Pub/Sub מציין סכימה, הצינור יתחיל לפעול רק אם הסכימה תקפה. בהתאם לסוג הסכימה, משתמשים באחת מהגדרות הסכימה הבאות לנושא היעד:
מאגרי אחסון לפרוטוקולים
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
משתמשים בסכימת Protobuf הבאה עם קידוד הודעות JSON:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
כל הודעת Pub/Sub חדשה כוללת רשומה אחת מרשומה של שינוי נתונים שמוחזרת מזרם השינויים מהשורה התואמת בטבלת Bigtable. התבנית Pub/Sub משטחת את הרשומות בכל רשומה של שינוי נתונים לשינויים ברמת התא הבודד.
תיאור הודעת הפלט של Pub/Sub
| שם השדה | תיאור |
|---|---|
rowKey |
מפתח השורה של השורה שהשתנתה. הנתונים מגיעים בצורה של מערך בייטים. כשמוגדר קידוד הודעות JSON, מפתחות השורות מוחזרים כמחרוזות. כשמציינים את useBase64Rowkeys, מפתחות השורות מקודדים ב-Base64. אחרת, נעשה שימוש בערכת תווים שצוינה על ידי bigtableChangeStreamCharset כדי לפענח את הבייטים של מפתח השורה למחרוזת. |
modType |
סוג השינוי בשורה. משתמשים באחד מהערכים הבאים: SET_CELL, DELETE_CELLS או DELETE_FAMILY. |
columnFamily |
קבוצת העמודות שהושפעה משינוי השורה. |
column |
מגדיר העמודה שהושפע משינוי השורה. עבור סוג המוטציה DELETE_FAMILY, שדה העמודה לא מוגדר. הנתונים מגיעים בצורה של מערך בייטים. כשמוגדר קידוד הודעות JSON, העמודות מוחזרות כמחרוזות. אם מציינים את useBase64ColumnQualifier, שדה העמודה מקודד ב-Base64. אחרת, נעשה שימוש בערכת תווים שצוינה על ידי bigtableChangeStreamCharset כדי לפענח את הבייטים של מפתח השורה למחרוזת. |
commitTimestamp |
השעה שבה Bigtable מחיל את השינוי. הזמן נמדד במיקרו-שניות מאז ראשית זמן יוניקס (1 בינואר 1970 בשעה UTC). |
timestamp |
ערך חותמת הזמן של התא שהמוטציה משפיעה עליו. עבור סוגי המוטציות DELETE_CELLS ו-DELETE_FAMILY, חותמת הזמן לא מוגדרת. הזמן נמדד במיקרו-שניות מאז ראשית זמן יוניקס (1 בינואר 1970 בשעה UTC). |
timestampFrom |
מתאר את ההתחלה של מרווח הזמן של חותמות הזמן, כולל, לכל התאים שנמחקו על ידי המוטציה DELETE_CELLS. בסוגים אחרים של מוטציות, הערך של timestampFrom לא מוגדר. הזמן נמדד במיקרו-שניות מאז ראשית זמן יוניקס (1 בינואר 1970 בשעה UTC). |
timestampTo |
מתאר את הסוף הבלעדי של מרווח חותמות הזמן של כל התאים שנמחקו על ידי המוטציה DELETE_CELLS. בסוגים אחרים של מוטציות, הערך של timestampTo לא מוגדר. |
isGC |
ערך בוליאני שמציין אם המוטציה נוצרה על ידי מנגנון garbage collection של Bigtable. |
tieBreaker |
כששני שינויים נרשמים בו-זמנית על ידי אשכולות שונים של Bigtable, השינוי עם הערך הכי גבוה של tiebreaker מוחל על טבלת המקור. מוטציות עם ערכים נמוכים יותר של tiebreaker נפסלות. |
value |
הערך החדש שהוגדר על ידי השינוי. אם לא מגדירים את האפשרות stripValues של צינור העיבוד, הערך מוגדר לשינויים ב-SET_CELL. לסוגים אחרים של מוטציות, הערך לא מוגדר. הנתונים מגיעים בצורה של מערך בייטים. כשמוגדר קידוד של הודעות JSON, הערכים מוחזרים כמחרוזות.
כשמציינים את useBase64Values, הערך מקודד בפורמט Base64. אחרת, נעשה שימוש בערכת תווים שצוינה על ידי bigtableChangeStreamCharset כדי לפענח את בייטים של הערך למחרוזת. |
sourceInstance |
השם של מופע Bigtable שרשם את השינוי. יכול להיות שזה יקרה כשמספר צינורות מעבירים שינויים ממופעים שונים לאותו נושא Pub/Sub. |
sourceCluster |
השם של אשכול Bigtable שרשם את השינוי. יכול להיות שיהיה צורך להשתמש בשיטה הזו כשכמה צינורות מעבירים שינויים בסטרימינג ממופעים שונים לאותו נושא Pub/Sub. |
sourceTable |
השם של טבלת Bigtable שקיבלה את השינוי. יכול להיות שיהיה צורך להשתמש בה אם כמה צינורות מעבירים שינויים בסטרימינג מטבלאות שונות לאותו נושא Pub/Sub. |
הדרישות לגבי צינורות עיבוד נתונים
- מופע המקור של Bigtable שצוין.
- טבלת המקור שצוינה ב-Bigtable. צריך להפעיל את סנכרון שינויים בזרמי נתונים בטבלה.
- פרופיל האפליקציה שצוין ב-Bigtable.
- נושא ה-Pub/Sub שצוין חייב להתקיים.
פרמטרים של תבניות
פרמטרים נדרשים
- pubSubTopic: השם של נושא היעד ב-Pub/Sub.
- bigtableChangeStreamAppProfile: מזהה פרופיל האפליקציה של Bigtable. פרופיל האפליקציה צריך להשתמש בניתוב של אשכול יחיד ולאפשר עסקאות של שורה אחת.
- bigtableReadInstanceId: מזהה מכונת Bigtable של המקור.
- bigtableReadTableId: מזהה טבלת Bigtable של המקור.
פרמטרים אופציונליים
- messageEncoding: הקידוד של ההודעות שיפורסמו בנושא Pub/Sub. כשסכימת נושא היעד מוגדרת, קידוד ההודעה נקבע לפי הגדרות הנושא. הערכים הנתמכים הם:
BINARYו-JSON. ברירת המחדל היאJSON. - messageFormat: הקידוד של ההודעות לפרסום בנושא Pub/Sub. כשסכימת נושא היעד מוגדרת, קידוד ההודעה נקבע לפי הגדרות הנושא. הערכים הנתמכים:
AVRO,PROTOCOL_BUFFERSו-JSON. ערך ברירת המחדל הואJSON. כשמשתמשים בפורמטJSON, השדות rowKey, column ו-value של ההודעה הם מחרוזות, והתוכן שלהם נקבע לפי אפשרויות צינור העיבודuseBase64Rowkeys,useBase64ColumnQualifiers,useBase64Valuesו-bigtableChangeStreamCharset. - stripValues: אם הערך מוגדר כ-
true, מוטציותSET_CELLמוחזרות בלי ערכים חדשים. ברירת המחדל היאfalse. הפרמטר הזה שימושי כשלא צריך שערך חדש יהיה נוכח, או כשערכים גדולים מאוד וחורגים ממגבלות הגודל של הודעות Pub/Sub. - dlqDirectory: הספרייה של תור ההודעות המתות. רשומות שלא עוברות עיבוד מאוחסנות בספרייה הזו. ברירת המחדל היא ספרייה מתחת למיקום הזמני של עבודת Dataflow. ברוב המקרים אפשר להשתמש בנתיב ברירת המחדל.
- dlqRetryMinutes: מספר הדקות בין ניסיונות חוזרים של תור הודעות מתות. ברירת המחדל היא
10. - dlqMaxRetries: המספר המקסימלי של ניסיונות חוזרים של הודעות שהועברו לתור Dead Letter. ברירת המחדל היא
5. - useBase64Rowkeys: משמש עם קידוד הודעות JSON. אם הערך הוא
true, השדהrowKeyהוא מחרוזת בקידוד Base64. אחרת,rowKeyנוצר באמצעותbigtableChangeStreamCharsetכדי לפענח בייטים למחרוזת. ברירת המחדל היאfalse. - pubSubProjectId: מזהה הפרויקט ב-Bigtable. ברירת המחדל היא הפרויקט של משימת Dataflow.
- useBase64ColumnQualifiers: משמש עם קידוד הודעות JSON. אם הערך הוא
true, השדהcolumnהוא מחרוזת בקידוד Base64. אחרת, העמודה נוצרת באמצעותbigtableChangeStreamCharsetכדי לפענח בייטים למחרוזת. ברירת המחדל היאfalse. - useBase64Values: משמש עם קידוד הודעות JSON. אם הערך הוא
true, שדה הערך הוא מחרוזת בקידוד Base64. אחרת, הערך נוצר באמצעותbigtableChangeStreamCharsetכדי לפענח בייטים למחרוזת. ברירת המחדל היאfalse. - disableDlqRetries: מציין אם להשבית את הניסיונות החוזרים עבור תור ההודעות המתות. ברירת המחדל היא: false.
- bigtableChangeStreamMetadataInstanceId: מזהה מופע של מטא-נתונים של Bigtable סנכרון שינויים בזרמי נתונים. ברירת המחדל היא ריק.
- bigtableChangeStreamMetadataTableTableId: המזהה של טבלת המטא-נתונים של מחבר Bigtable סנכרון שינויים בזרמי נתונים. אם לא מספקים את המידע הזה, המערכת יוצרת באופן אוטומטי טבלת מטא-נתונים של מחבר Bigtable לסנכרון שינויים בזרמי נתונים במהלך ההרצה של צינור הנתונים. ברירת המחדל היא ריק.
- bigtableChangeStreamCharset: שם charset של סנכרון שינויים בזרמי נתונים של Bigtable. ברירת המחדל היא UTF-8.
- bigtableChangeStreamStartTimestamp: חותמת הזמן (https://tools.ietf.org/html/rfc3339) שמשמשת לקריאת סנכרון שינויים בזרמי נתונים, כולל חותמת הזמן עצמה. לדוגמה,
2022-05-05T07:59:59Z. ברירת המחדל היא חותמת הזמן של שעת ההתחלה של הצינור. - bigtableChangeStreamIgnoreColumnFamilies: רשימה מופרדת בפסיקים של שינויים בשמות של משפחות עמודות שצריך להתעלם מהם. ברירת המחדל היא ריק.
- bigtableChangeStreamIgnoreColumns: רשימה מופרדת בפסיקים של שינויים בשמות עמודות שצריך להתעלם מהם. דוגמה: "cf1:col1,cf2:col2". ברירת המחדל היא ריק.
- bigtableChangeStreamName: שם ייחודי של צינור הנתונים של הלקוח. מאפשרת לכם להמשיך את העיבוד מהנקודה שבה צינור קודם שהופעל נעצר. ברירת המחדל היא שם שנוצר באופן אוטומטי. הערך שבו נעשה שימוש מופיע ביומני המשימות של Dataflow.
- bigtableChangeStreamResume: אם הערך הוא
true, צינור חדש ימשיך את העיבוד מהנקודה שבה צינור שפעל בעבר עם אותו ערךbigtableChangeStreamNameהפסיק. אם צינור העיבוד עם הערךbigtableChangeStreamNameשצוין לא הופעל אף פעם, לא יופעל צינור עיבוד חדש. כשמגדירים את האפשרותfalse, מתחיל צינור חדש. אם צינור עם אותו ערךbigtableChangeStreamNameכבר הופעל עבור המקור הנתון, לא יופעל צינור חדש. ברירת המחדל היאfalse. - bigtableReadChangeStreamTimeoutMs: משך הזמן הקצוב לתפוגה של בקשות Bigtable ReadChangeStream באלפיות השנייה.
- bigtableReadProjectId: מזהה הפרויקט ב-Bigtable. ברירת המחדל היא הפרויקט של משימת Dataflow.
הפעלת התבנית
המסוף
- עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית). כניסה לדף Create job from template
- בשדה שם המשימה, מזינים שם ייחודי למשימה.
- אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא
us-central1.רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.
- בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Bigtable change streams to Pub/Sub template.
- בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
- לוחצים על הפעלת העבודה.
gcloud
במעטפת או בטרמינל, מריצים את התבנית:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud -
JOB_NAME: שם ייחודי של המשימה לפי בחירתכם -
VERSION: הגרסה של התבנית שבה רוצים להשתמשאפשר להשתמש בערכים הבאים:
-
latestכדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/ - שם הגרסה, כמו
2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
-
-
REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה:us-central1 -
BIGTABLE_INSTANCE_ID: מזהה מופע Bigtable. -
BIGTABLE_TABLE_ID: מזהה הטבלה ב-Bigtable. -
BIGTABLE_APPLICATION_PROFILE_ID: מזהה פרופיל האפליקציה של Bigtable. -
PUBSUB_TOPIC: שם נושא היעד ב-Pub/Sub
API
כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud -
JOB_NAME: שם ייחודי של המשימה לפי בחירתכם -
VERSION: הגרסה של התבנית שבה רוצים להשתמשאפשר להשתמש בערכים הבאים:
-
latestכדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/ - שם הגרסה, כמו
2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
-
-
LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה:us-central1 -
BIGTABLE_INSTANCE_ID: מזהה מופע Bigtable. -
BIGTABLE_TABLE_ID: מזהה הטבלה ב-Bigtable. -
BIGTABLE_APPLICATION_PROFILE_ID: מזהה פרופיל האפליקציה של Bigtable. -
PUBSUB_TOPIC: שם נושא היעד ב-Pub/Sub