ייבוא, ייצוא ושינוי של נתונים באמצעות Dataflow

בדף הזה מוסבר איך להשתמש במחבר Dataflow ל-Spanner כדי לייבא, לייצא ולשנות נתונים במסדי נתונים של Spanner עם ניב GoogleSQL ובמסדי נתונים עם ניב PostgreSQL.

Dataflow הוא שירות מנוהל לשינוי נתונים ולהוספת מידע לנתונים. מחבר Dataflow ל-Spanner מאפשר לקרוא נתונים מ-Spanner ולכתוב נתונים ב-Spanner בצינור Dataflow, עם אפשרות להמרה או לשינוי של הנתונים. אפשר גם ליצור צינורות להעברת נתונים בין Spanner לבין מוצרים אחרים שלGoogle Cloud .

מחבר Dataflow הוא השיטה המומלצת להעברת נתונים אל Spanner וממנו בכמות גדולה בצורה יעילה. זו גם השיטה המומלצת לביצוע טרנספורמציות גדולות במסד נתונים שלא נתמכות על ידי Partitioned DML, כמו העברות של טבלאות ומחיקות בכמות גדולה שדורשות JOIN. כשעובדים עם מסדי נתונים בודדים, אפשר להשתמש בשיטות אחרות כדי לייבא ולייצא נתונים:

  • משתמשים במסוף כדי לייצא מסד נתונים בודד מ-Spanner אל Cloud Storage בפורמט Avro. Google Cloud
  • אפשר להשתמש במסוף Google Cloud כדי לייבא מסד נתונים בחזרה ל-Spanner מקבצים שייצאתם ל-Cloud Storage.
  • אפשר להשתמש ב-API בארכיטקטורת REST או ב-Google Cloud CLI כדי להריץ משימות ייצוא או ייבוא מ-Spanner ל-Cloud Storage ובחזרה, גם באמצעות פורמט Avro.

המחבר של Dataflow ל-Spanner הוא חלק מ-Apache Beam Java SDK, והוא מספק API לביצוע הפעולות הקודמות. למידע נוסף על חלק מהמושגים שמוסברים בדף הזה, כמו אובייקטים וטרנספורמציות, אפשר לעיין במדריך לתכנות ב-Apache Beam.PCollection

הוספת המחבר לפרויקט Maven

כדי להוסיף את מחבר Dataflow לפרויקט Maven, מוסיפים את ארטיפקט Maven‏ beam-sdks-java-io-google-cloud-platform לקובץ pom.xml כתלות. Google Cloud

לדוגמה, אם בקובץ pom.xml מוגדר beam.version למספר הגרסה המתאים, מוסיפים את התלות הבאה:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

קריאת נתונים מ-Spanner

כדי לקרוא מ-Spanner, צריך להחיל את טרנספורמציית SpannerIO.read. מגדירים את הקריאה באמצעות השיטות בכיתה SpannerIO.Read. הפעלת הטרנספורמציה מחזירה PCollection<Struct>, שבו כל רכיב באוסף מייצג שורה נפרדת שהוחזרה על ידי פעולת הקריאה. אפשר לקרוא מ-Spanner עם שאילתת SQL ספציפית או בלי, בהתאם לפלט שנדרש.

החלת טרנספורמציית SpannerIO.read מחזירה תצוגת נתונים עקבית על ידי ביצוע קריאה חזקה. אלא אם מציינים אחרת, התוצאה של הקריאה מצולמת בזמן שבו התחלתם את הקריאה. במאמר בנושא קריאות אפשר לקבל מידע נוסף על הסוגים השונים של קריאות ש-Spanner יכול לבצע.

קריאת נתונים באמצעות שאילתה

כדי לקרוא קבוצה ספציפית של נתונים מ-Spanner, צריך להגדיר את הטרנספורמציה באמצעות השיטה SpannerIO.Read.withQuery כדי לציין שאילתת SQL. לדוגמה:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

קריאת נתונים בלי לציין שאילתה

כדי לקרוא מתוך מסד נתונים בלי להשתמש בשאילתה, אפשר לציין שם של טבלה באמצעות ה-method ‏SpannerIO.Read.withTable, ולציין רשימה של עמודות לקריאה באמצעות ה-method ‏SpannerIO.Read.withColumns. לדוגמה:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

