יצירת חיבורים של סנכרון שינויים בזרמי נתונים באמצעות Dataflow

בדף הזה נדגים איך ליצור צינורות עיבוד נתונים ב-Dataflow שצורכים נתונים של שינויים ב-Spanner ומעבירים אותם באמצעות סנכרון שינויים בזרמי נתונים. אפשר להשתמש בקוד לדוגמה שמופיע בדף הזה כדי ליצור צינורות מותאמים אישית.

מושגי ליבה

הנה כמה מושגי ליבה לגבי צינורות Dataflow לסנכרון שינויים בזרמי נתונים.

Dataflow

Dataflow הוא שירות מהיר וחסכוני ללא שרת (serverless) שתומך בעיבוד של נתונים בסטרימינג וגם בעיבוד באצווה. הוא מספק ניידות עם משימות עיבוד שנכתבו באמצעות ספריות הקוד הפתוח Apache Beam, ומבצע אוטומציה של הקצאת משאבים וניהול אשכולות. ‫Dataflow מספק סטרימינג כמעט בזמן אמת כשקוראים מסנכרון שינויים בזרמי נתונים.

אפשר להשתמש ב-Dataflow כדי לצרוך סנכרון שינויים בזרמי נתונים ב-Spanner באמצעות מחבר SpannerIO, שמציע הפשטה של Spanner API לצורך שליחת שאילתות לגבי סנכרון שינויים בזרמי נתונים. בעזרת המחבר הזה, אתם לא צריכים לנהל את מחזור החיים של מחיצות של סנכרון שינויים בזרמי נתונים, שזה הכרחי כשמשתמשים ישירות ב-Spanner API. המחבר מספק לכם זרם של רשומות שינוי נתונים, כך שתוכלו להתמקד יותר בלוגיקה של האפליקציה ופחות בפרטים ספציפיים של ה-API ובחלוקה דינמית של סנכרון שינויים בזרמי נתונים. ברוב המקרים שבהם צריך לקרוא נתונים של שינויים, מומלץ להשתמש במחבר SpannerIO ולא ב-Spanner API.

תבניות Dataflow הן צינורות Dataflow מוכנים מראש שמיישמים תרחישי שימוש נפוצים. סקירה כללית זמינה במאמר בנושא תבניות Dataflow.

צינור עיבוד נתונים של Dataflow

צינור Dataflow של סנכרון שינויים בזרמי נתונים ב-Spanner מורכב מארבעה חלקים עיקריים:

  1. מסד נתונים של Spanner עם מקור נתונים לשינויים
  2. המחבר SpannerIO
  3. טרנספורמציות ומקורות נתונים שהוגדרו על ידי המשתמש
  4. Apache Beam sink I/O writer

תמונה

שינוי השידור ב-Spanner

כאן מוסבר איך יוצרים מקור נתונים לשינויים.

מחבר Apache Beam SpannerIO

זהו מחבר SpannerIO שמתואר בקטע הקודם בנושא Dataflow. זהו מחבר קלט/פלט של מקור שפולט PCollection של רשומות שינוי נתונים לשלבים מאוחרים יותר בצינור. השעה של האירוע בכל רשומה של שינוי נתונים שמועברת תהיה חותמת הזמן של השמירה. חשוב לשים לב שהרשומות שמופקות הן לא מסודרות, ומחבר SpannerIO מבטיח שלא יהיו רשומות מאוחרות.

כשעובדים עם סנכרון שינויים בזרמי נתונים, Dataflow משתמשת בנקודות ביקורת. כתוצאה מכך, כל עובד עשוי להמתין עד למרווח הזמן שמוגדר בין נקודות הבדיקה כדי לשמור שינויים במאגר זמני לפני שליחת השינויים לעיבוד נוסף.

טרנספורמציות שהוגדרו על ידי המשתמש

טרנספורמציה מוגדרת על ידי המשתמש מאפשרת למשתמש לצבור, לשנות או לערוך נתונים לעיבוד בצינור Dataflow. תרחישי שימוש נפוצים הם הסרה של פרטים אישיים מזהים, עמידה בדרישות של פורמט נתונים במורד הזרם ומיון. אפשר לעיין במדריך התכנות בנושא טרנספורמציות במסמכי התיעוד הרשמיים של Apache Beam.

Apache Beam sink I/O writer

