שינוי נתונים באמצעות כתיבה באצווה

בדף הזה מתוארות בקשות כתיבה ב-Batch ב-Spanner, ומוסבר איך אפשר להשתמש בהן כדי לשנות את הנתונים ב-Spanner.

אפשר להשתמש בכתיבה באצווה ב-Spanner כדי להוסיף, לעדכן או למחוק כמה שורות בטבלאות של Spanner. הכתיבה באצווה ב-Spanner תומכת בכתיבה עם השהיה נמוכה ללא פעולת קריאה, ומחזירה תשובות כשהשינויים מוחלים באצוות. כדי להשתמש בכתיבה באצווה, מקבצים מוטציות קשורות, וכל המוטציות בקבוצה מבוצעות באופן אטומי. המוטציות בקבוצות מוחלות בסדר לא מוגדר והן בלתי תלויות זו בזו (לא אטומיות). ‫Spanner לא צריך להמתין עד שכל השינויים יוחלו לפני שליחת תגובה, מה שאומר שפעולת כתיבה באצווה מאפשרת כשל חלקי. אפשר גם לבצע כמה פעולות כתיבה באצווה בו-זמנית. מידע נוסף זמין במאמר בנושא איך משתמשים בכתיבה באצווה.

תרחישים לדוגמה

הכתיבה באצווה ב-Spanner שימושית במיוחד אם רוצים לבצע כמות גדולה של פעולות כתיבה בלי פעולת קריאה, אבל לא נדרשת טרנזקציה אטומית לכל השינויים.

אם רוצים לבצע את בקשות ה-DML באצווה, משתמשים בDML באצווה כדי לשנות את נתוני Spanner. מידע נוסף על ההבדלים בין DML לבין מוטציות זמין במאמר השוואה בין DML לבין מוטציות.

לגבי בקשות לשינוי יחיד, מומלץ להשתמש בטרנזקציה של קריאה וכתיבה עם נעילה.

מגבלות

לכתיבה באצווה ב-Spanner יש את המגבלות הבאות:

  • אי אפשר להשתמש ב-Spanner batch write באמצעות מסוףGoogle Cloud או Google Cloud CLI. היא זמינה רק באמצעות ממשקי REST ו-RPC API וספריות הלקוח של Spanner.

  • הגנה מפני שידור חוזר אינה נתמכת באמצעות כתיבת אצווה. יכול להיות שמוטציות יוחלו יותר מפעם אחת, ומוטציה שמוחלת יותר מפעם אחת עלולה לגרום לכשל. לדוגמה, אם מפעילים מחדש מוטציה של הוספה, יכול להיות שתתקבל שגיאה של קיום נתונים שכבר קיימים, או שאם משתמשים במפתחות מבוססי חותמת זמן שנוצרו או הועברו במוטציה, יכול להיות שיתווספו שורות נוספות לטבלה. כדי למנוע את הבעיה הזו, מומלץ להגדיר את פעולות הכתיבה כך שיהיו אידמפוטנטיות.

  • אי אפשר לבטל בקשה לכתיבת נתונים בקבוצה אחרי שהיא הושלמה. אפשר לבטל בקשה לכתיבת נתונים בכמות גדולה שנמצאת בתהליך. אם מבטלים כתיבה של קבוצת פעולות שנמצאת בתהליך, מוטציות בקבוצות שלא הושלמו מבוטלות. מוטציות בקבוצות שהושלמו מועברות למסד הנתונים.

  • הגודל המקסימלי של בקשת כתיבה באצווה זהה למגבלה של בקשת ביצוע (commit). מידע נוסף זמין במאמר בנושא מגבלות על יצירה, קריאה, עדכון ומחיקה של נתונים.

איך משתמשים בכתיבה בקבוצות

כדי להשתמש בכתיבה באצווה, צריכה להיות לכם הרשאה spanner.databases.write במסד הנתונים שאתם רוצים לשנות. אפשר לבצע כמה שינויים בכתיבה בבת אחת (batch) בבקשה אחת באמצעות קריאה ל-API ל-REST או ל-RPC.

