קריאה של שינוי השידור החי באמצעות Java
ספריית הלקוח של Cloud Bigtable ל-Java מספקת שיטות ברמה נמוכה לעיבוד רשומות של שינויים בנתונים. עם זאת, ברוב המקרים מומלץ להזרים שינויים באמצעות Dataflow במקום להשתמש בשיטות שמתוארות בדף הזה, כי Dataflow מטפל בפיצולים ובמיזוגים של מחיצות בשבילכם.
לפני שמתחילים
לפני שקוראים סנכרון שינויים בזרמי נתונים באמצעות Java, חשוב להכיר את סקירת סנכרון שינויים בזרמי נתונים. לאחר מכן, משלימים את הדרישות המוקדמות הבאות.
מגדירים אימות
כדי להשתמש בדוגמאות של Java שבדף הזה בסביבת פיתוח מקומית, מתקינים ומפעילים את ה-CLI של gcloud, ואז מגדירים את Application Default Credentials באמצעות פרטי הכניסה של המשתמש.
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
אם אתם משתמשים במעטפת מקומית, אתם צריכים ליצור פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם אתם משתמשים ב-Cloud Shell, אין צורך לבצע את הפעולה הזו.
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.
למידע נוסף על הגדרה של אימות בסביבת ייצור, ראו הגדרה של Application Default Credentials לקוד שפועל ב- Google Cloud .
הפעלת שינוי בשידור
כדי לקרוא טבלה, צריך להפעיל בה זרם שינויים. אפשר גם ליצור טבלה חדשה עם שינוי מופעל בשידור.
התפקידים הנדרשים
כדי לקבל את ההרשאות שדרושות לקריאת נתוני שינויים ב-Bigtable, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM הבא.
- Bigtable Administrator
(
roles/bigtable.admin) on the Bigtable instance that contains the table you plan to stream changes from
הוספה של ספריית הלקוח של Java כתלות
מוסיפים קוד דומה לקוד הבא לקובץ pom.xml של Maven. מחליפים את VERSION בגרסה של ספריית הלקוח שבה אתם משתמשים. הגרסה צריכה להיות 2.21.0 ואילך.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
קביעת המחיצות של הטבלה
כדי להתחיל לשלוח בקשות ל-ReadChangeStream, צריך לדעת את המחיצות של הטבלה. אפשר לקבוע את זה באמצעות ה-method GenerateInitialChangeStreamPartitions. בדוגמה הבאה אפשר לראות איך משתמשים במתודה הזו כדי לקבל זרם של ByteStringRanges שמייצג כל מחיצה בטבלה. כל ByteStringRange מכיל את מפתח ההתחלה ומפתח הסיום של מחיצה.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
עיבוד השינויים בכל מחיצה
לאחר מכן אפשר לעבד את השינויים בכל מחיצה באמצעות השיטה ReadChangeStream. זו דוגמה לאופן שבו פותחים זרם למחיצה, החל מהשעה הנוכחית.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery מקבל את הארגומנטים הבאים:
- מחיצה של מקור הנתונים (חובה) – המחיצה שממנה יועברו השינויים
- אחת מהאפשרויות הבאות:
- שעת ההתחלה – חותמת הזמן של השמירה כדי להתחיל לעבד את השינויים ממנה
- טוקנים להמשכיות – טוקנים שמייצגים מיקום שממנו אפשר לחדש את הסטרימינג
- שעת סיום (אופציונלי) – חותמת זמן של ביצוע (commit) להפסקת עיבוד השינויים כשהיא מושגת. אם לא מציינים ערך, הנתונים ממשיכים להיקרא מהזרם.
- משך פעימת הלב (אופציונלי) – התדירות של הודעות פעימת הלב כשאין שינויים חדשים (ברירת המחדל היא חמש שניות)
שינוי הפורמט של הקלטת השידור
רשומה של שינוי בסטרימינג היא אחד משלושה סוגי תגובות:
ChangeStreamMutation– הודעה שמייצגת רשומה של שינוי בנתונים.
CloseStream– הודעה שמציינת שהלקוח צריך להפסיק לקרוא מהזרם.- סטטוס – מציין את הסיבה לסגירת הזרם. אחת מהאפשרויות:
-
OK– הגיע הזמן לסיום המחיצה הנתונה -
OUT_OF_RANGE– החלוקה שצוינה כבר לא קיימת, כלומר בוצעו פיצולים או מיזוגים בחלוקה הזו. צריך ליצור בקשתReadChangeStreamחדשה לכל מחיצה חדשה.
-
-
NewPartitions– מכיל את פרטי החלוקה למחיצות המעודכנים בתגובות שלOUT_OF_RANGE. -
ChangeStreamContinuationTokens– רשימת טוקנים שמשמשים להמשך בקשות חדשות שלReadChangeStreamמאותו מיקום. אחת לכלNewPartition.
- סטטוס – מציין את הסיבה לסגירת הזרם. אחת מהאפשרויות:
Heartbeat– הודעה תקופתית עם מידע שאפשר להשתמש בו כדי לבדוק את מצב הסטרימינג.-
EstimatedLowWatermark– הערכה של סימן המים הנמוך למחיצה הנתונה -
ContinuationToken– טוקן להמשך הזרמת המחיצה הנתונה מהמיקום הנוכחי.
-
תוכן של רשומות שינוי נתונים
מידע על רשומות של שינוי נתונים זמין במאמר מה כוללת רשומה של שינוי נתונים.
טיפול בשינויים במחיצות
כשמחיצות של טבלה משתנות, בקשות ReadChangeStream מחזירות הודעה CloseStream עם המידע שנדרש כדי להמשיך את הסטרימינג מהמחיצות החדשות.
בפיצול, השדה הזה יכיל כמה מחיצות חדשות ומחיצה תואמת ContinuationToken לכל מחיצה. כדי להמשיך להזרים את המחיצות החדשות מאותו מיקום, שולחים בקשת ReadChangeStream חדשה לכל מחיצה חדשה עם האסימון המתאים שלה.
לדוגמה, אם אתם מבצעים סטרימינג של מחיצה [A,C) והיא מתפצלת לשתי מחיצות, [A,B) ו-[B,C), אתם יכולים לצפות לרצף האירועים הבא:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
כדי להמשיך את הסטרימינג של כל מחיצה מאותו מיקום, שולחים את הבקשות הבאות של ReadChangeStreamQuery:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
כדי למזג מחיצות ולהמשיך מאותה מחיצה, צריך לשלוח בקשת ReadChangeStream חדשה שמכילה כל אסימון מהמחיצות הממוזגות.
לדוגמה, אם אתם מבצעים סטרימינג של שתי מחיצות, [A,B) ו-[B,C), והן מתמזגות למחיצה [A,C), אתם יכולים לצפות לרצף האירועים הבא:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
כדי להמשיך את הסטרימינג של מחיצת [A, C) מאותה נקודה, שולחים פקודת ReadChangeStreamQuery כמו זו:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));