‫Apache Beam כולל מחברי קלט/פלט מובנים שאפשר להשתמש בהם כדי לכתוב מצינור Dataflow אל יעד נתונים כמו BigQuery. יש תמיכה מובנית ברוב יעדי הנתונים הנפוצים.

תבניות Dataflow

תבניות Dataflow מספקות שיטה ליצירת משימות Dataflow על סמך תמונות Docker מוכנות מראש לתרחישי שימוש נפוצים באמצעות מסוף Google Cloud , CLI של Google Cloud או קריאות ל-Rest API.

לסנכרון שינויים בזרמי נתונים ב-Spanner, אנחנו מספקים שלוש תבניות Flex של Dataflow:

ההגבלות הבאות חלות כשמשתמשים בתבנית Spanner סנכרון שינויים בזרמי נתונים ל-Pub/Sub:

הגדרת הרשאות IAM לתבניות Dataflow

לפני שיוצרים משימת Dataflow עם שלושת התבניות הגמישות שמופיעות ברשימה, צריך לוודא שיש לכם את ההרשאות הנדרשות לניהול זהויות והרשאות גישה (IAM) לחשבונות השירות הבאים:

אם אין לכם את הרשאות ה-IAM הנדרשות, תצטרכו לציין חשבון שירות של עובד בניהול המשתמשים כדי ליצור את משימת Dataflow. מידע נוסף זמין במאמר בנושא אבטחה והרשאות ב-Dataflow.

כשמנסים להריץ משימה מתבנית Flex של Dataflow בלי כל ההרשאות הנדרשות, יכול להיות שהמשימה תיכשל עם שגיאה בקריאת קובץ התוצאות או עם שגיאת דחיית הרשאה במשאב. מידע נוסף זמין במאמר בנושא פתרון בעיות בתבניות Flex.

פיתוח צינור עיבוד נתונים של Dataflow

בקטע הזה מוסבר איך להגדיר את המחבר בפעם הראשונה, ומוצגות דוגמאות לשילובים נפוצים עם התכונה 'סנכרון שינויים בזרמי נתונים' של Spanner.

כדי לבצע את השלבים האלה, צריך סביבת פיתוח של Java ל-Dataflow. מידע נוסף זמין במאמר בנושא יצירת צינור עיבוד נתונים של Dataflow באמצעות Java.

יצירת שידור חי של שינויים

פרטים על יצירת מקור נתונים לשינויים מופיעים במאמר יצירת מקור נתונים לשינויים. כדי להמשיך לשלבים הבאים, צריך מסד נתונים של Spanner עם מקור נתונים לשינויים.

הענקת הרשאות גישה פרטניות

אם אתם מצפים שמשתמשים עם בקרת גישה מפורטת יפעילו את משימת Dataflow, עליכם לוודא שהמשתמשים קיבלו גישה לתפקיד במסד הנתונים עם הרשאה SELECT בזרם השינויים והרשאה EXECUTE בפונקציה של טבלת הערכים של זרם השינויים. צריך גם לוודא שהגורם המבצע מציין את תפקיד מסד הנתונים בהגדרת SpannerIO או בתבנית הגמישה של Dataflow.

מידע נוסף זמין במאמר מידע על בקרת גישה ברמת גרנולריות גבוהה.

הוספת המחבר SpannerIO כתלות

מחבר SpannerIO של Apache Beam מסתיר את המורכבות של צריכת סנכרון שינויים בזרמי נתונים ישירות באמצעות Cloud Spanner API, ויוצר מערך נתונים מורכב (PCollection) של רשומות נתונים של סנכרון שינויים בזרמי נתונים לשלבים מאוחרים יותר בצינור עיבוד הנתונים.

אפשר להשתמש באובייקטים האלה בשלבים אחרים של צינור ה-Dataflow של המשתמש. השילוב של זרם השינויים הוא חלק מהמחבר SpannerIO. כדי להשתמש במחבר SpannerIO, צריך להוסיף את יחסי התלות לקובץ pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

יצירת מסד נתונים של מטא נתונים

המחבר צריך לעקוב אחרי כל מחיצה בזמן הרצת צינור הנתונים של Apache Beam. המטא-נתונים האלה נשמרים בטבלת Spanner שנוצרה על ידי המחבר במהלך האתחול. כשמגדירים את המחבר, מציינים את מסד הנתונים שבו תיצור הטבלה.