כשמשתמשים בכתיבה באצווה, צריך לקבץ יחד את סוגי המוטציות הבאים:

  • הוספת שורות עם אותה תחילית של מפתח ראשי גם בטבלת ההורה וגם בטבלת הצאצא.
  • הוספת שורות לטבלאות עם קשר של מפתח זר בין הטבלאות.
  • סוגים אחרים של מוטציות קשורות, בהתאם לסכמת מסד הנתונים וללוגיקה של האפליקציה.

אפשר גם לכתוב נתונים בכתיבה אצווה באמצעות ספריות הלקוח של Spanner. בדוגמת הקוד הבאה מתבצע עדכון של הטבלה Singers עם שורות חדשות.

ספריות לקוח

Java


import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;

public class BatchWriteAtLeastOnceSample {

  /***
   * Assume DDL for the underlying database:
   * <pre>{@code
   *   CREATE TABLE Singers (
   *     SingerId   INT64 NOT NULL,
   *     FirstName  STRING(1024),
   *     LastName   STRING(1024),
   *   ) PRIMARY KEY (SingerId)
   *
   *   CREATE TABLE Albums (
   *     SingerId     INT64 NOT NULL,
   *     AlbumId      INT64 NOT NULL,
   *     AlbumTitle   STRING(1024),
   *   ) PRIMARY KEY (SingerId, AlbumId),
   *   INTERLEAVE IN PARENT Singers ON DELETE CASCADE
   * }</pre>
   */

  private static final MutationGroup MUTATION_GROUP1 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(16)
              .set("FirstName")
              .to("Scarlet")
              .set("LastName")
              .to("Terry")
              .build());
  private static final MutationGroup MUTATION_GROUP2 =
      MutationGroup.of(
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(17)
              .set("FirstName")
              .to("Marc")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Singers")
              .set("SingerId")
              .to(18)
              .set("FirstName")
              .to("Catalina")
              .set("LastName")
              .to("Smith")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(17)
              .set("AlbumId")
              .to(1)
              .set("AlbumTitle")
              .to("Total Junk")
              .build(),
          Mutation.newInsertOrUpdateBuilder("Albums")
              .set("SingerId")
              .to(18)
              .set("AlbumId")
              .to(2)
              .set("AlbumTitle")
              .to("Go, Go, Go")
              .build());

  static void batchWriteAtLeastOnce() {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "my-project";
    final String instanceId = "my-instance";
    final String databaseId = "my-database";
    batchWriteAtLeastOnce(projectId, instanceId, databaseId);
  }

  static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
      DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
      final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);

      // Creates and issues a BatchWrite RPC request that will apply the mutation groups
      // non-atomically and respond back with a stream of BatchWriteResponse.
      ServerStream<BatchWriteResponse> responses =
          dbClient.batchWriteAtLeastOnce(
              ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
              Options.tag("batch-write-tag"));

      // Iterates through the results in the stream response and prints the MutationGroup indexes,
      // commit timestamp and status.
      for (BatchWriteResponse response : responses) {
        if (response.getStatus().getCode() == Code.OK_VALUE) {
          System.out.printf(
              "Mutation group indexes %s have been applied with commit timestamp %s",
              response.getIndexesList(), response.getCommitTimestamp());
        } else {
          System.out.printf(
              "Mutation group indexes %s could not be applied with error code %s and "
                  + "error message %s", response.getIndexesList(),
              Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
        }
      }
    }
  }
}

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
	"google.golang.org/grpc/status"
)

// batchWrite demonstrates writing mutations to a Spanner database through
// BatchWrite API - https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite
func batchWrite(w io.Writer, db string) error {
	// db := "projects/my-project/instances/my-instance/databases/my-database"
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Database is assumed to exist - https://cloud.google.com/spanner/docs/getting-started/go#create_a_database
	singerColumns := []string{"SingerId", "FirstName", "LastName"}
	albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
	mutationGroups := make([]*spanner.MutationGroup, 2)

	mutationGroup1 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{16, "Scarlet", "Terry"}),
	}
	mutationGroups[0] = &spanner.MutationGroup{Mutations: mutationGroup1}

	mutationGroup2 := []*spanner.Mutation{
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{17, "Marc", ""}),
		spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{18, "Catalina", "Smith"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{17, 1, "Total Junk"}),
		spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{18, 2, "Go, Go, Go"}),
	}
	mutationGroups[1] = &spanner.MutationGroup{Mutations: mutationGroup2}
	iter := client.BatchWrite(ctx, mutationGroups)
	// See https://pkg.go.dev/cloud.google.com/go/spanner#BatchWriteResponseIterator.Do
	doFunc := func(response *sppb.BatchWriteResponse) error {
		if err = status.ErrorProto(response.GetStatus()); err == nil {
			fmt.Fprintf(w, "Mutation group indexes %v have been applied with commit timestamp %v",
				response.GetIndexes(), response.GetCommitTimestamp())
		} else {
			fmt.Fprintf(w, "Mutation group indexes %v could not be applied with error %v",
				response.GetIndexes(), err)
		}
		// Return an actual error as needed.
		return nil
	}
	return iter.Do(doFunc)
}

