Halaman ini menjelaskan aliran perubahan di Spanner untuk database dialek GoogleSQL dan database dialek PostgreSQL, termasuk:
- Model pembuatan partisi berbasis pemisahan
- Format dan konten rekaman aliran perubahan
- Sintaksis tingkat rendah yang digunakan untuk membuat kueri catatan tersebut
- Contoh alur kerja kueri
Anda menggunakan Spanner API untuk membuat kueri aliran data perubahan secara langsung. Aplikasi yang menggunakan Dataflow untuk membaca data stream perubahan tidak perlu berinteraksi langsung dengan model data yang dijelaskan di sini.
Untuk panduan pengantar yang lebih luas tentang aliran perubahan, lihat Ringkasan aliran perubahan.
Mengubah partisi aliran data perubahan
Saat perubahan terjadi pada tabel yang dipantau oleh aliran perubahan, Spanner akan menulis catatan aliran perubahan yang sesuai dalam database, secara sinkron dalam transaksi yang sama dengan perubahan data. Artinya, jika transaksi berhasil, Spanner juga telah berhasil merekam dan mempertahankan perubahan. Secara internal, Spanner menempatkan catatan aliran perubahan dan perubahan data secara bersamaan sehingga diproses oleh server yang sama untuk meminimalkan overhead penulisan.
Sebagai bagian dari DML ke pemisahan tertentu, Spanner menambahkan penulisan ke pemisahan data aliran perubahan yang sesuai dalam transaksi yang sama. Karena kolokasi ini, aliran perubahan tidak menambahkan koordinasi ekstra di seluruh resource penayangan, yang meminimalkan overhead penerapan transaksi.
Spanner melakukan penskalaan dengan memisahkan dan menggabungkan data secara dinamis berdasarkan beban dan ukuran database, serta mendistribusikan bagian di seluruh resource penayangan.
Untuk memungkinkan penulisan dan pembacaan aliran perubahan diskalakan, Spanner membagi dan menggabungkan penyimpanan aliran perubahan internal bersama dengan data database, sehingga secara otomatis menghindari hotspot. Untuk mendukung pembacaan rekaman aliran data perubahan mendekati real-time saat penulisan database diskalakan, Spanner API dirancang agar aliran data perubahan dapat dikueri secara bersamaan menggunakan partisi aliran data perubahan. Partisi aliran perubahan dipetakan ke pemisahan data aliran perubahan yang berisi kumpulan data aliran perubahan. Partisi aliran perubahan berubah secara dinamis dari waktu ke waktu dan berkorelasi dengan cara Spanner memisahkan dan menggabungkan data database secara dinamis.
Partisi aliran perubahan berisi data untuk rentang kunci yang tidak dapat diubah selama rentang waktu tertentu. Setiap partisi aliran perubahan dapat dibagi menjadi satu atau beberapa partisi aliran perubahan, atau digabungkan dengan partisi aliran perubahan lainnya. Saat peristiwa pemisahan atau penggabungan ini terjadi, partisi turunan dibuat untuk merekam perubahan untuk rentang kunci yang tidak dapat diubah masing-masing untuk rentang waktu berikutnya. Selain kumpulan data perubahan data, kueri aliran perubahan menampilkan kumpulan data partisi turunan untuk memberi tahu pembaca tentang partisi aliran perubahan baru yang perlu dikueri, serta kumpulan data detak jantung untuk menunjukkan progres ke depan jika tidak ada penulisan yang terjadi baru-baru ini.
Saat mengkueri partisi aliran perubahan tertentu, kumpulan data perubahan akan ditampilkan dalam urutan stempel waktu commit. Setiap catatan perubahan ditampilkan tepat satu kali. Di seluruh partisi aliran perubahan, pengurutan kumpulan data perubahan tidak dijamin. Catatan perubahan untuk kunci utama tertentu hanya ditampilkan di satu partisi untuk rentang waktu tertentu.
Karena silsilah partisi induk-turunan, untuk memproses perubahan untuk kunci tertentu dalam urutan stempel waktu penerapan, data yang ditampilkan dari partisi turunan hanya boleh diproses setelah data dari semua partisi induk diproses.
Mengubah fungsi baca streaming dan sintaksis kueri
GoogleSQL
Untuk membuat kueri aliran perubahan, gunakan
ExecuteStreamingSql
API. Spanner secara otomatis membuat fungsi baca khusus
bersama dengan aliran perubahannya. Fungsi baca memberikan akses ke
rekaman aliran perubahannya. Konvensi penamaan fungsi baca adalah
READ_change_stream_name.
Dengan asumsi bahwa aliran perubahan SingersNameStream ada di database, sintaksis kueri untuk GoogleSQL adalah sebagai berikut:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
Fungsi baca menerima argumen berikut:
| Nama argumen | Jenis | Wajib? | Deskripsi |
|---|---|---|---|
start_timestamp |
TIMESTAMP |
Wajib | Menentukan bahwa data dengan commit_timestamp yang lebih besar
dari atau sama dengan start_timestamp harus ditampilkan. Nilai
harus berada dalam periode retensi aliran perubahan, dan harus
kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan
stempel waktu pembuatan aliran perubahan. |
end_timestamp |
TIMESTAMP |
Opsional (Default: NULL) |
Menentukan bahwa data dengan commit_timestamp kurang
dari atau sama dengan end_timestamp harus ditampilkan. Nilai
harus berada dalam periode retensi aliran perubahan
dan lebih besar atau sama dengan start_timestamp. Kueri
berakhir setelah menampilkan semua ChangeRecords
hingga end_timestamp atau saat Anda menghentikan
koneksi. Jika end_timestamp disetel ke NULL
atau tidak ditentukan, kueri akan terus dieksekusi hingga semua
ChangeRecords ditampilkan atau hingga Anda menghentikan
koneksi. |
partition_token |
STRING |
Opsional (Default: NULL) |
Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan
konten kumpulan data partisi turunan. Jika NULL atau tidak ditentukan, berarti
pembaca sedang membuat kueri aliran perubahan untuk pertama kalinya, dan
belum mendapatkan token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
INT64 |
Wajib | Menentukan seberapa sering ChangeRecord detak jantung ditampilkan jika tidak ada transaksi yang di-commit di partisi ini.
Nilai harus antara 1,000 (satu detik) dan
300,000 (lima menit). |
read_options |
ARRAY |
Opsional (Default: NULL) |
Menambahkan opsi baca yang dicadangkan untuk penggunaan pada masa mendatang. Satu-satunya
nilai yang diizinkan adalah NULL. |
Sebaiknya buat metode bantuan untuk membuat teks kueri fungsi baca dan mengikat parameter ke dalamnya, seperti yang ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Untuk membuat kueri aliran perubahan, gunakan
ExecuteStreamingSql
API. Spanner secara otomatis membuat fungsi baca khusus
bersama dengan aliran perubahannya. Fungsi baca memberikan akses ke
rekaman aliran perubahannya. Konvensi penamaan fungsi baca adalah
spanner.read_json_change_stream_name.
Dengan asumsi bahwa aliran perubahan SingersNameStream ada di database, sintaksis kueri untuk PostgreSQL adalah sebagai berikut:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
Fungsi baca menerima argumen berikut:
| Nama argumen | Jenis | Wajib? | Deskripsi |
|---|---|---|---|
start_timestamp |
timestamp with time zone |
Wajib | Menentukan bahwa catatan perubahan dengan commit_timestamp
lebih besar dari atau sama dengan start_timestamp
harus ditampilkan. Nilai harus berada dalam periode retensi aliran perubahan, dan harus kurang dari atau sama dengan waktu saat ini, dan lebih besar dari atau sama dengan stempel waktu pembuatan aliran perubahan. |
end_timestamp |
timestamp with timezone |
Opsional (Default: NULL) |
Menentukan bahwa catatan perubahan dengan commit_timestamp
kurang dari atau sama dengan end_timestamp harus
ditampilkan. Nilai harus berada dalam periode retensi aliran perubahan
dan lebih besar atau sama dengan start_timestamp.
Kueri selesai setelah menampilkan semua catatan perubahan hingga
end_timestamp atau hingga Anda menghentikan koneksi.
Jika NULL, kueri akan terus dieksekusi hingga semua catatan
perubahan ditampilkan atau hingga Anda menghentikan koneksi. |
partition_token |
text |
Opsional (Default: NULL) |
Menentukan partisi aliran perubahan yang akan dikueri, berdasarkan
konten kumpulan data partisi turunan. Jika NULL atau tidak ditentukan, berarti
pembaca sedang membuat kueri aliran perubahan untuk pertama kalinya, dan
belum mendapatkan token partisi tertentu untuk membuat kueri. |
heartbeat_milliseconds |
bigint |
Wajib | Menentukan seberapa sering detak jantung ChangeRecord
ditampilkan jika tidak ada transaksi yang di-commit di
partisi ini.
Nilai harus antara 1,000 (satu detik) dan
300,000 (lima menit). |
null |
null |
Wajib | Disimpan untuk penggunaan di masa mendatang |
Sebaiknya buat metode bantuan untuk membuat teks fungsi baca dan mengikat parameter ke dalamnya, seperti yang ditunjukkan dalam contoh berikut.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Format kumpulan data aliran perubahan
GoogleSQL
Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord dengan
jenis ARRAY<STRUCT<...>>. Di setiap baris, array ini selalu berisi satu
elemen.
Elemen array memiliki jenis berikut:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Ada tiga kolom di STRUCT ini: data_change_record, heartbeat_record, dan child_partitions_record, yang masing-masing berjenis
ARRAY<STRUCT<...>>. Di setiap baris yang ditampilkan oleh fungsi baca stream perubahan, hanya satu dari tiga kolom ini yang berisi nilai; dua kolom lainnya kosong atau NULL. Kolom array ini berisi paling banyak satu elemen.
Bagian berikut membahas masing-masing dari ketiga jenis catatan ini.
PostgreSQL
Fungsi baca aliran perubahan menampilkan satu kolom ChangeRecord
berjenis JSON dengan struktur berikut:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Ada tiga kemungkinan kunci dalam objek ini: data_change_record,
heartbeat_record, dan child_partitions_record, dengan jenis nilai
yang sesuai adalah JSON. Di setiap baris yang ditampilkan oleh fungsi baca aliran perubahan,
hanya ada satu dari tiga kunci ini.
Bagian berikut membahas masing-masing dari ketiga jenis catatan ini.
Catatan perubahan data
Catatan perubahan data berisi serangkaian perubahan pada tabel dengan jenis modifikasi yang sama (penyisipan, pembaruan, atau penghapusan) yang dilakukan pada stempel waktu penerapan yang sama dalam satu partisi aliran perubahan untuk transaksi yang sama. Beberapa rekaman perubahan data dapat ditampilkan untuk transaksi yang sama di beberapa partisi aliran perubahan.
Semua catatan perubahan data memiliki kolom commit_timestamp, server_transaction_id,
dan record_sequence, yang bersama-sama menentukan urutan dalam aliran
perubahan untuk catatan aliran. Ketiga kolom ini sudah cukup untuk mendapatkan urutan perubahan dan memberikan konsistensi eksternal.
Perhatikan bahwa beberapa transaksi dapat memiliki stempel waktu commit yang sama jika
transaksi tersebut menyentuh data yang tidak tumpang-tindih. Kolom server_transaction_id
menawarkan kemampuan untuk membedakan kumpulan perubahan mana (yang berpotensi
di seluruh partisi aliran perubahan) yang dikeluarkan dalam
transaksi yang sama. Dengan memasangkannya dengan kolom record_sequence dan number_of_records_in_transaction, Anda dapat mem-buffer dan mengurutkan semua catatan dari transaksi tertentu.
Kolom rekaman perubahan data mencakup hal berikut:
GoogleSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
commit_timestamp |
TIMESTAMP |
Menunjukkan stempel waktu saat perubahan dilakukan. |
record_sequence |
STRING |
Menunjukkan nomor urut untuk data dalam transaksi.
Nomor urut unik dan meningkat secara monoton (tetapi tidak
harus berurutan) dalam transaksi. Urutkan rekaman untuk server_transaction_id yang sama menurut record_sequence untuk merekonstruksi urutan perubahan dalam transaksi.
Spanner dapat mengoptimalkan pengurutan ini untuk performa yang lebih baik
dan mungkin tidak selalu cocok dengan pengurutan asli yang Anda berikan. |
server_transaction_id |
STRING |
Memberikan string unik secara global yang merepresentasikan transaksi saat perubahan dilakukan. Nilai ini hanya boleh digunakan dalam konteks pemrosesan rekaman aliran perubahan dan tidak berkorelasi dengan ID transaksi di API Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Menunjukkan apakah ini adalah catatan terakhir untuk transaksi dalam partisi saat ini. |
table_name |
STRING |
Nama tabel yang terpengaruh oleh perubahan. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil. Jenis pengambilan nilai dapat berupa salah satu dari berikut:
Secara default, nilainya adalah |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Menunjukkan nama kolom, jenis kolom,
apakah kolom tersebut merupakan kunci utama, dan posisi kolom seperti
yang ditentukan dalam skema (ordinal_position). Kolom pertama dalam
tabel dalam skema akan memiliki posisi ordinal 1. Jenis kolom dapat bertingkat untuk kolom array. Formatnya cocok dengan struktur
jenis yang dijelaskan dalam referensi Spanner API.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak.
Ketersediaan dan konten nilai lama dan baru bergantung pada value_capture_type yang dikonfigurasi. Kolom new_values dan
old_values hanya berisi kolom non-kunci. |
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT,
UPDATE, atau DELETE. |
number_of_records_in_transaction |
INT64 |
Menunjukkan jumlah rekaman perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
INT64 |
Menunjukkan jumlah partisi yang menampilkan rekaman perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Menunjukkan Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOL |
Menunjukkan apakah transaksi adalah transaksi sistem. |
PostgreSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
commit_timestamp |
STRING |
Menunjukkan stempel waktu saat perubahan dilakukan. |
record_sequence |
STRING |
Menunjukkan nomor urut untuk data dalam transaksi.
Nomor urut unik dan meningkat secara monoton (tetapi tidak
harus berurutan) dalam transaksi. Urutkan rekaman untuk server_transaction_id yang sama menurut record_sequence untuk merekonstruksi urutan perubahan dalam transaksi. |
server_transaction_id |
STRING |
Memberikan string unik secara global yang merepresentasikan transaksi saat perubahan dilakukan. Nilai hanya boleh digunakan dalam konteks pemrosesan rekaman aliran perubahan dan tidak berkorelasi dengan ID transaksi di API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Menunjukkan apakah ini adalah catatan terakhir untuk transaksi dalam partisi saat ini. |
table_name |
STRING |
Menunjukkan nama tabel yang terpengaruh oleh perubahan. |
value_capture_type |
STRING |
Menjelaskan jenis pengambilan nilai yang ditentukan dalam konfigurasi aliran perubahan saat perubahan ini diambil. Jenis pengambilan nilai dapat berupa salah satu dari berikut:
Secara default, nilainya adalah |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Menunjukkan nama kolom, jenis kolom,
apakah itu kunci utama, dan posisi kolom seperti
yang ditentukan dalam skema (ordinal_position). Kolom pertama dari
tabel dalam skema akan memiliki posisi ordinal 1. Jenis kolom dapat bertingkat untuk kolom array. Formatnya cocok dengan struktur
jenis yang dijelaskan dalam
referensi Spanner API.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Menjelaskan perubahan yang dilakukan, termasuk nilai kunci utama, nilai lama, dan nilai baru dari kolom yang diubah atau dilacak. Ketersediaan dan konten nilai lama dan baru bergantung
pada value_capture_type yang dikonfigurasi. Kolom
new_values dan old_values hanya berisi
kolom non-kunci.
|
mod_type |
STRING |
Menjelaskan jenis perubahan. Salah satu dari INSERT,
UPDATE, atau DELETE. |
number_of_records_in_transaction |
INT64 |
Menunjukkan jumlah rekaman perubahan data yang merupakan bagian dari transaksi ini di semua partisi aliran perubahan. |
number_of_partitions_in_transaction |
NUMBER |
Menunjukkan jumlah partisi yang menampilkan rekaman perubahan data untuk transaksi ini. |
transaction_tag |
STRING |
Menunjukkan Tag transaksi yang terkait dengan transaksi ini. |
is_system_transaction |
BOOLEAN |
Menunjukkan apakah transaksi adalah transaksi sistem. |
Contoh catatan perubahan data
Berikut adalah sepasang contoh rekaman perubahan data. Mereka menjelaskan satu transaksi yang berisi transfer antara dua akun. Kedua akun berada di partisi aliran perubahan yang terpisah.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Berikut adalah contoh rekaman perubahan data dengan jenis pengambilan nilai NEW_VALUES. Perhatikan bahwa hanya nilai baru yang diisi.
Hanya kolom LastUpdate yang diubah, sehingga hanya kolom tersebut yang ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Berikut adalah contoh rekaman perubahan data dengan jenis pengambilan nilai NEW_ROW. Hanya kolom LastUpdate
yang diubah, tetapi semua kolom yang dilacak ditampilkan.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Berikut adalah contoh rekaman perubahan data dengan jenis pengambilan nilai NEW_ROW_AND_OLD_VALUES. Hanya kolom LastUpdate yang
diubah, tetapi semua kolom yang dilacak ditampilkan. Jenis pengambilan nilai ini mengambil
nilai baru dan nilai lama LastUpdate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Catatan detak jantung
Jika rekaman detak jantung ditampilkan, hal ini menunjukkan bahwa semua perubahan dengan
commit_timestamp kurang dari atau sama dengan timestamp rekaman detak jantung
telah ditampilkan, dan rekaman data mendatang dalam partisi ini harus memiliki stempel waktu penerapan yang lebih tinggi daripada yang ditampilkan oleh rekaman detak jantung. Rekaman detak jantung ditampilkan jika tidak ada perubahan data yang ditulis ke partisi. Jika ada perubahan data yang ditulis ke
partisi, data_change_record.commit_timestamp dapat digunakan sebagai pengganti
heartbeat_record.timestamp untuk menunjukkan bahwa pembaca membuat progres
dalam membaca partisi.
Anda dapat menggunakan rekaman detak jantung yang ditampilkan pada partisi untuk menyinkronkan
pembaca di semua partisi. Setelah semua pembaca menerima detak jantung yang lebih besar dari atau sama dengan stempel waktu A atau telah menerima data atau catatan partisi turunan yang lebih besar dari atau sama dengan stempel waktu A, pembaca akan mengetahui bahwa mereka telah menerima semua catatan yang di-commit pada atau sebelum stempel waktu A dan dapat mulai memproses catatan yang di-buffer—misalnya, mengurutkan catatan lintas partisi berdasarkan stempel waktu dan mengelompokkannya berdasarkan server_transaction_id.
Rekaman detak jantung hanya berisi satu kolom:
GoogleSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
timestamp |
TIMESTAMP |
Menunjukkan stempel waktu rekaman detak jantung. |
PostgreSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
timestamp |
STRING |
Menunjukkan stempel waktu rekaman detak jantung. |
Contoh rekaman detak jantung
Contoh rekaman detak jantung, yang mengomunikasikan bahwa semua rekaman dengan stempel waktu yang kurang dari atau sama dengan stempel waktu rekaman ini telah ditampilkan:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Catatan partisi turunan
Rekaman partisi turunan menampilkan informasi tentang partisi turunan: token
partisinya, token partisi induknya, dan
start_timestamp yang merepresentasikan stempel waktu paling awal yang berisi rekaman perubahan untuk partisi turunan. Data yang stempel waktu commit-nya
tepat sebelum child_partitions_record.start_timestamp akan
ditampilkan di partisi saat ini. Setelah menampilkan semua
kumpulan data partisi turunan untuk partisi ini, kueri ini akan ditampilkan dengan
status berhasil, yang menunjukkan bahwa semua kumpulan data telah ditampilkan untuk
partisi ini.
Kolom rekaman partisi turunan mencakup hal berikut:
GoogleSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
start_timestamp |
TIMESTAMP |
Menunjukkan bahwa catatan perubahan data yang ditampilkan dari partisi turunan dalam catatan partisi turunan ini memiliki stempel waktu penerapan yang lebih besar dari atau sama dengan start_timestamp. Saat membuat kueri partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp. Semua catatan partisi turunan yang ditampilkan oleh partisi memiliki start_timestamp yang sama dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp yang ditentukan kueri. |
record_sequence |
STRING |
Menunjukkan nomor urut yang meningkat secara monoton yang dapat digunakan untuk
menentukan pengurutan rekaman partisi turunan jika ada beberapa
rekaman partisi turunan yang ditampilkan dengan start_timestamp yang sama
dalam partisi tertentu. Token partisi, start_timestamp
dan record_sequence secara unik mengidentifikasi data partisi turunan.
|
child_partitions |
[
{
"token" : "STRING",
"parent_partition_tokens" : ["STRING"]
}
] |
Menampilkan sekumpulan partisi turunan dan informasi terkaitnya. Hal ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token partisi induknya. |
PostgreSQL
| Kolom | Jenis | Deskripsi |
|---|---|---|
start_timestamp |
STRING |
Menunjukkan bahwa catatan perubahan data yang ditampilkan dari partisi turunan dalam catatan partisi turunan ini memiliki stempel waktu penerapan yang lebih besar dari atau sama dengan start_timestamp. Saat membuat kueri partisi turunan, kueri harus menentukan token partisi turunan dan start_timestamp yang lebih besar dari atau sama dengan child_partitions_token.start_timestamp. Semua catatan partisi turunan yang ditampilkan oleh partisi memiliki start_timestamp yang sama dan stempel waktu selalu berada di antara start_timestamp dan end_timestamp yang ditentukan kueri.
|
record_sequence |
STRING |
Menunjukkan nomor urut yang meningkat secara monoton yang dapat digunakan untuk
menentukan pengurutan rekaman partisi turunan jika ada beberapa
rekaman partisi turunan yang ditampilkan dengan start_timestamp yang sama
dalam partisi tertentu. Token partisi, start_timestamp
dan record_sequence secara unik mengidentifikasi data partisi turunan.
|
child_partitions |
[
{
"token": "STRING",
"parent_partition_tokens": ["STRING"],
}, [...]
] |
Menampilkan array partisi turunan dan informasi terkaitnya. Hal ini mencakup string token partisi yang digunakan untuk mengidentifikasi partisi turunan dalam kueri, serta token partisi induknya. |
Contoh catatan partisi turunan
Berikut adalah contoh rekaman partisi turunan:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Alur kerja kueri aliran data perubahan
Jalankan kueri aliran perubahan menggunakan API
ExecuteStreamingSql, dengan
transaksi hanya baca
sekali pakai dan
batas stempel waktu yang kuat. Fungsi pembacaan
stream perubahan memungkinkan Anda menentukan start_timestamp dan
end_timestamp untuk rentang waktu yang diinginkan. Semua catatan perubahan dalam periode retensi dapat diakses menggunakan batas stempel waktu hanya baca yang kuat.
Semua
TransactionOptions
lainnya tidak valid untuk kueri aliran perubahan. Selain itu, jika TransactionOptions.read_only.return_read_timestamp disetel ke true, nilai khusus kint64max - 1 akan ditampilkan dalam pesan Transaction yang menjelaskan transaksi, bukan stempel waktu baca yang valid. Nilai khusus ini harus dibuang dan tidak digunakan untuk kueri berikutnya.
Setiap kueri aliran perubahan dapat menampilkan sejumlah baris, yang masing-masing berisi catatan perubahan data, catatan detak jantung, atau catatan partisi turunan. Tidak perlu menetapkan batas waktu untuk permintaan.
Contoh alur kerja kueri aliran perubahan
Alur kerja kueri streaming dimulai dengan mengeluarkan kueri stream perubahan pertama
dengan menentukan partition_token ke NULL. Kueri perlu menentukan
fungsi baca untuk aliran perubahan, stempel waktu awal dan akhir yang diinginkan,
dan interval detak jantung. Jika end_timestamp adalah NULL, kueri akan terus
menampilkan perubahan data hingga partisi berakhir.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Proses data merekam dari kueri ini hingga semua rekaman partisi turunan ditampilkan. Dalam contoh berikut, dua rekaman partisi turunan dan tiga token partisi ditampilkan, lalu kueri dihentikan. Partisi turunan
mencatat dari kueri tertentu selalu memiliki start_timestamp yang sama.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Untuk memproses perubahan setelah 2022-05-01T09:00:01Z, buat tiga kueri baru dan
jalankan secara paralel. Jika digunakan bersama, ketiga kueri tersebut akan menampilkan perubahan data untuk rentang kunci yang sama dengan yang dicakup oleh induknya. Selalu tetapkan start_timestamp ke
start_timestamp dalam rekaman partisi turunan yang sama dan gunakan
end_timestamp dan interval detak jantung yang sama untuk memproses rekaman secara konsisten
di semua kueri.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Kueri di child_token_2 selesai setelah menampilkan rekaman partisi turunan lainnya. Catatan ini menunjukkan bahwa partisi baru mencakup perubahan untuk child_token_2 dan child_token_3 mulai dari 2022-05-01T09:30:15Z. Data yang
persis sama ditampilkan oleh kueri di child_token_3, karena keduanya merupakan
partisi induk dari child_token_4 baru. Untuk memastikan pemrosesan
data yang diurutkan secara ketat untuk kunci tertentu, kueri di child_token_4
harus dimulai setelah semua induk selesai. Dalam hal ini, induknya adalah
child_token_2 dan child_token_3. Buat hanya satu kueri untuk setiap token partisi turunan. Desain alur kerja kueri harus menetapkan satu induk untuk menunggu dan
menjadwalkan kueri di child_token_4.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Temukan contoh penanganan dan parsing rekaman aliran perubahan di konektor Apache Beam SpannerIO Dataflow di GitHub.