כמו שמתואר בשיטות מומלצות לשימוש בסנכרון שינויים בזרמי נתונים, אנחנו ממליצים ליצור מסד נתונים חדש למטרה הזו, במקום לאפשר למחבר להשתמש במסד הנתונים של האפליקציה כדי לאחסן את טבלת המטא-נתונים שלו.

הבעלים של משימת Dataflow שמשתמשת במחבר SpannerIO צריך להגדיר את הרשאות ה-IAM הבאות במסד הנתונים של המטא-נתונים:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

הגדרת המחבר

אפשר להגדיר את מחבר הנתונים של Spanner סנכרון שינויים בזרמי נתונים באופן הבא:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

אלה התיאורים של האפשרויות בreadChangeStream():

הגדרת Spanner (נדרשת)

משמשות להגדרת הפרויקט, המופע ומסד הנתונים שבהם נוצר עדכון הנתונים ומהם צריך להריץ שאילתות. אפשר גם לציין את תפקיד מסד הנתונים שבו יש להשתמש כשמנהל ה-IAM שמריץ את משימת Dataflow הוא משתמש עם בקרת גישה מפורטת. התפקיד הזה במסד הנתונים מוקצה לעבודת הגיבוי כדי לאפשר גישה לזרם השינויים. למידע נוסף, ראו מידע על בקרת גישה ברמת דיוק גבוהה.

שם השינוי בשידור (חובה)

השם הזה מזהה באופן ייחודי את מקור הנתונים לשינויים. השם שמופיע כאן צריך להיות זהה לשם שבו השתמשתם כשיצרתם את המשתנה.

מזהה מופע של מטא-נתונים (אופציונלי)

זהו המופע שבו מאוחסנים המטא-נתונים שמשמשים את המחבר כדי לשלוט בשימוש בנתוני ה-API של זרם השינויים.

מזהה מסד נתונים של מטא-נתונים (חובה)

זהו מסד הנתונים שבו מאוחסנים המטא-נתונים שמשמשים את המחבר כדי לשלוט בשימוש בנתונים של API של זרם שינויים.

שם טבלת המטא-נתונים (אופציונלי)

צריך להשתמש במאפיין הזה רק כשמעדכנים צינור קיים.

זהו שם טבלת המטא-נתונים הקיימת מראש שבה ישתמש המחבר. המחבר משתמש בזה כדי לאחסן את המטא-נתונים כדי לשלוט בשימוש בנתונים של ה-API של זרם השינויים. אם לא מציינים את האפשרות הזו, מערכת Spanner יוצרת טבלה חדשה עם שם שנוצר במהלך האתחול של המחבר.

העדיפות של ה-RPC (אופציונלי)

העדיפות של הבקשה שתשמש לשאילתות של שינוי הנתונים. אם לא מציינים את הפרמטר הזה, המערכת תשתמש ב-high priority.

InclusiveStartAt (חובה)

השינויים מחותמת הזמן שצוינה מוחזרים למתקשר.

InclusiveEndAt (אופציונלי)

השינויים עד חותמת הזמן שצוינה מוחזרים למתקשר. אם לא מציינים את הפרמטר הזה, השינויים יופקו ללא הגבלה.

הוספת טרנספורמציות ויעדים לעיבוד נתונים של שינויים

אחרי שמשלימים את השלבים הקודמים, מחבר SpannerIO שהוגדר מוכן לפלוט PCollection של אובייקטים מסוג DataChangeRecord. במאמר דוגמאות לטרנספורמציות ולמקורות נתונים מופיעות כמה דוגמאות להגדרות של צינורות להעברת נתונים שמעבדים את הנתונים האלה בדרכים שונות.

חשוב לזכור שהרשומות של זרם השינויים שמופקות על ידי מחבר SpannerIO לא מסודרות. הסיבה לכך היא ש-PCollections לא מספקות ערבויות לגבי סדר הפריטים. אם אתם צריכים סטרימינג מסודר, אתם צריכים לקבץ ולמיין את הרשומות כטרנספורמציות בצינורות שלכם. אפשר לעיין בדוגמה: מיון לפי מפתח. אפשר להרחיב את הדוגמה הזו כדי למיין את הרשומות לפי שדות כלשהם ברשומות, למשל לפי מזהי עסקאות.