צומת


// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
/**
 * Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
 * A group must contain related mutations.
 * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
 * for more details and examples.
 */
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
  SingerId: 1,
  FirstName: 'Scarlet',
  LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
  SingerId: 2,
  FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
  SingerId: 3,
  FirstName: 'Catalina',
  LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
  AlbumId: 1,
  SingerId: 2,
  AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
  AlbumId: 2,
  SingerId: 3,
  AlbumTitle: 'Go, Go, Go',
});

const options = {
  transactionTag: 'batch-write-tag',
};

try {
  database
    .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
    .on('error', console.error)
    .on('data', response => {
      // Check the response code of each response to determine whether the mutation group(s) were applied successfully.
      if (response.status.code === 0) {
        console.log(
          `Mutation group indexes ${
            response.indexes
          }, have been applied with commit timestamp ${Spanner.timestamp(
            response.commitTimestamp,
          ).toJSON()}`,
        );
      }
      // Mutation groups that fail to commit trigger a response with a non-zero status code.
      else {
        console.log(
          `Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`,
        );
      }
    })
    .on('end', () => {
      console.log('Request completed successfully');
    });
} catch (err) {
  console.log(err);
}

Python

def batch_write(instance_id, database_id):
    """Inserts sample data into the given database via BatchWrite API.

    The database and table must already exist and can be created using
    `create_database`.
    """
    from google.rpc.code_pb2 import OK

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.mutation_groups() as groups:
        group1 = groups.group()
        group1.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (16, "Scarlet", "Terry"),
            ],
        )

        group2 = groups.group()
        group2.insert_or_update(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (17, "Marc", ""),
                (18, "Catalina", "Smith"),
            ],
        )
        group2.insert_or_update(
            table="Albums",
            columns=("SingerId", "AlbumId", "AlbumTitle"),
            values=[
                (17, 1, "Total Junk"),
                (18, 2, "Go, Go, Go"),
            ],
        )

        for response in groups.batch_write():
            if response.status.code == OK:
                print(
                    "Mutation group indexes {} have been applied with commit timestamp {}".format(
                        response.indexes, response.commit_timestamp
                    )
                )
            else:
                print(
                    "Mutation group indexes {} could not be applied with error {}".format(
                        response.indexes, response.status
                    )
                )

C++‎

namespace spanner = ::google::cloud::spanner;
// Use upserts as mutation groups are not replay protected.
auto commit_results = client.CommitAtLeastOnce({
    // group #0
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(16, "Scarlet", "Terry")
            .Build(),
    },
    // group #1
    spanner::Mutations{
        spanner::InsertOrUpdateMutationBuilder(
            "Singers", {"SingerId", "FirstName", "LastName"})
            .EmplaceRow(17, "Marc", "")
            .EmplaceRow(18, "Catalina", "Smith")
            .Build(),
        spanner::InsertOrUpdateMutationBuilder(
            "Albums", {"SingerId", "AlbumId", "AlbumTitle"})
            .EmplaceRow(17, 1, "Total Junk")
            .EmplaceRow(18, 2, "Go, Go, Go")
            .Build(),
    },
});
for (auto& commit_result : commit_results) {
  if (!commit_result) throw std::move(commit_result).status();
  std::cout << "Mutation group indexes [";
  for (auto index : commit_result->indexes) std::cout << " " << index;
  std::cout << " ]: ";
  if (commit_result->commit_timestamp) {
    auto const& ts = *commit_result->commit_timestamp;
    std::cout << "Committed at " << ts.get<absl::Time>().value();
  } else {
    std::cout << commit_result->commit_timestamp.status();
  }
  std::cout << "\n";
}

המאמרים הבאים