קריאות מחוץ לעסקאות

בדף הזה מוסבר איך לבצע קריאות ב-Spanner מחוץ להקשר של טרנזקציות לקריאה בלבד ולקריאה וכתיבה. אם אחד מהמצבים הבאים רלוונטי, כדאי לעיין בדף עסקאות:

  • אם אתם צריכים לכתוב, בהתאם לערך של קריאה אחת או יותר, אתם צריכים לבצע את הקריאה כחלק מעסקת קריאה-כתיבה. מידע נוסף זמין במאמר בנושא עסקאות קריאה-כתיבה.

  • אם אתם מבצעים כמה קריאות שדורשות תצוגה עקבית של הנתונים, כדאי לבצע את הקריאות כחלק מעסקה לקריאה בלבד. מידע נוסף זמין במאמר בנושא עסקאות לקריאה בלבד.

סוגי קריאה

‫Spanner מאפשר לכם לקבוע את רמת העדכניות של הנתונים כשאתם קוראים נתונים, באמצעות שני סוגים של קריאות:

  • קריאה חזקה היא קריאה בחותמת זמן נוכחית, ויש הבטחה שיוצגו בה כל הנתונים שנשמרו עד תחילת הקריאה. ב-Spanner, ברירת המחדל היא שימוש בקריאות חזקות כדי להציג בקשות קריאה.
  • קריאה בעבר היא קריאה שמתבצעת בחותמת זמן שכבר חלפה. אם האפליקציה שלכם רגישה לזמן אחזור אבל סובלנית לנתונים לא עדכניים, קריאות לא עדכניות יכולות לשפר את הביצועים.

כדי לבחור את סוג הקריאה הרצוי, מגדירים חסימה של חותמת הזמן בבקשת הקריאה. כדאי להשתמש בשיטות המומלצות הבאות כשבוחרים חותמת זמן:

  • כדאי לבחור קריאות חזקות כשאפשר. זהו חותם הזמן שמוגדר כברירת מחדל לקריאות ב-Spanner, כולל עסקאות לקריאה בלבד. קריאות חזקות מבטיחות את ההשפעות של כל העסקאות שבוצעו לפני תחילת הפעולה, ללא קשר לרפליקה שמקבלת את הקריאה. לכן, קריאות חזקות מפשטות את קוד האפליקציה והופכות את האפליקציות לאמינות יותר. מידע נוסף על מאפייני העקביות של Spanner זמין במאמר TrueTime and External Consistency.

    אפשרות נוספת היא להעניק לאזורים שאינם אזורים ראשיים, לאזורים עם הרשאות קריאה וכתיבה או לאזורים עם הרשאות קריאה בלבד את הסטטוס read lease region, כדי לצמצם את זמן האחזור של קריאה בעסקאות לקריאה בלבד שנדרש בהן מודל עקביות חזק. כדי לעשות את זה, משתמשים בהסכמי שכירות לקריאה ב-Spanner. אזורי חכירה לקריאה עוזרים למסד הנתונים להפחית את זמן האחזור של קריאה חזקה במקרים של מכונות בשני אזורים או במספר אזורים. עם זאת, פעולות כתיבה יחוו השהיה ארוכה יותר כשמשתמשים בהרשאת קריאה.

  • אם השהייה הופכת קריאות חזקות ללא מעשיות במצבים מסוימים, כדאי להשתמש בקריאות לא עדכניות (עדכניות מוגבלת או עדכניות מדויקת) כדי לשפר את הביצועים במקומות שבהם לא צריך שהקריאות יהיו עדכניות ככל האפשר. כמו שמתואר בדף שכפול, 15 שניות הוא ערך סביר של נתונים לא עדכניים לשימוש כדי להשיג ביצועים טובים.

קריאת נתונים באמצעות תפקיד במסד נתונים

אם אתם משתמשים בבקרת גישה ברמת גרנולריות גבוהה, אתם צריכים לבחור תפקיד במסד הנתונים כדי להריץ הצהרות ושאילתות של SQL, וכדי לבצע פעולות על שורות במסד הנתונים. התפקיד שבחרתם יישאר בתוקף לאורך כל הסשן, עד שתשנו אותו.