דוגמאות לטרנספורמציות ולמקורות נתונים

אתם יכולים להגדיר המרות משלכם ולציין מקורות לנתונים שאליהם ייכתבו הנתונים. במסמכי התיעוד של Apache Beam יש מגוון רחב של טרנספורמציות שאפשר להחיל, וגם מחברי קלט/פלט מוכנים לשימוש שאפשר לכתוב איתם את הנתונים למערכות חיצוניות.

דוגמה: סידור לפי מפתח

דוגמת הקוד הזו פולטת רשומות של שינויים בנתונים שמסודרות לפי חותמת הזמן של השמירה ומקובצות לפי מפתחות ראשיים באמצעות המחבר Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

בדוגמת הקוד הזו נעשה שימוש במצבים ובטיימרים כדי לשמור נתונים זמנית לכל מפתח, ומוגדר זמן התפוגה של הטיימר לזמן T כלשהו בעתיד שהמשתמש הגדיר (מוגדר בפונקציה BufferKeyUntilOutputTimestamp). כשסימן המים של Dataflow עובר את הזמן T, הקוד הזה מרוקן את כל הרשומות במאגר עם חותמת זמן שקטנה מ-T, מסדר את הרשומות האלה לפי חותמת הזמן של השמירה ומוציא צמד מפתח/ערך שבו:

  • המפתח הוא מפתח הקלט, כלומר המפתח הראשי שעבר גיבוב למערך של קטגוריות בגודל 1,000.
  • הערך הוא רשומות השינוי של הנתונים שהועברו למאגר זמני עבור המפתח.

לכל מפתח, אנחנו מספקים את ההתחייבויות הבאות:

  • הטיימרים מופעלים לפי סדר חותמות הזמן של התפוגה.
  • מובטח שהשלבים הבאים יקבלו את הרכיבים באותו סדר שבו הם נוצרו.

לדוגמה, אם המפתח הוא הערך 100, הטיימר מופעל ב-T1 וב-T10 בהתאמה, ויוצר חבילה של רשומות של שינויים בנתונים בכל חותמת זמן. מכיוון שרשומות השינויים בנתונים שנוצרו בשעה T1 נוצרו לפני רשומות השינויים בנתונים שנוצרו בשעה T10, מובטח גם שרשומות השינויים בנתונים שנוצרו בשעה T1 יתקבלו בשלב הבא לפני רשומות השינויים בנתונים שנוצרו בשעה T10. המנגנון הזה עוזר לנו להבטיח סדר קפדני של חותמות זמן של ביצוע פעולות (commit) לכל מפתח ראשי לעיבוד במורד הזרם.

התהליך הזה יחזור על עצמו עד שהצינור יסתיים וכל הרשומות של שינויי הנתונים יעברו עיבוד (או שהוא יחזור על עצמו ללא הגבלת זמן אם לא צוין זמן סיום).

שימו לב שבקוד לדוגמה הזה נעשה שימוש במצבים ובטיימרים, במקום בחלונות, כדי לבצע סידור לפי מפתח. ההסבר הוא שאין ערובה לכך שהחלונות יעובדו לפי הסדר. המשמעות היא שחלונות ישנים יותר יכולים לעבור עיבוד מאוחר יותר מחלונות חדשים יותר, מה שעלול לגרום לעיבוד לא מסודר.

BreakRecordByModFn

כל רשומה של שינוי בנתונים יכולה להכיל כמה שינויים. כל שינוי מייצג הוספה, עדכון או מחיקה של ערך אחד של מפתח ראשי. הפונקציה הזו מפרקת כל רשומה של שינוי נתונים לרשומות נפרדות של שינוי נתונים, אחת לכל שינוי.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

הפונקציה הזו מקבלת DataChangeRecord ומחזירה DataChangeRecord עם מפתח שנוצר על ידי גיבוב של המפתח הראשי של Spanner לערך שלם.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

הטיימרים והמאגרי נתונים הם לכל מפתח. הפונקציה הזו שומרת במאגר זמני כל רשומה של שינוי בנתונים עד שסימן המים עובר את חותמת הזמן שבה רוצים להוציא את הרשומות של השינויים בנתונים שנשמרו במאגר הזמני.