כדי להגביל את מספר השורות שנקראות, אפשר לציין קבוצה של מפתחות ראשיים לקריאה באמצעות ה-method‏ SpannerIO.Read.withKeySet.

אפשר גם לקרוא טבלה באמצעות אינדקס משני שצוין. בדומה לreadUsingIndex קריאה ל-API, האינדקס צריך לכלול את כל הנתונים שמופיעים בתוצאות השאילתה.

כדי לעשות זאת, מציינים את הטבלה כמו בדוגמה הקודמת, ומציינים את האינדקס שמכיל את ערכי העמודות הנדרשים באמצעות השיטה SpannerIO.Read.withIndex. האינדקס צריך לאחסן את כל העמודות שהטרנספורמציה צריכה לקרוא. המפתח הראשי של טבלת הבסיס מאוחסן באופן מרומז. לדוגמה, כדי לקרוא את הטבלה Songs באמצעות האינדקס SongsBySongName, משתמשים בקוד הבא:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

שליטה במידת העדכניות של נתוני העסקאות

מובטח שטרנספורמציה תתבצע על תמונת מצב עקבית של הנתונים. כדי לשלוט בטריות הנתונים, משתמשים בשיטה SpannerIO.Read.withTimestampBound. מידע נוסף זמין במאמר בנושא עסקאות.

קריאה מכמה טבלאות באותה טרנזקציה

אם רוצים לקרוא נתונים מכמה טבלאות באותה נקודת זמן כדי לוודא שהנתונים עקביים, צריך לבצע את כל הקריאות בעסקה אחת. כדי לעשות את זה, צריך להחיל טרנספורמציה של createTransaction, ליצור אובייקט PCollectionView<Transaction> ואז ליצור טרנזקציה. אפשר להעביר את התצוגה שמתקבלת לפעולת קריאה באמצעות SpannerIO.Read.withTransaction.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

קריאת נתונים מכל הטבלאות הזמינות

אפשר לקרוא נתונים מכל הטבלאות הזמינות במסד נתונים של Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

פתרון בעיות שקשורות לשאילתות לא נתמכות

המחבר של Dataflow תומך רק בשאילתות Spanner SQL שבהן האופרטור הראשון בתוכנית הביצוע של השאילתה הוא Distributed Union. אם מנסים לקרוא נתונים מ-Spanner באמצעות שאילתה ומתקבל חריג שבו מצוין שהשאילתה does not have a DistributedUnion at the root, צריך לפעול לפי השלבים במאמר הסבר על האופן שבו Spanner מבצע שאילתות כדי לאחזר תוכנית הרצה לשאילתה באמצעות מסוף Google Cloud .

אם שאילתת ה-SQL שלכם לא נתמכת, פשטו אותה לשאילתה עם איחוד מבוזר כאופרטור הראשון בתוכנית הביצוע של השאילתה. מסירים פונקציות מצטברות, צירופי טבלאות ואת האופרטורים DISTINCT, GROUP BY ו-ORDER, כי הם האופרטורים שהכי סביר שימנעו את הפעלת השאילתה.

יצירת מוטציות לכתיבה

במקום השיטה newInsertBuilder, צריך להשתמש בשיטה newInsertOrUpdateBuilder של המחלקה Mutation, אלא אם יש צורך מוחלט בשיטה newInsertBuilder בצינורות Java. בצינורות Python, משתמשים ב-SpannerInsertOrUpdate במקום ב-SpannerInsert. ‫Dataflow מספקת הבטחות של 'לפחות פעם אחת', כלומר יכול להיות שהמוטציה תיכתב כמה פעמים. כתוצאה מכך, יכול להיות שרק מוטציות ייצרו שגיאות שיגרמו לצינור לעבור למצב כשל.INSERTcom.google.cloud.spanner.SpannerException: ALREADY_EXISTS כדי למנוע את השגיאה הזו, אפשר להשתמש במוטציה INSERT_OR_UPDATE במקום זאת, שמוסיפה שורה חדשה או מעדכנת את ערכי העמודות אם השורה כבר קיימת. אפשר להשתמש במוטציה INSERT_OR_UPDATE יותר מפעם אחת.