הוראות לביצוע קריאה באמצעות תפקיד במסד נתונים מופיעות במאמר גישה למסד נתונים עם בקרת גישה ברמת גרנולריות גבוהה.

שיטות קריאה יחידות

‫Spanner תומך בשיטות קריאה יחידות (כלומר, קריאה מחוץ להקשר של טרנזקציה) במסד נתונים עבור:

  • ביצוע הקריאה כהצהרת שאילתת SQL או באמצעות API לקריאה של Spanner.
  • ביצוע קריאה חזקה משורה אחת או מכמה שורות בטבלה.
  • ביצוע קריאה בעבר משורה אחת או מכמה שורות בטבלה.
  • קריאה משורה אחת או מכמה שורות באינדקס משני.

אם רוצים להפנות קריאות בודדות לרפליקה או לאזור ספציפיים בהגדרת מופע מרובה אזורים או בהגדרה אזורית בהתאמה אישית עם אזורים אופציונליים לקריאה בלבד, אפשר לעיין במאמר בנושא קריאות מופנות.

בקטעים הבאים מוסבר איך להשתמש בשיטות קריאה באמצעות ספריות לקוח של Spanner.

הפעלת שאילתה

בדוגמה הבאה מוצג איך להריץ הצהרת שאילתת SQL על מסד נתונים.

GoogleSQL

C++‎

משתמשים ב-ExecuteQuery() כדי להריץ הצהרת שאילתת SQL על מסד נתונים.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#‎

אפשר להשתמש ב-ExecuteReaderAsync() כדי להריץ שאילתות במסד הנתונים.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