הקוד הזה משתמש בטיימר חוזר כדי לקבוע מתי לנקות את המאגר:

  1. כשהפונקציה מזהה בפעם הראשונה רשומה של שינוי נתונים עבור מפתח, היא מגדירה את הטיימר להפעלה בחותמת הזמן של ביצוע השינוי ברשומה + incrementIntervalSeconds (אפשרות שניתנת להגדרה על ידי המשתמש).
  2. כשהטיימר מופעל, הוא מוסיף ל-recordsToOutput את כל רשומות שינוי הנתונים במאגר הזמני עם חותמת זמן שקטנה מזמן התפוגה של הטיימר. אם במאגר הזמני יש רשומות של שינויים בנתונים עם חותמת זמן שגדולה מזמן התפוגה של הטיימר או שווה לו, המערכת מוסיפה את הרשומות האלה בחזרה למאגר הזמני במקום להוציא אותן. לאחר מכן, הטיימר הבא מוגדר לזמן התפוגה של הטיימר הנוכחי בתוספת incrementIntervalInSeconds.
  3. אם recordsToOutput לא ריק, הפונקציה מסדרת את רשומות שינוי הנתונים ב-recordsToOutput לפי חותמת הזמן של השמירה ומזהה העסקה, ואז מוציאה אותן.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

סדר העסקאות

אפשר לשנות את צינור הנתונים הזה כך שהנתונים ימוינו לפי מזהה עסקה וחותמת זמן של ביצוע. כדי לעשות זאת, צריך לשמור במאגר זמני רשומות לכל זוג של מזהה עסקה / חותמת זמן של אישור, במקום לכל מפתח Spanner. כדי לעשות זאת, צריך לשנות את הקוד ב-KeyByIdFn.

דוגמה: הרכבת עסקאות

דוגמת הקוד הזו קוראת רשומות של שינויי נתונים, מרכיבה את כל הרשומות של שינויי הנתונים ששייכות לאותה עסקה לרכיב יחיד, ומפיקה את הרכיב הזה. חשוב לדעת: העסקאות שמופקות על ידי קוד לדוגמה זה לא מסודרות לפי חותמת הזמן של השמירה.

בדוגמת הקוד הזו נעשה שימוש במאגרי נתונים זמניים כדי להרכיב עסקאות מתוך רשומות של שינויים בנתונים. כשמתקבלת בפעם הראשונה רשומה של שינוי נתונים ששייכת לעסקה, המערכת קוראת את השדה numberOfRecordsInTransaction ברשומה של שינוי הנתונים, שמתאר את מספר הרשומות הצפוי של שינוי הנתונים ששייכות לעסקה הזו. הוא שומר במאגר את רשומות השינויים בנתונים ששייכות לעסקה הזו עד שמספר הרשומות במאגר תואם ל-numberOfRecordsInTransaction, ואז הוא מוציא את רשומות השינויים בנתונים כחבילה.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

הפונקציה הזו מקבלת DataChangeRecord ומחזירה DataChangeRecord עם מפתח שהוא מזהה העסקה.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