כתיבה ל-Spanner וטרנספורמציה של נתונים

אפשר לכתוב נתונים ל-Spanner באמצעות מחבר Dataflow על ידי שימוש בטרנספורמציה SpannerIO.write כדי להפעיל אוסף של מוטציות בשורות קלט. מחבר Dataflow מקבץ שינויים למנות כדי לשפר את היעילות.

בדוגמה הבאה אפשר לראות איך להחיל טרנספורמציה של כתיבה על PCollection של מוטציות:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

אם טרנספורמציה נעצרת באופן בלתי צפוי לפני שהיא מסתיימת, המוטציות שכבר הוחלו לא מבוטלות.

החלת קבוצות של שינויים באופן אטומי

אפשר להשתמש במחלקה MutationGroup כדי לוודא שקבוצה של מוטציות מוחלת יחד באופן אטומי. מובטח ששינויים ב-MutationGroup יישלחו באותה עסקה, אבל יכול להיות שהעסקה תנסה להישלח שוב.

הביצועים של קבוצות מוטציות הם הכי טובים כשמשתמשים בהן כדי לקבץ מוטציות שמשפיעות על נתונים שמאוחסנים קרוב אחד לשני במרחב המפתחות. מכיוון ש-Spanner משלב נתונים של טבלאות אב וטבלאות צאצא בטבלת האב, הנתונים האלה תמיד קרובים זה לזה במרחב המפתחות. מומלץ לבנות את קבוצת השינויים כך שהיא תכיל שינוי אחד שחל על טבלת אב ושינויים נוספים שחלים על טבלאות צאצא, או כך שכל השינויים שלה ישנו נתונים שקרובים זה לזה במרחב המפתחות. מידע נוסף על האופן שבו Spanner מאחסן נתונים של טבלאות אב וטבלאות צאצא זמין במאמר בנושא סכימה ומודל נתונים. אם לא מארגנים את קבוצות השינויים לפי היררכיות הטבלאות המומלצות, או אם הנתונים שאליהם מתבצעת גישה לא נמצאים קרוב זה לזה במרחב המפתחות, יכול להיות ש-Spanner יצטרך לבצע אישורי טרנזקציות דו-שלביים, מה שיוביל לביצועים איטיים יותר. מידע נוסף זמין במאמר Locality tradeoffs.

כדי להשתמש ב-MutationGroup, צריך ליצור טרנספורמציה של SpannerIO.write ולקרוא ל-method‏ SpannerIO.Write.grouped, שמחזירה טרנספורמציה שאפשר להחיל על PCollection של אובייקטים מסוג MutationGroup.

כשיוצרים MutationGroup, המוטציה הראשונה שמופיעה ברשימה הופכת למוטציה הראשית. אם קבוצת המוטציות משפיעה על טבלת אב ועל טבלת צאצא, המוטציה הראשית צריכה להיות מוטציה בטבלת האב. אחרת, אפשר להשתמש בכל מוטציה כמוטציה הראשית. מחבר Dataflow משתמש במוטציה הראשית כדי לקבוע את גבולות המחיצות, כדי לאגד ביעילות מוטציות.

לדוגמה, נניח שהאפליקציה שלכם עוקבת אחרי התנהגות המשתמשים ומסמנת התנהגות בעייתית לבדיקה. לכל התנהגות שסומנה, צריך לעדכן את הטבלה Users כדי לחסום את הגישה של המשתמש לאפליקציה, וגם לתעד את האירוע בטבלה PendingReviews. כדי לוודא ששתי הטבלאות מתעדכנות באופן אטומי, משתמשים ב-MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

כשיוצרים קבוצת מוטציות, המוטציה הראשונה שמסופקת כארגומנט הופכת למוטציה הראשית. במקרה הזה, אין קשר בין שתי הטבלאות, ולכן אין מוטציה ראשית ברורה. בחרנו ב-userMutation כשפה הראשית והצבנו אותה ראשונה. החלת שתי המוטציות בנפרד תהיה מהירה יותר, אבל לא תבטיח אטומיות, ולכן קבוצת המוטציות היא הבחירה הטובה ביותר במצב הזה.

המאמרים הבאים