משתמשים ב-Client.Single().Query כדי להריץ שאילתות במסד הנתונים.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func query(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

משתמשים ב-ReadContext.executeQuery כדי להריץ שאילתות במסד הנתונים.

static void query(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Node.js

משתמשים ב-Database.run כדי להריץ שאילתות במסד הנתונים.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const query = {
  sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};

// Queries rows from the Albums table
try {
  const [rows] = await database.run(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

PHP

משתמשים ב-Database::execute כדי להריץ שאילתות במסד הנתונים.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $results = $database->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );

    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

משתמשים ב-Database.execute_sql כדי להריץ שאילתות במסד הנתונים.

def query_data(instance_id, database_id):
    """Queries sample data from the database using SQL."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

משתמשים ב-Client#execute כדי להריץ שאילתות במסד הנתונים.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

כשיוצרים הצהרת SQL, כדאי לעיין במאמרים בנושא תחביר שאילתות ופונקציות ואופרטורים ב-SQL.

ביצוע קריאה חזקה

בדוגמה הבאה מוצג איך לבצע קריאה חזקה של אפס שורות או יותר ממסד נתונים.

GoogleSQL

C++‎

הקוד לקריאת נתונים זהה לדוגמה הקודמת של שאילתת Spanner על ידי הרצת שאילתת SQL.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#‎

הקוד לקריאת נתונים זהה לדוגמה הקודמת של שאילתת Spanner על ידי הרצת שאילתת SQL.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

משתמשים ב-Client.Single().Read כדי לקרוא שורות ממסד הנתונים.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func read(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
		[]string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

בדוגמה נעשה שימוש ב-AllKeys כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Java

משתמשים ב-ReadContext.read כדי לקרוא שורות ממסד הנתונים.

static void read(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .read(
              "Albums",
              KeySet.all(), // Read all rows in a table.
              Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

בדוגמה נעשה שימוש ב-KeySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Node.js

משתמשים ב-Table.read כדי לקרוא שורות ממסד הנתונים.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
};

try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`,
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

בדוגמה נעשה שימוש ב-keySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

PHP

משתמשים ב-Database::read כדי לקרוא שורות ממסד הנתונים.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.
 * Example:
 * ```
 * read_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

בדוגמה נעשה שימוש ב-keySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Python

משתמשים ב-Database.read כדי לקרוא שורות ממסד הנתונים.

def read_data(instance_id, database_id):
    """Reads sample data from the database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

בדוגמה נעשה שימוש ב-KeySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Ruby

משתמשים ב-Client#read כדי לקרוא שורות ממסד הנתונים.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

ביצוע קריאה בעבר

בדוגמת הקוד הבאה מוצגות דרכים לבצע קריאה בעבר של אפס שורות או יותר ממסד נתונים באמצעות חותמת זמן של דיוק מוחלט של נתונים. הוראות לביצוע קריאה של נתונים לא עדכניים באמצעות חותמת זמן של bounded-staleness מופיעות בהערה אחרי הקוד לדוגמה. מידע נוסף על הסוגים השונים של חותמות זמן זמין במאמר בנושא גבולות של חותמות זמן.

GoogleSQL

C++‎

כדי לבצע קריאה בעבר, משתמשים ב-ExecuteQuery() עם MakeReadOnlyTransaction() ו-Transaction::ReadOnlyOptions().

void ReadStaleData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  // The timestamp chosen using the `exact_staleness` parameter is bounded
  // below by the creation time of the database, so the visible state may only
  // include that generated by the `extra_statements` executed atomically with
  // the creation of the database. Here we at least know `Albums` exists.
  auto opts = spanner::Transaction::ReadOnlyOptions(std::chrono::seconds(15));
  auto read_only = spanner::MakeReadOnlyTransaction(std::move(opts));

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  auto rows = client.ExecuteQuery(std::move(read_only), std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#‎

משתמשים בשיטה BeginReadOnlyTransactionAsync ב-connection עם ערך TimestampBound.OfExactStaleness() שצוין כדי להריץ שאילתה במסד הנתונים.


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class ReadStaleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public long? MarketingBudget { get; set; }
    }

    public async Task<List<Album>> ReadStaleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        var staleness = TimestampBound.OfExactStaleness(TimeSpan.FromSeconds(15));
        using var transaction = await connection.BeginTransactionAsync(
            SpannerTransactionCreationOptions.ForTimestampBoundReadOnly(staleness),
            transactionOptions: null,
            cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, MarketingBudget FROM Albums");
        cmd.Transaction = transaction;

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

משתמשים ב-Client.ReadOnlyTransaction().WithTimestampBound() ומציינים ערך ExactStaleness כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של חוסר עדכניות.


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readStaleData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
	defer ro.Close()

	iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

בדוגמה נעשה שימוש ב-AllKeys כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Java

משתמשים בשיטה read של ReadContext עם TimestampBound.ofExactStaleness() מוגדר כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של הגדרת רמת עדכניות.

static void readStaleData(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
          .read(
              "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          resultSet.getLong(0),
          resultSet.getLong(1),
          resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
    }
  }
}

בדוגמה נעשה שימוש ב-KeySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Node.js

משתמשים באפשרות Table.read עם exactStaleness כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של נתונים לא עדכניים.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
  keySet: {
    all: true,
  },
};

const options = {
  // Guarantees that all writes committed more than 15000 milliseconds ago are visible
  exactStaleness: 15000,
};

try {
  const [rows] = await albumsTable.read(query, options);

  rows.forEach(row => {
    const json = row.toJSON();
    const id = json.SingerId;
    const album = json.AlbumId;
    const title = json.AlbumTitle;
    const budget = json.MarketingBudget ? json.MarketingBudget : '';
    console.log(
      `SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`,
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

בדוגמה נעשה שימוש ב-keySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

PHP

משתמשים ב-Database::read עם ערך exactStaleness שצוין כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של חוסר עדכניות.

use Google\Protobuf\Duration;
use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.  The data is exactly 15 seconds stale.
 * Guarantees that all writes committed more than 15 seconds ago are visible.
 * Example:
 * ```
 * read_stale_data
 *($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_stale_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle'],
        ['exactStaleness' => new Duration(['seconds' => 15])]
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

בדוגמה נעשה שימוש ב-keySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Python

משתמשים בשיטה read של Database snapshot עם ערך exact_staleness שצוין כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של חוסר עדכניות.

def read_stale_data(instance_id, database_id):
    """Reads sample data from the database. The data is exactly 15 seconds
    stale."""
    import datetime

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    staleness = datetime.timedelta(seconds=15)

    with database.snapshot(exact_staleness=staleness) as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            keyset=keyset,
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))

בדוגמה נעשה שימוש ב-KeySet כדי להגדיר אוסף של מפתחות או טווחי מפתחות לקריאה.

Ruby

משתמשים בשיטה read של תמונת מצב Client עם staleness ערך שצוין (בשניות) כדי לבצע קריאה של שורות ממסד הנתונים באמצעות חותמת זמן מדויקת של חוסר עדכניות.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
  snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
    puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
  end
end

ביצוע קריאה באמצעות אינדקס

בדוגמה הבאה אפשר לראות איך קוראים אפס שורות או יותר ממסד נתונים באמצעות אינדקס:

GoogleSQL

C++‎

משתמשים בפונקציה Read() כדי לבצע קריאה באמצעות אינדקס.

void ReadDataWithIndex(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  auto rows =
      client.Read("Albums", google::cloud::spanner::KeySet::All(),
                  {"AlbumId", "AlbumTitle"},
                  google::cloud::Options{}.set<spanner::ReadIndexNameOption>(
                      "AlbumsByAlbumTitle"));
  using RowType = std::tuple<std::int64_t, std::string>;
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "AlbumId: " << std::get<0>(*row) << "\t";
    std::cout << "AlbumTitle: " << std::get<1>(*row) << "\n";
  }
  std::cout << "Read completed for [spanner_read_data_with_index]\n";
}

C#‎

כדי לקרוא נתונים באמצעות האינדקס, מריצים שאילתה שמציינת במפורש את האינדקס:


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QueryDataWithIndexAsyncSample
{
    public class Album
    {
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
        public long MarketingBudget { get; set; }
    }

    public async Task<List<Album>> QueryDataWithIndexAsync(string projectId, string instanceId, string databaseId,
        string startTitle, string endTitle)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand(
            "SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
            + "{FORCE_INDEX=AlbumsByAlbumTitle} "
            + $"WHERE AlbumTitle >= @startTitle "
            + $"AND AlbumTitle < @endTitle",
            new SpannerParameterCollection
            {
                { "startTitle", SpannerDbType.String, startTitle },
                { "endTitle", SpannerDbType.String, endTitle }
            });

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

משתמשים ב-Client.Single().ReadUsingIndex כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readUsingIndex(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
		[]string{"AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var albumID int64
		var albumTitle string
		if err := row.Columns(&albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
	}
}

Java

משתמשים ב-ReadContext.readUsingIndex כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.

static void readUsingIndex(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .readUsingIndex(
              "Albums",
              "AlbumsByAlbumTitle",
              KeySet.all(),
              Arrays.asList("AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
    }
  }
}

Node.js

משתמשים ב-Table.read ומציינים את האינדקס בשאילתה כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Imports the Google Cloud Spanner client library
const {Spanner} = require('@google-cloud/spanner');

// Instantiates a client
const spanner = new Spanner({
  projectId: projectId,
});

async function readDataWithIndex() {
  // Gets a reference to a Cloud Spanner instance and database
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);

  const albumsTable = database.table('Albums');

  const query = {
    columns: ['AlbumId', 'AlbumTitle'],
    keySet: {
      all: true,
    },
    index: 'AlbumsByAlbumTitle',
  };

  // Reads the Albums table using an index
  try {
    const [rows] = await albumsTable.read(query);

    rows.forEach(row => {
      const json = row.toJSON();
      console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
    });
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    // Close the database when finished.
    database.close();
  }
}
readDataWithIndex();

PHP

משתמשים ב-Database::read ומציינים את האינדקס כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database using an index.
 *
 * The index must exist before running this sample. You can add the index
 * by running the `add_index` sample or by running this DDL statement against
 * your database:
 *
 *     CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
 *
 * Example:
 * ```
 * read_data_with_index($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data_with_index(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['AlbumId', 'AlbumTitle'],
        ['index' => 'AlbumsByAlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

משתמשים ב-Database.read ומציינים את האינדקס כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.

def read_data_with_index(instance_id, database_id):
    """Reads sample data from the database using an index.

    The index must exist before running this sample. You can add the index
    by running the `add_index` sample or by running this DDL statement against
    your database:

        CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)

    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("AlbumId", "AlbumTitle"),
            keyset=keyset,
            index="AlbumsByAlbumTitle",
        )

        for row in results:
            print("AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

משתמשים ב-Client#read ומציינים את האינדקס כדי לקרוא שורות ממסד הנתונים באמצעות אינדקס.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

result = client.read "Albums", [:AlbumId, :AlbumTitle],
                     index: "AlbumsByAlbumTitle"

result.rows.each do |row|
  puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end

קריאת נתונים במקביל

כשמבצעים פעולות קריאה או שאילתה בכמות גדולה של נתונים מ-Spanner, אפשר להשתמש ב-PartitionQuery API כדי לקבל תוצאות מהר יותר. ה-API מחלק את השאילתה למנות, או למחיצות, באמצעות כמה מכונות כדי לאחזר את המחיצות במקביל. חשוב לזכור שהשימוש ב-PartitionQuery API גורם לזמן אחזור ארוך יותר, כי הוא מיועד רק לפעולות בכמות גדולה כמו ייצוא או סריקה של כל מסד הנתונים.

אפשר לבצע כל פעולת קריאה של API במקביל באמצעות ספריות הלקוח של Spanner. עם זאת, אפשר לחלק שאילתות SQL רק אם הן ניתנות לחלוקה. כדי ששאילתה תהיה ניתנת לחלוקה למחיצות, תוכנית השאילתה צריכה לעמוד באחד מהתנאים הבאים:

  • האופרטור הראשון בתוכנית להפעלת השאילתה הוא איחוד מבוזר, ותוכנית הפעלת השאילתה מכילה רק איחוד מבוזר אחד (לא כולל איחודים מבוזרים מקומיים). תוכנית השאילתה לא יכולה להכיל אופרטורים מבוזרים אחרים, כמו distributed cross apply.

  • אין אופרטורים מבוזרים בתוכנית השאילתה.

ממשק PartitionQuery API מריץ את השאילתות במצב אצווה. יכול להיות ש-Spanner יבחר תוכנית להרצת שאילתות שתאפשר חלוקה של השאילתות למחיצות בסיסיות, כשהן מורצות במצב אצווה. כתוצאה מכך, יכול להיות ש-PartitionQuery API ו-Spanner Studio ישתמשו בתוכניות שונות להרצת שאילתות עבור אותה שאילתה. יכול להיות שלא תהיה לכם אפשרות לקבל את תוכנית הביצוע של השאילתה שבה נעשה שימוש ב-PartitionQuery API ב-Spanner Studio.

בשביל שאילתות עם חלוקה למחיצות כמו זו, אפשר להפעיל את התכונה 'הגדלת נפח הנתונים ב-Spanner'. התכונה Data Boost מאפשרת להריץ שאילתות אנליטיות גדולות עם השפעה כמעט אפסית על עומסי העבודה הקיימים במופע Spanner שהוקצה. בדף הזה מופיעות דוגמאות קוד ב-C++‎,‏ Go,‏ Java,‏ Node.js ו-Python שממחישות איך להפעיל את Data Boost.

מידע נוסף על Data Boost זמין במאמר סקירה כללית על Data Boost.

GoogleSQL

C++‎

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת טרנזקציית אצווה ב-Spanner.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.
void UsePartitionQuery(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto txn = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, FirstName, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string, std::string>;

  auto partitions = client.PartitionQuery(
      std::move(txn), std::move(select),
      google::cloud::Options{}.set<spanner::PartitionDataBoostOption>(true));
  if (!partitions) throw std::move(partitions).status();

  // You would probably choose to execute these partitioned queries in
  // separate threads/processes, or on a different machine.
  int number_of_rows = 0;
  for (auto const& partition : *partitions) {
    auto rows = client.ExecuteQuery(partition);
    for (auto& row : spanner::StreamOf<RowType>(rows)) {
      if (!row) throw std::move(row).status();
      number_of_rows++;
    }
  }
  std::cout << "Number of partitions: " << partitions->size() << "\n"
            << "Number of rows: " << number_of_rows << "\n";
  std::cout << "Read completed for [spanner_batch_client]\n";
}

C#‎

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת טרנזקציית אצווה ב-Spanner.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.

using Google.Cloud.Spanner.Data;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class BatchReadRecordsAsyncSample
{
    private int _rowsRead;
    private int _partitionCount;
    public async Task<(int RowsRead, int Partitions)> BatchReadRecordsAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var transaction = await connection.BeginTransactionAsync(
            SpannerTransactionCreationOptions.ReadOnly.WithIsDetached(true),
            new SpannerTransactionOptions { DisposeBehavior = DisposeBehavior.CloseResources },
            cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, FirstName, LastName FROM Singers");
        cmd.Transaction = transaction;

        // A CommandPartition object is serializable and can be used from a different process.
        // If data boost is enabled, partitioned read and query requests will be executed
        // using Spanner independent compute resources.
        var partitions = await cmd.GetReaderPartitionsAsync(PartitionOptions.Default.WithDataBoostEnabled(true));

        var transactionId = transaction.TransactionId;
        await Task.WhenAll(partitions.Select(x => DistributedReadWorkerAsync(x, transactionId)));
        Console.WriteLine($"Done reading!  Total rows read: {_rowsRead:N0} with {_partitionCount} partition(s)");
        return (RowsRead: _rowsRead, Partitions: _partitionCount);
    }

    private async Task DistributedReadWorkerAsync(CommandPartition readPartition, TransactionId id)
    {
        var localId = Interlocked.Increment(ref _partitionCount);
        using var connection = new SpannerConnection(id.ConnectionString);
        using var transaction = await connection.BeginTransactionAsync(
            SpannerTransactionCreationOptions.FromReadOnlyTransactionId(id),
            transactionOptions: null,
            cancellationToken: default);
        using var cmd = connection.CreateCommandWithPartition(readPartition, transaction);
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            Interlocked.Increment(ref _rowsRead);
            Console.WriteLine($"Partition ({localId}) "
                + $"{reader.GetFieldValue<int>("SingerId")}"
                + $" {reader.GetFieldValue<string>("FirstName")}"
                + $" {reader.GetFieldValue<string>("LastName")}");
        }
        Console.WriteLine($"Done with single reader {localId}.");
    }
}

Go

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת לקוח ועסקה ב-Spanner.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readBatchData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
	if err != nil {
		return err
	}
	defer txn.Close()

	// Singer represents a row in the Singers table.
	type Singer struct {
		SingerID   int64
		FirstName  string
		LastName   string
		SingerInfo []byte
	}
	stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
	// A Partition object is serializable and can be used from a different process.
	// DataBoost option is an optional parameter which can also be used for partition read
	// and query to execute the request via spanner independent compute resources.
	partitions, err := txn.PartitionQueryWithOptions(ctx, stmt, spanner.PartitionOptions{}, spanner.QueryOptions{DataBoostEnabled: true})
	if err != nil {
		return err
	}
	recordCount := 0
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		for {
			row, err := iter.Next()
			if err == iterator.Done {
				break
			} else if err != nil {
				return err
			}
			var s Singer
			if err := row.ToStruct(&s); err != nil {
				return err
			}
			fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
			recordCount++
		}
	}
	fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
	fmt.Fprintf(w, "Total record count: %v\n", recordCount)
	return nil
}

Java

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירה של לקוח Spanner batch ועסקה.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);

try {
  BatchClient batchClient =
      spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));

  final BatchReadOnlyTransaction txn =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong());

  // A Partition object is serializable and can be used from a different process.
  // DataBoost option is an optional parameter which can be used for partition read
  // and query to execute the request via spanner independent compute resources.

  List<Partition> partitions =
      txn.partitionQuery(
          PartitionOptions.getDefaultInstance(),
          Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"),
          // Option to enable data boost for a given request
          Options.dataBoostEnabled(true));

  totalPartitions = partitions.size();

  for (final Partition p : partitions) {
    executor.execute(
        () -> {
          try (ResultSet results = txn.execute(p)) {
            while (results.next()) {
              long singerId = results.getLong(0);
              String firstName = results.getString(1);
              String lastName = results.getString(2);
              System.out.println("[" + singerId + "] " + firstName + " " + lastName);
              totalRecords.getAndIncrement();
            }
          }
        });
  }
} finally {
  executor.shutdown();
  executor.awaitTermination(1, TimeUnit.HOURS);
  spanner.close();
}

double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
  avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);

Node.js

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת לקוח Spanner וחבילת בקשות.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const [transaction] = await database.createBatchTransaction();

const query = {
  sql: 'SELECT * FROM Singers',
  // DataBoost option is an optional parameter which can also be used for partition read
  // and query to execute the request via spanner independent compute resources.
  dataBoostEnabled: true,
};

// A Partition object is serializable and can be used from a different process.
const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);

let row_count = 0;
const promises = [];
partitions.forEach(partition => {
  promises.push(
    transaction.execute(partition).then(results => {
      const rows = results[0].map(row => row.toJSON());
      row_count += rows.length;
    }),
  );
});
Promise.all(promises)
  .then(() => {
    console.log(
      `Successfully received ${row_count} from executed partitions.`,
    );
    transaction.close();
  })
  .then(() => {
    database.close();
  });

PHP

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת לקוח Spanner וחבילת בקשות.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.
use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * batch_query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function batch_query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $batch = $spanner->batch($instanceId, $databaseId);
    $snapshot = $batch->snapshot();
    $queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
    $partitions = $snapshot->partitionQuery($queryString, [
        // This is an optional parameter which can be used for partition
        // read and query to execute the request via spanner independent
        // compute resources.
        'dataBoostEnabled' => true
    ]);
    $totalPartitions = count($partitions);
    $totalRecords = 0;
    foreach ($partitions as $partition) {
        $result = $snapshot->executePartition($partition);
        $rows = $result->rows();
        foreach ($rows as $row) {
            $singerId = $row['SingerId'];
            $firstName = $row['FirstName'];
            $lastName = $row['LastName'];
            printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
            $totalRecords++;
        }
    }
    printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
    printf('Total Records: %d' . PHP_EOL, $totalRecords);
    $averageRecordsPerPartition = $totalRecords / $totalPartitions;
    printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}

Python

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת לקוח Spanner ועסקת אצווה.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.

def run_batch_query(instance_id, database_id):
    """Runs an example batch query."""

    # Expected Table Format:
    # CREATE TABLE Singers (
    #   SingerId   INT64 NOT NULL,
    #   FirstName  STRING(1024),
    #   LastName   STRING(1024),
    #   SingerInfo BYTES(MAX),
    # ) PRIMARY KEY (SingerId);

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Create the batch transaction and generate partitions
    snapshot = database.batch_snapshot()
    partitions = snapshot.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName", "LastName"),
        keyset=spanner.KeySet(all_=True),
        # A Partition object is serializable and can be used from a different process.
        # DataBoost option is an optional parameter which can also be used for partition read
        # and query to execute the request via spanner independent compute resources.
        data_boost_enabled=True,
    )

    # Create a pool of workers for the tasks
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, snapshot, p) for p in partitions]

        for future in concurrent.futures.as_completed(futures, timeout=3600):
            finish, row_ct = future.result()
            elapsed = finish - start
            print("Completed {} rows in {} seconds".format(row_ct, elapsed))

    # Clean up
    snapshot.close()


def process(snapshot, partition):
    """Processes the requests of a query in an separate process."""
    print("Started processing partition.")
    row_ct = 0
    for row in snapshot.process_read_batch(partition):
        print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
        row_ct += 1
    return time.time(), row_ct

Ruby

בדוגמה הזו, המערכת מאחזרת מחיצות של שאילתת SQL של הטבלה Singers ומבצעת את השאילתה על כל מחיצה באמצעות השלבים הבאים:

  • יצירת לקוח של Spanner batch.
  • יצירת מחיצות לשאילתה, כדי שאפשר יהיה להפיץ את המחיצות לכמה עובדים.
  • שליפת תוצאות השאילתה לכל מחיצה.
# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

# Prepare a thread pool with number of processors
processor_count  = Concurrent.processor_count
thread_pool      = Concurrent::FixedThreadPool.new processor_count

# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new

# Create a new Spanner batch client
spanner        = Google::Cloud::Spanner.new project: project_id
batch_client   = spanner.batch_client instance_id, database_id

# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true

# Get partitions for specified query
# data_boost_enabled option is an optional parameter which can be used for partition read
# and query to execute the request via spanner independent compute resources.
partitions       = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers", data_boost_enabled: true
total_partitions = partitions.size

# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
  thread_pool.post do
    # Increment total_records per new row
    batch_snapshot.execute_partition(partition).rows.each do |_row|
      total_records.increment
    end
  end
end

# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination

# Close the client connection and release resources.
batch_snapshot.close

# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
  average_records_per_partition = total_records.value / total_partitions.to_f
end

puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"