本頁面說明 GoogleSQL 方言資料庫和 PostgreSQL 方言資料庫的 Spanner 變更串流,包括:
- 以分割為基礎的分區模型
- 變更串流記錄的格式和內容
- 用於查詢這些記錄的低階語法
- 查詢工作流程範例
您可以使用 Spanner API 直接查詢變更串流。 如果應用程式使用 Dataflow 讀取變更串流資料,就不需要直接處理本文所述的資料模型。
如需變更串流的入門指南,請參閱變更串流總覽。
變更串流分區
當變更串流監看的資料表發生變更時,Spanner 會在資料庫中寫入對應的變更串流記錄,並在與資料變更相同的交易中同步進行。也就是說,如果交易成功,Spanner 也已成功擷取並保存變更。在內部,Spanner 會將變更串流記錄和資料異動共置,以便由同一部伺服器處理,盡量減少寫入負擔。
在特定分割的 DML 中,Spanner 會在同一項交易中,將寫入內容附加至對應的變更串流資料分割。由於這種共置方式,變更串流不會在服務資源之間增加額外的協調作業,因此可將交易提交的負擔降到最低。
Spanner 會根據資料庫負載和大小,動態分割及合併資料,並將分割的資料分配到各個服務資源,藉此進行擴充。
為確保變更串流的寫入和讀取作業能順利擴充,Spanner 會分割及合併內部變更串流儲存空間和資料庫資料,自動避開熱點。為支援近乎即時讀取變更串流記錄,以及因應資料庫寫入作業的規模變化,Spanner API 的設計允許使用變更串流分割區,同時查詢變更串流。變更串流分區對應,即可變更包含變更串流記錄的變更串流資料分割。變更串流的分割區會隨時間動態變化,並與 Spanner 動態分割及合併資料庫資料的方式相關。
變更串流分割區包含特定時間範圍內不可變更的鍵範圍記錄。任何變更串流分割區都可以分割成一或多個變更串流分割區,或與其他變更串流分割區合併。發生這些分割或合併事件時,系統會建立子分割區,擷取下一個時間範圍內各自不可變動的鍵範圍變化。除了資料變更記錄,變更串流查詢也會傳回子分割區記錄,通知讀取器需要查詢的新變更串流分割區,以及心跳記錄,指出近期未發生寫入作業時的前進進度。
查詢特定變更串流分區時,系統會依提交時間戳記順序傳回變更記錄。系統會針對每筆變更記錄傳回一次結果。變更記錄的順序不保證在變更串流分區之間一致。特定主鍵的變更記錄只會針對特定時間範圍,在一個分割區中傳回。
由於父項/子項分區沿襲,如要依提交時間戳記順序處理特定鍵的變更,必須先處理完所有父項分區的記錄,才能處理子項分區傳回的記錄。
變更串流讀取函式和查詢語法
GoogleSQL
如要查詢變更串流,請使用 ExecuteStreamingSql API。Spanner 會自動建立特殊讀取函式和變更串流。讀取函式可存取變更串流的記錄。讀取函式的命名慣例為 READ_change_stream_name。
假設資料庫中存在變更串流 SingersNameStream,GoogleSQL 的查詢語法如下:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
讀取函式接受下列引數:
| 引數名稱 | 類型 | 是否必要 | 說明 |
|---|---|---|---|
start_timestamp |
TIMESTAMP |
必填 | 指定應傳回 commit_timestamp 大於或等於 start_timestamp 的記錄。這個值必須在變更串流保留期限內,且應小於或等於目前時間,並大於或等於變更串流的建立時間戳記。 |
end_timestamp |
TIMESTAMP |
選用 (預設值:NULL) |
指定應傳回 commit_timestamp 小於或等於 end_timestamp 的記錄。這個值必須在變更串流保留期限內,且大於或等於 start_timestamp。查詢會在傳回所有 ChangeRecords (最多到 end_timestamp) 後完成,或在您終止連線時完成。如果 end_timestamp 設為 NULL 或未指定,查詢會持續執行,直到傳回所有 ChangeRecords 或您終止連線為止。 |
partition_token |
STRING |
選用 (預設值:NULL) |
根據子分區記錄的內容,指定要查詢的變更串流分區。如果為 NULL 或未指定,表示讀取器是首次查詢變更串流,且尚未取得任何特定分區權杖來查詢。 |
heartbeat_milliseconds |
INT64 |
必填 | 決定在沒有交易提交至這個分割區時,心跳 ChangeRecord 的傳回頻率。值必須介於 1,000 (一秒) 和 300,000 (五分鐘) 之間。 |
read_options |
ARRAY |
選用 (預設值:NULL) |
新增保留供日後使用的讀取選項。唯一允許的值為 NULL。 |
建議您建立輔助方法,用於建構讀取函式查詢的文字,並將參數繫結至該文字,如下列範例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
如要查詢變更串流,請使用 ExecuteStreamingSql API。Spanner 會自動建立特殊讀取函式和變更串流。讀取函式可存取變更串流的記錄。讀取函式的命名慣例為 spanner.read_json_change_stream_name。
假設資料庫中存在變更串流 SingersNameStream,PostgreSQL 的查詢語法如下:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
讀取函式接受下列引數:
| 引數名稱 | 類型 | 是否必要 | 說明 |
|---|---|---|---|
start_timestamp |
timestamp with time zone |
必填 | 指定應傳回 commit_timestamp 大於或等於 start_timestamp 的變更記錄。這個值必須在變更串流保留期限內,且應小於或等於目前時間,並大於或等於變更串流的建立時間戳記。 |
end_timestamp |
timestamp with timezone |
選用 (預設值:NULL) |
指定要傳回的變更記錄,其 commit_timestamp 小於或等於 end_timestamp。這個值必須在變更串流保留期限內,且大於或等於 start_timestamp。
查詢會在傳回所有變更記錄 (最多到 end_timestamp) 後完成,或在您終止連線時完成。如果設為 NULL,查詢作業會持續執行,直到傳回所有變更記錄,或是您終止連線為止。 |
partition_token |
text |
選用 (預設值:NULL) |
根據子分區記錄的內容,指定要查詢的變更串流分區。如果為 NULL 或未指定,表示讀取器是首次查詢變更串流,且尚未取得任何特定分區權杖來查詢。 |
heartbeat_milliseconds |
bigint |
必填 | 決定在這個分割區中沒有已提交的交易時,心跳 ChangeRecord 的傳回頻率。
值必須介於 1,000 (一秒) 和 300,000 (五分鐘) 之間。 |
null |
null |
必填 | 保留供日後使用 |
建議您建立輔助方法,用於建構讀取函式的文字,並將參數繫結至該函式,如下列範例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
變更串流記錄格式
GoogleSQL
變更串流讀取函式會傳回單一 ChangeRecord 資料欄,類型為 ARRAY<STRUCT<...>>。在每個資料列中,這個陣列一律只包含一個元素。
陣列元素具有下列型別:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
這個 STRUCT 有三個欄位:data_change_record、heartbeat_record 和 child_partitions_record,每個欄位都是 ARRAY<STRUCT<...>> 類型。在變更串流讀取函式傳回的任何資料列中,只有這三個欄位之一包含值,其他兩個欄位則為空白或 NULL。這些陣列欄位最多只會包含一個元素。
以下各節將探討這三種記錄類型。
PostgreSQL
變更串流讀取函式會傳回單一 ChangeRecord 資料欄,其類型為 JSON,結構如下:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
這個物件有三個可能的鍵:data_change_record、heartbeat_record 和 child_partitions_record,對應的值類型為 JSON。在變更串流讀取函式傳回的任何資料列中,只會存在這三個鍵的其中一個。
以下各節將探討這三種記錄類型。
資料變更記錄
資料變更記錄包含一組對資料表的變更,這些變更具有相同的修改類型 (插入、更新或刪除),且在同一筆交易中,於同一個變更串流分割區內,以相同的提交時間戳記提交。多個變更串流分割區中,同一筆交易可能會傳回多筆資料變更記錄。
所有資料變更記錄都有 commit_timestamp、server_transaction_id 和 record_sequence 欄位,這些欄位共同決定串流記錄的變更串流順序。這三個欄位足以推導變更順序,並提供外部一致性。
請注意,如果多筆交易觸及的資料不重疊,則可能會有相同的提交時間戳記。server_transaction_id 欄位可區分哪些變更集 (可能跨變更串流分割區) 是在同一交易中發出。與 record_sequence 和 number_of_records_in_transaction 欄位配對後,您也可以緩衝及排序特定交易的所有記錄。
資料變更記錄的欄位包括:
GoogleSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
commit_timestamp |
TIMESTAMP |
指出變更的提交時間戳記。 |
record_sequence |
STRING |
指出交易中記錄的序號。
序號在交易中是唯一的,且會單調遞增 (但不一定連續)。針對相同交易的記錄排序 server_transaction_id,即可重建交易中的變更順序。record_sequenceSpanner 可能會為了提升效能而調整排序,因此不一定會與您提供的原始排序相符。 |
server_transaction_id |
STRING |
提供全域不重複字串,代表變更已提交的交易。這個值只能用於處理變更串流記錄,且與 Spanner API 中的交易 ID 無關。 |
is_last_record_in_transaction_in_partition |
BOOL |
指出這是否為目前分區中交易的最後一筆記錄。 |
table_name |
STRING |
受變更影響的資料表名稱。 |
value_capture_type |
STRING |
說明擷取這項變更時,變更串流設定中指定的擷取類型值。 值擷取類型可以是下列其中一項:
預設值為 |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
指出資料欄名稱、資料欄類型、是否為主鍵,以及結構定義 (ordinal_position) 中定義的資料欄位置。結構定義中表格的第一欄序數位置為 1。陣列資料欄的資料欄類型可以巢狀結構。格式與 Spanner API 參考資料中說明的類型結構相符。 |
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
說明所做的變更,包括主鍵值、舊值,以及變更或追蹤的資料欄新值。舊值和新值是否可用,以及內容為何,取決於已設定的 value_capture_type。new_values 和 old_values 欄位只包含非索引鍵資料欄。 |
mod_type |
STRING |
說明變更類型。可能的值為 INSERT、UPDATE 或 DELETE。 |
number_of_records_in_transaction |
INT64 |
指出所有變更串流分區中,屬於這項交易的資料變更記錄數量。 |
number_of_partitions_in_transaction |
INT64 |
指出傳回這項交易資料變更記錄的分區數量。 |
transaction_tag |
STRING |
指出與這筆交易相關聯的 交易代碼。 |
is_system_transaction |
BOOL |
指出交易是否為系統交易。 |
PostgreSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
commit_timestamp |
STRING |
指出變更的提交時間戳記。 |
record_sequence |
STRING |
指出交易中記錄的序號。
序號在交易中是唯一的,且會單調遞增 (但不一定連續)。針對相同交易排序記錄,server_transaction_idrecord_sequence 即可重建交易中的變更順序。 |
server_transaction_id |
STRING |
提供全域不重複字串,代表變更已提交的交易。這個值只能用於處理變更串流記錄,且與 Spanner API 中的交易 ID 無關 |
is_last_record_in_transaction_in_partition |
BOOLEAN |
指出這是否為目前分區中交易的最後一筆記錄。 |
table_name |
STRING |
指出受到變更影響的資料表名稱。 |
value_capture_type |
STRING |
說明擷取這項變更時,變更串流設定中指定的擷取類型值。 值擷取類型可以是下列其中一項:
預設值為 |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
指出資料欄名稱、資料欄類型、是否為主鍵,以及結構定義 (ordinal_position) 中定義的資料欄位置。結構定義中表格的第一欄序數位置為 1。陣列資料欄的資料欄類型可以巢狀結構。格式與 Spanner API 參考資料中說明的類型結構相符。 |
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
說明所做的變更,包括主鍵值、舊值,以及變更或追蹤的資料欄新值。舊值和新值的可用性和內容取決於設定的 value_capture_type。new_values 和 old_values 欄位只包含非鍵欄。 |
mod_type |
STRING |
說明變更類型。可能的值為 INSERT、UPDATE 或 DELETE。 |
number_of_records_in_transaction |
INT64 |
指出所有變更串流分區中,屬於這項交易的資料變更記錄數量。 |
number_of_partitions_in_transaction |
NUMBER |
指出傳回這項交易資料變更記錄的分區數量。 |
transaction_tag |
STRING |
指出與這筆交易相關聯的 交易代碼。 |
is_system_transaction |
BOOLEAN |
指出交易是否為系統交易。 |
資料變更記錄範例
以下是一組資料變更記錄範例。這類交易說明的是兩個帳戶之間的轉帳交易。這兩個帳戶位於不同的變更串流分割區。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
以下資料變更記錄是擷取類型為 NEW_VALUES 的記錄範例。請注意,系統只會填入新值。
只有 LastUpdate 欄經過修改,因此只會傳回該欄。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下資料變更記錄是擷取類型為 NEW_ROW 的記錄範例。只有 LastUpdate 資料欄經過修改,但系統會傳回所有追蹤的資料欄。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下資料變更記錄是擷取類型為 NEW_ROW_AND_OLD_VALUES 的記錄範例。只有 LastUpdate 欄經過修改,但系統會傳回所有追蹤的欄。這個值擷取類型會擷取 LastUpdate 的新值和舊值。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
心跳記錄
系統傳回心跳記錄時,表示所有變更 (commit_timestamp 小於或等於心跳記錄的 timestamp) 都已傳回,且這個分割區的未來資料記錄必須具有高於心跳記錄傳回值的提交時間戳記。如果沒有資料變更寫入分區,系統就會傳回心跳記錄。如果分區有資料變更寫入,則可使用 data_change_record.commit_timestamp (而非 heartbeat_record.timestamp),告知讀取器正在讀取分區中的後續資料。
您可以使用分區傳回的心跳記錄,在所有分區中同步處理讀取器。一旦所有讀取器都收到大於或等於某個時間戳記 A 的心跳訊號,或是收到大於或等於時間戳記 A 的資料或子分割區記錄,讀取器就會知道自己已收到在該時間戳記 A 之前或當時提交的所有記錄,並可開始處理緩衝記錄,例如依時間戳記排序跨分割區記錄,並依 server_transaction_id 分組。
心跳記錄只包含一個欄位:
GoogleSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
timestamp |
TIMESTAMP |
表示活動訊號記錄的時間戳記。 |
PostgreSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
timestamp |
STRING |
表示活動訊號記錄的時間戳記。 |
活動訊號記錄範例
以下是心跳記錄範例,表示時間戳記小於或等於這筆記錄時間戳記的所有記錄都已傳回:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
子分割區記錄
子分區記錄會傳回子分區的相關資訊,包括分區權杖、父分區的權杖,以及代表子分區最早時間戳記的 start_timestamp,其中包含變更記錄。提交時間戳記緊接在 child_partitions_record.start_timestamp 之前的記錄,會傳回至目前的分區。傳回這個分區的所有子分區記錄後,這項查詢會傳回成功狀態,表示這個分區的所有記錄都已傳回。
子分割區記錄的欄位包括:
GoogleSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
start_timestamp |
TIMESTAMP |
表示這個子分割區記錄中,從子分割區傳回的資料變更記錄,其提交時間戳記大於或等於 start_timestamp。查詢子分區時,查詢應指定子分區權杖,以及大於或等於 child_partitions_token.start_timestamp 的 start_timestamp。分區傳回的所有子分區記錄都具有相同的 start_timestamp,且時間戳記一律會落在查詢指定的 start_timestamp 和 end_timestamp 之間。 |
record_sequence |
STRING |
表示單調遞增的序號,可用於定義特定分區中,具有相同 start_timestamp 的多個子分區記錄傳回時,子分區記錄的排序。分區權杖 start_timestamp 和 record_sequence 可唯一識別子分區記錄。 |
child_partitions |
[
{
"token" : "STRING",
"parent_partition_tokens" : ["STRING"]
}
] |
傳回一組子分割區及其相關聯的資訊。 這包括用於識別查詢中子分區的分區權杖字串,以及父項分區的權杖。 |
PostgreSQL
| 欄位 | 類型 | 說明 |
|---|---|---|
start_timestamp |
STRING |
表示這個子分割區記錄中,從子分割區傳回的資料變更記錄,其提交時間戳記大於或等於 start_timestamp。查詢子分區時,查詢應指定子分區權杖,以及大於或等於 child_partitions_token.start_timestamp 的 start_timestamp。分區傳回的所有子分區記錄都具有相同的 start_timestamp,且時間戳記一律會落在查詢指定的 start_timestamp 和 end_timestamp 之間。 |
record_sequence |
STRING |
表示單調遞增的序號,可用於定義特定分區中,具有相同 start_timestamp 的多個子分區記錄傳回時,子分區記錄的排序。分區權杖 start_timestamp 和 record_sequence 可唯一識別子分區記錄。 |
child_partitions |
[
{
"token": "STRING",
"parent_partition_tokens": ["STRING"],
}, [...]
] |
傳回子區隔陣列和相關聯的資訊。 這包括用於識別查詢中子分區的分區符記字串,以及上層分區的符記。 |
子分區記錄範例
以下是子區塊記錄的範例:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
變更串流查詢工作流程
使用 ExecuteStreamingSql API 執行變更串流查詢,並搭配單次使用的唯讀交易和嚴格的時間戳記界限。變更串流讀取函式可讓您指定感興趣時間範圍的 start_timestamp 和 end_timestamp。在保留期限內,您可以使用強式唯讀時間戳記界限,存取所有變更記錄。
所有其他TransactionOptions
在變更串流查詢中均無效。此外,如果 TransactionOptions.read_only.return_read_timestamp 設為 true,則在說明交易的 Transaction 訊息中,會傳回 kint64max - 1 這個特殊值,而非有效的讀取時間戳記。這個特殊值應捨棄,不得用於任何後續查詢。
每項變更串流查詢都會傳回任意數量的資料列,每列包含資料變更記錄、心跳記錄或子分割區記錄。無須為要求設定期限。
變更串流查詢工作流程範例
串流查詢工作流程的第一步是發出第一個變更串流查詢,方法是指定 partition_token 至 NULL。查詢需要指定變更串流的讀取函式、感興趣的開始和結束時間戳記,以及心跳間隔。如果 end_timestamp 為 NULL,查詢會持續傳回資料變更,直到分區結束為止。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
處理這項查詢的資料記錄,直到傳回所有子分區記錄為止。在下列範例中,系統會傳回兩筆子分區記錄和三個分區權杖,然後終止查詢。特定查詢的子分割區記錄一律共用相同的 start_timestamp。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
如要在 2022-05-01T09:00:01Z 後處理變更,請建立三個新查詢並平行執行。這三項查詢會一併使用,傳回父項涵蓋的相同鍵值範圍資料變更。請一律將 start_timestamp 設為相同子分割區記錄中的 start_timestamp,並使用相同的 end_timestamp 和心跳間隔,在所有查詢中一致處理記錄。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
在傳回另一個子項分割區記錄後,child_token_2 上的查詢就會完成。這項記錄表示新分割區涵蓋 child_token_2 和 child_token_3 的變更,起始時間為 2022-05-01T09:30:15Z。查詢會傳回完全相同的記錄,因為兩者都是新 child_token_4 的父項分割區。child_token_3如要確保特定鍵的資料記錄嚴格依序處理,child_token_4上的查詢必須在所有父項完成後開始。在本例中,父項為 child_token_2 和 child_token_3。每個子區隔權杖只能建立一個查詢。查詢工作流程設計應指派一個父項等待,並在 child_token_4 上排定查詢時間。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
如需處理及剖析變更串流記錄的範例,請前往 GitHub 上的 Apache Beam SpannerIO Dataflow 連接器。