מאגרי TransactionBoundaryFn קיבלו צמדי מפתח/ערך של {TransactionId, DataChangeRecord} מ-KeyByTransactionIdFn, והם מאגדים אותם בקבוצות על סמך TransactionId. כאשר מספר הרשומות שנשמרו במאגר הזמני שווה למספר הרשומות שכלולות בעסקה כולה, הפונקציה הזו ממיינת את אובייקטי DataChangeRecord בקבוצה לפי רצף הרשומות ומחזירה צמד מפתח/ערך של {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

בדוגמה הזו, אנחנו מניחים ש-SortKey הוא מחלקה שהוגדרה על ידי המשתמש ומייצגת זוג {CommitTimestamp, TransactionId}. מידע נוסף על SortKey זמין בהטמעה לדוגמה.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

דוגמה: סינון לפי תג עסקה

כשמתייגים עסקה שמשנה את נתוני המשתמש, התג המתאים והסוג שלו נשמרים כחלק מ-DataChangeRecord. בדוגמאות האלה אפשר לראות איך מסננים רשומות של שינויים לפי תגי עסקאות שהוגדרו על ידי המשתמש וגם לפי תגי מערכת:

סינון תגים בהגדרת המשתמש ב-my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

סינון תגי מערכת/ביקורת של TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

דוגמה: אחזור של שורה מלאה

הדוגמה הזו פועלת עם טבלת Spanner בשם Singer עם ההגדרה הבאה:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

במצב ברירת המחדל OLD_AND_NEW_VALUES של לכידת ערכים ב<b>סנכרון שינויים בזרמי נתונים</b>, כשמתבצע עדכון בשורה ב-Spanner, רשומת שינוי הנתונים שמתקבלת תכיל רק את העמודות שהשתנו. עמודות שמתועדות אבל לא משתנות לא ייכללו ברשומה. אפשר להשתמש במפתח הראשי של ה-mod כדי לבצע קריאת תמונת מצב של Spanner בחותמת הזמן של השמירה של רשומת שינוי הנתונים, כדי לאחזר את העמודות שלא השתנו או אפילו לאחזר את השורה המלאה.

שימו לב: יכול להיות שתצטרכו לשנות את מדיניות שמירת הנתונים של מסד הנתונים לערך שגדול ממדיניות שמירת הנתונים של סנכרון שינויים בזרמי נתונים או שווה לה, כדי שהקריאה של התמונה תצליח.

חשוב גם לדעת שהשימוש בסוג לכידת הערכים NEW_ROW הוא הדרך המומלצת והיעילה יותר לעשות זאת, כי הוא מחזיר כברירת מחדל את כל העמודות המפוקחות בשורה ולא דורש קריאה נוספת של תמונת מצב לתוך Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

הטרנספורמציה הזו תבצע קריאה בעבר בחותמת הזמן של כל רשומה שהתקבלה, ותמפה את השורה המלאה ל-JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

הקוד הזה יוצר לקוח של מסד נתונים ב-Spanner כדי לבצע את האחזור המלא של השורה, ומגדיר את מאגר הסשנים כך שיכלול רק כמה סשנים, ויבצע קריאות במופע אחד של ToFullReowJsonFn באופן עקבי. ‫Dataflow דואג ליצור הרבה מופעים של הפונקציה הזו, ולכל אחד מהם יש מאגר לקוחות משלו.

דוגמה: Spanner ל-Pub/Sub

בתרחיש הזה, המתקשר מעביר רשומות ל-Pub/Sub במהירות האפשרית, ללא קיבוץ או צבירה. האפשרות הזו מתאימה להפעלת עיבוד בהמשך, למשל להזרמת כל השורות החדשות שנוספו לטבלת Spanner אל Pub/Sub לעיבוד נוסף.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

שימו לב שאפשר להגדיר את יעד Pub/Sub כך שיבטיח סמנטיקה של פעם אחת בדיוק.

דוגמה: Spanner ל-Cloud Storage

בתרחיש הזה, המתקשר מקבץ את כל הרשומות בחלון נתון ושומר את הקבוצה בקבצים נפרדים ב-Cloud Storage. האפשרות הזו מתאימה לניתוח ולשמירת נתונים בארכיון בנקודת זמן מסוימת, והיא לא תלויה בתקופת השמירה של Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

שימו לב: יעד Cloud Storage מספק סמנטיקה של לפחות מסירה אחת כברירת מחדל. עם עיבוד נוסף, אפשר לשנות את ההגדרה כך שתהיה סמנטיקה של 'פעם אחת בדיוק'.

אנחנו מספקים גם תבנית Dataflow לתרחיש השימוש הזה: ראו חיבור של סנכרון שינויים בזרמי נתונים ל-Cloud Storage.

דוגמה: Spanner ל-BigQuery (טבלת ספר חשבונות)

במקרה הזה, המתקשר מעביר רשומות שינוי ל-BigQuery. כל רשומה של שינוי בנתונים משתקפת כשורה אחת ב-BigQuery. התכונה הזו מתאימה לניתוח נתונים. הקוד הזה משתמש בפונקציות שהוגדרו קודם, בקטע Fetch full row, כדי לאחזר את השורה המלאה של הרשומה ולכתוב אותה ב-BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

שימו לב: יעד BigQuery מספק סמנטיקה של 'לפחות פעם אחת' כברירת מחדל. עם עיבוד נוסף, אפשר לשנות את ההגדרה כך שתהיה סמנטיקה של 'פעם אחת בדיוק'.

אנחנו מספקים גם תבנית Dataflow לתרחיש השימוש הזה. אפשר לעיין במאמר קישור של סנכרון שינויים בזרמי נתונים ל-BigQuery.

מעקב אחרי צינור עיבוד נתונים

יש שני סוגים של מדדים שזמינים למעקב אחרי צינור עיבוד נתונים של Dataflow לשינוי נתונים.

מדדים רגילים של Dataflow

‫Dataflow מספק כמה מדדים כדי לוודא שהעבודה תקינה, כמו רעננות הנתונים, השהיית המערכת, קצב העברת הנתונים של העבודה, ניצול המעבד של העובד ועוד. מידע נוסף זמין במאמר שימוש ב-Monitoring לצינורות Dataflow.

בצינורות של סנכרון שינויים בזרמי נתונים, יש שני מדדים עיקריים שחשוב להתייחס אליהם: השהיית המערכת ועדכניות הנתונים.

החביון של המערכת יציין את משך הזמן המקסימלי הנוכחי (בשניות) שבו פריט נתונים מעובד או ממתין לעיבוד.

עדכניות הנתונים תציג את משך הזמן שחלף בין עכשיו (זמן אמת) לבין סימן המים של הפלט. סימן המים של הפלט T מציין שכל הרכיבים עם זמן אירוע (במובן המדויק) לפני T עברו עיבוד לצורך חישוב. במילים אחרות, רעננות הנתונים מציינת עד כמה צינור עיבוד הנתונים מעודכן מבחינת עיבוד האירועים שהתקבלו בו.

אם אין מספיק משאבים בצינור, אפשר לראות את ההשפעה הזו בשני המדדים האלה. החביון של המערכת יגדל, כי הפריטים יצטרכו לחכות יותר זמן לפני שהם יעברו עיבוד. גם עדכניות הנתונים תגדל, כי צינור הנתונים לא יוכל לעמוד בקצב של כמות הנתונים שמתקבלת.

מדדים מותאמים אישית של שינויים בנתונים

המדדים האלה מוצגים ב-Cloud Monitoring וכוללים:

  • זמן האחזור (היסטוגרמה) בין הרגע שבו רשומה מתבצעת ב-Spanner לבין הרגע שבו היא מועברת ל-PCollection על ידי המחבר. אפשר להשתמש במדד הזה כדי לראות אם יש בעיות בביצועים (זמן האחזור) של צינור הנתונים.
  • המספר הכולל של רשומות הנתונים שנקראו. זהו אינדיקציה כללית למספר הרשומות שהמחבר פולט. המספר הזה צריך לעלות כל הזמן, בהתאם למגמת הכתיבה במסד הנתונים הבסיסי של Spanner.
  • מספר המחיצות שמתבצעת מהן קריאה. תמיד צריך לקרוא מחיצות. אם המספר הוא אפס, המשמעות היא שהתרחשה שגיאה בצינור.
  • המספר הכולל של השאילתות שהונפקו במהלך ההרצה של המחבר. זהו סימן כולל לשאילתות של זרם השינויים שבוצעו במופע Spanner במהלך ההפעלה של צינור הנתונים. אפשר להשתמש בערך הזה כדי לקבל הערכה של העומס מהמחבר למסד הנתונים של Spanner.

עדכון של צינור קיים לעיבוד נתונים

אפשר לעדכן צינור פעיל שמשתמש במחבר SpannerIO כדי לעבד סנכרון שינויים בזרמי נתונים, אם בדיקות התאימות של העבודה עוברות בהצלחה. כדי לעשות זאת, צריך להגדיר במפורש את הפרמטר של שם טבלת המטא-נתונים של הג'וב החדש כשמעדכנים אותו. משתמשים בערך של אפשרות הצינור metadataTable מהעבודה שמעדכנים.

אם אתם משתמשים בתבנית Dataflow שסופקה על ידי Google, צריך להגדיר את שם הטבלה באמצעות הפרמטר spannerMetadataTableName. אפשר גם לשנות את העבודה הקיימת כדי להשתמש באופן מפורש בטבלת המטא נתונים באמצעות השיטה withMetadataTable(your-metadata-table-name) בהגדרת המחבר. אחרי שמסיימים את השלב הזה, אפשר לפעול לפי ההוראות במאמר הפעלת עבודת ההחלפה מתוך מסמכי Dataflow כדי לעדכן עבודה שפועלת.

שיטות מומלצות לשימוש בסנכרון שינויים בזרמי נתונים וב-Dataflow

ריכזנו כאן כמה שיטות מומלצות ליצירת חיבורים של סנכרון שינויים בזרמי נתונים באמצעות Dataflow.

שימוש במסד נתונים נפרד של מטא-נתונים

מומלץ ליצור מסד נתונים נפרד לשימוש של מחבר SpannerIO לאחסון מטא-נתונים, במקום להגדיר אותו לשימוש במסד הנתונים של האפליקציה.

מידע נוסף זמין במאמר שימוש במסד נתונים נפרד של מטא-נתונים.

קביעת הגודל של האשכול

כלל אצבע למספר העובדים הראשוני במשימת סנכרון שינויים בזרמי נתונים של Spanner הוא עובד אחד לכל 1,000 פעולות כתיבה בשנייה. שימו לב שההערכה הזו יכולה להשתנות בהתאם לכמה גורמים, כמו גודל כל טרנזקציה, כמה רשומות של שינוי נתונים נוצרות מטרנזקציה אחת ושינויים, צבירות או מקורות נתונים אחרים שמשמשים בצינור.

אחרי הקצאת המשאבים הראשונית, חשוב לעקוב אחרי המדדים שמוזכרים במאמר מעקב אחרי צינור עיבוד נתונים, כדי לוודא שצינור עיבוד הנתונים תקין. מומלץ להתנסות עם גודל התחלתי של מאגר העובדים ולעקוב אחרי אופן הטיפול של צינור הנתונים בעומס, ולהגדיל את מספר הצמתים אם צריך. המדד 'ניצול המעבד' הוא מדד חשוב לבדיקה אם העומס תקין וצריך עוד צמתים.

מגבלות ידועות

יש כמה מגבלות ידועות כשמשתמשים בסנכרון שינויים בזרמי נתונים של Spanner עם Dataflow:

התאמה אוטומטית לעומס (Automatic scaling)

כדי לתמוך בהתאמה אוטומטית לעומס (auto-scaling) בכל צינורות הנתונים שכוללים SpannerIO.readChangeStream, צריך להשתמש ב-Apache Beam בגרסה 2.39.0 ואילך.

אם אתם משתמשים בגרסה של Apache Beam שקודמת ל-2.39.0, צריך לציין במפורש את אלגוריתם ההתאמה האוטומטית לעומס בצינורות שמכילים SpannerIO.readChangeStream כ-NONE, כמו שמתואר במאמר התאמה אוטומטית של קנה מידה אופקי.

כדי לשנות את גודל צינור Dataflow באופן ידני במקום להשתמש בהתאמה אוטומטית לעומס, אפשר לעיין במאמר בנושא שינוי גודל צינור סטרימינג באופן ידני.

Runner V2

מחבר הנתונים של Spanner סנכרון שינויים בזרמי נתונים דורש Dataflow Runner V2. צריך לציין את זה באופן ידני במהלך הביצוע, אחרת תוצג שגיאה. אתם יכולים לציין את Runner V2 על ידי הגדרת העבודה באמצעות --experiments=use_unified_worker,use_runner_v2.

תמונת מצב

המחבר של Spanner לשינוי זרמים לא תומך בתמונות מצב של Dataflow.

זמן להשלמת תהליך

מחבר סנכרון שינויים בזרמי נתונים ב-Spanner לא תומך בהפסקת העבודה של ג'וב. אפשר לבטל רק עבודות קיימות.

אפשר גם לעדכן צינור קיים בלי להפסיק אותו.

OpenCensus

כדי להשתמש ב-OpenCensus כדי לעקוב אחרי צינור הנתונים, צריך לציין גרסה 0.28.3 ואילך.

NullPointerException בתחילת הפייפליין

באג בגרסה 2.38.0 של Apache Beam עלול לגרום לNullPointerException כשמפעילים את צינור הנתונים בתנאים מסוימים. הפעולה הזו תמנע את התחלת העבודה, ובמקום זאת תוצג הודעת השגיאה הבאה:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

כדי לפתור את הבעיה הזו, אפשר להשתמש בגרסה 2.39.0 ואילך של Apache Beam, או לציין באופן ידני את הגרסה של beam-sdks-java-core כ-2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

מידע נוסף